nucleus: [2/n] move processing unit managament to nucleus

This commit is contained in:
Daniel Kochmański 2024-05-02 15:25:26 +02:00
parent 71d5f8dd78
commit d990d2afd5
3 changed files with 256 additions and 280 deletions

View file

@ -93,6 +93,32 @@ ecl_set_process_env(cl_env_ptr env)
cl_env_ptr cl_env_p = NULL;
#endif /* ECL_THREADS */
/* -- Thread local bindings */
static void
init_tl_bindings(cl_object process, cl_env_ptr env)
{
#ifdef ECL_THREADS
cl_index bindings_size;
cl_object *bindings;
if (Null(process) || Null(process->process.inherit_bindings_p)) {
cl_index idx = 0, size = 256;
bindings_size = size;
bindings = (cl_object *)ecl_malloc(size*sizeof(cl_object*));
for(idx=0; idx<256; idx++) {
bindings[idx] = ECL_NO_TL_BINDING;
}
} else {
cl_env_ptr parent_env = ecl_process_env();
bindings_size = parent_env->bds_stack.tl_bindings_size;
bindings = (cl_object *)ecl_malloc(bindings_size*sizeof(cl_object*));
ecl_copy(bindings, parent_env->bds_stack.tl_bindings, bindings_size*sizeof(cl_object*));
}
env->bds_stack.tl_bindings_size = bindings_size;
env->bds_stack.tl_bindings = bindings;
#endif
}
/* -- Managing the collection of processes ---------------------------------- */
#ifdef ECL_THREADS
@ -122,12 +148,13 @@ cl_env_ptr
ecl_adopt_cpu()
{
struct cl_env_struct env_aux[1];
struct ecl_interrupt_struct int_aux[1];
cl_env_ptr the_env = ecl_process_env_unsafe();
ecl_thread_t current;
int registered;
if (the_env != NULL)
return the_env;
/* 1. Ensure that the thread is known to the GC. */
/* Ensure that the thread is known to the GC. */
/* FIXME this should be executed with hooks. */
#ifdef GBC_BOEHM
{
@ -153,7 +180,7 @@ ecl_adopt_cpu()
* the gc there. */
memset(env_aux, 0, sizeof(*env_aux));
env_aux->disable_interrupts = 1;
env_aux->interrupt_struct = ecl_alloc_unprotected(sizeof(*env_aux->interrupt_struct));
env_aux->interrupt_struct = int_aux;
env_aux->interrupt_struct->pending_interrupt = ECL_NIL;
ecl_mutex_init(&env_aux->interrupt_struct->signal_queue_lock, FALSE);
env_aux->interrupt_struct->signal_queue = ECL_NIL;
@ -167,28 +194,144 @@ ecl_adopt_cpu()
memcpy(the_env, env_aux, sizeof(*the_env));
ecl_set_process_env(the_env);
add_env(the_env);
init_tl_bindings(ECL_NIL, the_env);
return the_env;
}
void
ecl_disown_cpu()
{
int registered;
cl_env_ptr the_env = ecl_process_env_unsafe();
if (the_env == NULL)
return;
registered = the_env->cleanup;
ecl_disable_interrupts_env(the_env);
/* FIXME this should be part of dealloc. */
ecl_clear_bignum_registers(the_env);
#ifdef ECL_WINDOWS_THREADS
CloseHandle(the_env->thread);
#endif
ecl_set_process_env(NULL);
del_env(the_env);
_ecl_dealloc_env(the_env);
/* FIXME thsi should be executed with hooks. */
if (registered) {
GC_unregister_my_thread();
}
}
#ifdef ECL_WINDOWS_THREADS
static DWORD WINAPI
#else
static void *
#endif
thread_entry_point(void *ptr)
{
cl_env_ptr the_env = ecl_cast_ptr(cl_env_ptr, ptr);
cl_object process = the_env->own_process;
/* Setup the environment for the execution of the thread. */
ecl_set_process_env(the_env);
ecl_cs_init(the_env);
process->process.entry(0);
/* This routine performs some cleanup before a thread is completely
* killed. For instance, it has to remove the associated process object from
* the list, an it has to dealloc some memory.
*
* NOTE: this cleanup does not provide enough "protection". In order to ensure
* that all UNWIND-PROTECT forms are properly executed, never use the function
* pthread_cancel() to kill a process, but rather use the lisp functions
* mp_interrupt_process() and mp_process_kill(). */
ecl_set_process_env(NULL);
the_env->own_process = ECL_NIL;
del_env(the_env);
#ifdef ECL_WINDOWS_THREADS
CloseHandle(the_env->thread);
#endif
_ecl_dealloc_env(the_env);
#ifdef ECL_WINDOWS_THREADS
return 1;
#else
return NULL;
#endif
}
/* Run a process in a new system thread. */
cl_env_ptr
ecl_spawn_cpu()
ecl_spawn_cpu(cl_object process)
{
return NULL;
}
cl_env_ptr the_env = ecl_process_env();
cl_env_ptr new_env = NULL;
int ok = 1;
/* Allocate and initialize the new cpu env. */
{
new_env = _ecl_alloc_env(the_env);
/* List the process such that its environment is marked by the GC when its
contents are allocated. */
add_env(new_env);
/* Now we can safely allocate memory for the environment ocntents and store
pointers to it in the environment. */
ecl_init_env(new_env);
/* Copy the parent env defaults. */
new_env->trap_fpe_bits = the_env->trap_fpe_bits;
new_env->own_process = process;
init_tl_bindings(process, new_env);
process->process.env = new_env;
}
/* Spawn the thread */
ecl_disable_interrupts_env(the_env);
#ifdef ECL_WINDOWS_THREADS
{
HANDLE code;
DWORD threadId;
code = (HANDLE)CreateThread(NULL, 0, thread_entry_point, new_env, 0, &threadId);
new_env->thread = code;
ok = code != NULL;
}
#else /* ECL_WINDOWS_THREADS */
{
int code;
pthread_attr_t pthreadattr;
void
ecl_add_process(cl_object process)
{
add_env(process->process.env);
}
void
ecl_del_process(cl_object process)
{
del_env(process->process.env);
pthread_attr_init(&pthreadattr);
pthread_attr_setdetachstate(&pthreadattr, PTHREAD_CREATE_DETACHED);
/*
* Block all asynchronous signals until the thread is completely
* set up. The synchronous signals SIGSEGV and SIGBUS are needed
* by the gc and thus can't be blocked.
*/
# ifdef HAVE_SIGPROCMASK
{
sigset_t new, previous;
sigfillset(&new);
sigdelset(&new, SIGSEGV);
sigdelset(&new, SIGBUS);
pthread_sigmask(SIG_BLOCK, &new, &previous);
code = pthread_create(&new_env->thread, &pthreadattr,
thread_entry_point, new_env);
pthread_sigmask(SIG_SETMASK, &previous, NULL);
}
# else
code = pthread_create(&new_env->thread, &pthreadattr,
thread_entry_point, new_env);
# endif
ok = (code == 0);
}
#endif /* ECL_WINDOWS_THREADS */
/* Deal with the fallout of the thread creation. */
if (!ok) {
del_env(new_env);
process->process.env = NULL;
_ecl_dealloc_env(new_env);
}
ecl_enable_interrupts_env(the_env);
return ok ? new_env : NULL;
}
#endif
@ -197,8 +340,11 @@ ecl_del_process(cl_object process)
void
init_process(void)
{
cl_env_ptr env = ecl_core.first_env;
cl_env_ptr the_env = ecl_core.first_env;
#ifdef ECL_THREADS
ecl_thread_t main_thread;
ecl_set_process_self(main_thread);
the_env->thread = main_thread;
ecl_process_key_create(cl_env_key);
ecl_mutex_init(&ecl_core.processes_lock, 1);
ecl_mutex_init(&ecl_core.global_lock, 1);
@ -206,10 +352,10 @@ init_process(void)
ecl_rwlock_init(&ecl_core.global_env_lock);
ecl_core.threads = ecl_make_stack(16);
#endif
ecl_set_process_env(env);
env->default_sigmask = NULL;
env->method_cache = NULL;
env->slot_cache = NULL;
env->interrupt_struct = NULL;
env->disable_interrupts = 1;
ecl_set_process_env(the_env);
the_env->default_sigmask = NULL;
the_env->method_cache = NULL;
the_env->slot_cache = NULL;
the_env->interrupt_struct = NULL;
the_env->disable_interrupts = 1;
}

View file

@ -31,26 +31,6 @@
# include <sched.h>
#endif
/* -- Macros ---------------------------------------------------------------- */
#ifdef ECL_WINDOWS_THREADS
# define ecl_process_eq(t1, t2) (GetThreadId(t1) == GetThreadId(t2))
# define ecl_set_process_self(var) \
{ \
HANDLE aux = GetCurrentThread(); \
DuplicateHandle(GetCurrentProcess(), \
aux, \
GetCurrentProcess(), \
&var, \
0, \
FALSE, \
DUPLICATE_SAME_ACCESS); \
}
#else
# define ecl_process_eq(t1, t2) (t1 == t2)
# define ecl_set_process_self(var) (var = pthread_self())
#endif /* ECL_WINDOWS_THREADS */
/* -- Core ---------------------------------------------------------- */
static cl_object
@ -87,27 +67,58 @@ assert_type_process(cl_object o)
FEwrong_type_argument(@[mp::process], o);
}
static void
thread_cleanup(void *aux)
static cl_object
run_process(cl_narg narg, ...)
{
/* This routine performs some cleanup before a thread is completely
* killed. For instance, it has to remove the associated process
* object from the list, an it has to dealloc some memory.
/* Upon entering this routine the process environment is set up, the process
* phase is ECL_PROCESS_BOOTING, signals are disabled in the environment and
* the communication interrupt is disabled (sigmasked).
*
* NOTE: thread_cleanup() does not provide enough "protection". In
* order to ensure that all UNWIND-PROTECT forms are properly
* executed, never use pthread_cancel() to kill a process, but
* rather use the lisp functions mp_interrupt_process() and
* mp_process_kill().
* This process will not receive signals that originate from other processes.
* Furthermore, we expect not to get any other interrupts (SIGSEGV, SIGFPE) if
* we do things right.
*/
cl_object process = (cl_object)aux;
cl_env_ptr env = process->process.env;
cl_env_ptr the_env = ecl_process_env();
cl_object process = the_env->own_process;
cl_object fun = process->process.function;
cl_object args = process->process.args;
cl_object output = ECL_NIL;
/* Entry barrier. enable_process releases this lock before exit. */
ecl_mutex_lock(&process->process.start_stop_lock);
/* Execute the code. The CATCH_ALL point is the destination provides us with
* an elegant way to exit the thread: we just do an unwind up to frs_top. */
ECL_CATCH_ALL_BEGIN(the_env) {
#ifdef HAVE_SIGPROCMASK
{
sigset_t *new = (sigset_t*)the_env->default_sigmask;
pthread_sigmask(SIG_SETMASK, new, NULL);
}
#endif
process->process.phase = ECL_PROCESS_ACTIVE;
ecl_mutex_unlock(&process->process.start_stop_lock);
si_trap_fpe(@'last', ECL_T);
ecl_enable_interrupts_env(the_env);
ecl_bds_bind(the_env, @'mp::*current-process*', process);
ECL_RESTART_CASE_BEGIN(the_env, @'abort') {
the_env->values[0] = cl_apply(2, fun, args);
int i = the_env->nvalues;
while (i--) {
output = CONS(the_env->values[i], output);
}
process->process.exit_values = output;
} ECL_RESTART_CASE(1,args) {
/* ABORT restart. */
process->process.exit_values = args;
} ECL_RESTART_CASE_END;
ecl_bds_unwind1(the_env);
} ECL_CATCH_ALL_END;
ecl_disable_interrupts_env(the_env);
ecl_clear_bignum_registers(the_env);
/* The following flags will disable all interrupts. */
if (env) {
ecl_disable_interrupts_env(env);
ecl_clear_bignum_registers(env);
}
ecl_mutex_lock(&process->process.start_stop_lock);
process->process.phase = ECL_PROCESS_EXITING;
#ifdef HAVE_SIGPROCMASK
@ -119,123 +130,11 @@ thread_cleanup(void *aux)
pthread_sigmask(SIG_BLOCK, new, NULL);
}
#endif
ecl_del_process(process);
process->process.env = NULL;
#ifdef ECL_WINDOWS_THREADS
CloseHandle(env->thread);
#endif
ecl_set_process_env(NULL);
if (env) _ecl_dealloc_env(env);
process->process.phase = ECL_PROCESS_INACTIVE;
ecl_cond_var_broadcast(&process->process.exit_barrier);
ecl_mutex_unlock(&process->process.start_stop_lock);
}
#ifdef ECL_WINDOWS_THREADS
static DWORD WINAPI
#else
static void *
#endif
thread_entry_point(void *arg)
{
cl_object process = (cl_object)arg;
cl_env_ptr env = process->process.env;
/*
* Upon entering this routine
* process.env = our environment for lisp
* process.phase = ECL_PROCESS_BOOTING
* signals are disabled in the environment
* the communication interrupt is disabled (sigmasked)
*
* This process will not receive signals that originate from
* other processes. Furthermore, we expect not to get any
* other interrupts (SIGSEGV, SIGFPE) if we do things right.
*/
/* 1) Setup the environment for the execution of the thread */
ecl_set_process_env(env = process->process.env);
#ifndef ECL_WINDOWS_THREADS
pthread_cleanup_push(thread_cleanup, (void *)process);
#endif
ecl_cs_init(env);
ecl_mutex_lock(&process->process.start_stop_lock);
/* 2) Execute the code. The CATCH_ALL point is the destination
* provides us with an elegant way to exit the thread: we just
* do an unwind up to frs_top.
*/
ECL_CATCH_ALL_BEGIN(env) {
#ifdef HAVE_SIGPROCMASK
{
sigset_t *new = (sigset_t*)env->default_sigmask;
pthread_sigmask(SIG_SETMASK, new, NULL);
}
#endif
process->process.phase = ECL_PROCESS_ACTIVE;
ecl_mutex_unlock(&process->process.start_stop_lock);
si_trap_fpe(@'last', ECL_T);
ecl_enable_interrupts_env(env);
ecl_bds_bind(env, @'mp::*current-process*', process);
ECL_RESTART_CASE_BEGIN(env, @'abort') {
process->process.entry(0);
} ECL_RESTART_CASE(1,args) {
/* ABORT restart. */
process->process.exit_values = args;
} ECL_RESTART_CASE_END;
ecl_bds_unwind1(env);
} ECL_CATCH_ALL_END;
/* 4) If everything went right, we should be exiting the thread
* through this point. thread_cleanup is automatically invoked
* marking the process as inactive.
*/
#ifdef ECL_WINDOWS_THREADS
thread_cleanup(process);
return 1;
#else
pthread_cleanup_pop(1);
return NULL;
#endif
}
static void
init_tl_bindings(cl_object process, cl_env_ptr env)
{
cl_index bindings_size;
cl_object *bindings;
if (Null(process->process.inherit_bindings_p)) {
cl_index idx = 0, size = 256;
bindings_size = size;
bindings = (cl_object *)ecl_malloc(size*sizeof(cl_object*));
for(idx=0; idx<256; idx++) {
bindings[idx] = ECL_NO_TL_BINDING;
}
} else {
cl_env_ptr parent_env = ecl_process_env();
bindings_size = parent_env->bds_stack.tl_bindings_size;
bindings = (cl_object *)ecl_malloc(bindings_size*sizeof(cl_object*));
ecl_copy(bindings, parent_env->bds_stack.tl_bindings, bindings_size*sizeof(cl_object*));
}
env->bds_stack.tl_bindings_size = bindings_size;
env->bds_stack.tl_bindings = bindings;
}
static cl_object
run_process(cl_narg narg, ...)
{
cl_env_ptr the_env = ecl_process_env();
cl_object process = the_env->own_process;
cl_object fun = process->process.function;
cl_object args = process->process.args;
cl_object output = ECL_NIL;
the_env->values[0] = cl_apply(2, fun, args);
int i = the_env->nvalues;
while (i--) {
output = CONS(the_env->values[i], output);
}
process->process.exit_values = output;
return the_env->values[0];
}
@ -273,10 +172,7 @@ ecl_import_current_thread(cl_object name, cl_object bindings)
process = alloc_process(name, ECL_NIL);
process->process.env = the_env;
process->process.phase = ECL_PROCESS_BOOTING;
init_tl_bindings(process, the_env);
the_env->own_process = process;
process->process.phase = ECL_PROCESS_ACTIVE;
ecl_bds_bind(the_env, @'mp::*current-process*', process);
@ -286,16 +182,27 @@ ecl_import_current_thread(cl_object name, cl_object bindings)
void
ecl_release_current_thread(void)
{
cl_env_ptr env = ecl_process_env();
int cleanup = env->cleanup;
cl_object own_process = env->own_process;
thread_cleanup(own_process);
#ifdef GBC_BOEHM
if (cleanup) {
GC_unregister_my_thread();
cl_object process;
cl_env_ptr the_env = ecl_process_env_unsafe();
if (the_env == NULL)
return;
process = the_env->own_process;
ecl_mutex_lock(&process->process.start_stop_lock);
process->process.env = NULL;
process->process.phase = ECL_PROCESS_EXITING;
ecl_disown_cpu();
#ifdef HAVE_SIGPROCMASK
/* ...but we might get stray signals. */
{
sigset_t new[1];
sigemptyset(new);
sigaddset(new, ecl_option_values[ECL_OPT_THREAD_INTERRUPT_SIGNAL]);
pthread_sigmask(SIG_BLOCK, new, NULL);
}
#endif
process->process.phase = ECL_PROCESS_INACTIVE;
ecl_cond_var_broadcast(&process->process.exit_barrier);
ecl_mutex_unlock(&process->process.start_stop_lock);
}
@(defun mp::make-process (&key name ((:initial-bindings initial_bindings_p) ECL_T))
@ -388,105 +295,33 @@ mp_process_yield(void)
cl_object
mp_process_enable(cl_object process)
{
/* process_env and ok are changed after the setjmp call in
* ECL_UNWIND_PROTECT_BEGIN, so they need to be declared volatile */
volatile cl_env_ptr process_env = NULL;
cl_env_ptr the_env = ecl_process_env();
volatile int ok = 1;
ECL_UNWIND_PROTECT_BEGIN(the_env) {
/* Try to gain exclusive access to the process. This prevents two
* concurrent calls to process-enable from different threads on
* the same process */
ecl_mutex_lock(&process->process.start_stop_lock);
/* Ensure that the process is inactive. */
if (process->process.phase != ECL_PROCESS_INACTIVE) {
FEerror("Cannot enable the running process ~A.", 1, process);
}
ok = 0;
process->process.phase = ECL_PROCESS_BOOTING;
/* Link environment and process together */
process_env = _ecl_alloc_env(the_env);
process_env->own_process = process;
process->process.env = process_env;
/* Immediately list the process such that its environment is
* marked by the gc when its contents are allocated */
ecl_add_process(process);
/* Now we can safely allocate memory for the environment contents
* and store pointers to it in the environment */
ecl_init_env(process_env);
process_env->trap_fpe_bits = the_env->trap_fpe_bits;
init_tl_bindings(process, process_env);
ecl_disable_interrupts_env(the_env);
#ifdef ECL_WINDOWS_THREADS
{
HANDLE code;
DWORD threadId;
code = (HANDLE)CreateThread(NULL, 0, thread_entry_point, process, 0, &threadId);
ok = (process_env->thread = code) != NULL;
}
#else
{
int code;
pthread_attr_t pthreadattr;
pthread_attr_init(&pthreadattr);
pthread_attr_setdetachstate(&pthreadattr, PTHREAD_CREATE_DETACHED);
/*
* Block all asynchronous signals until the thread is completely
* set up. The synchronous signals SIGSEGV and SIGBUS are needed
* by the gc and thus can't be blocked.
*/
#ifdef HAVE_SIGPROCMASK
{
sigset_t new, previous;
sigfillset(&new);
sigdelset(&new, SIGSEGV);
sigdelset(&new, SIGBUS);
pthread_sigmask(SIG_BLOCK, &new, &previous);
code = pthread_create(&process_env->thread, &pthreadattr,
thread_entry_point, process);
pthread_sigmask(SIG_SETMASK, &previous, NULL);
}
#else
code = pthread_create(&process_env->thread, &pthreadattr,
thread_entry_point, process);
#endif
ok = (code == 0);
}
#endif
ecl_enable_interrupts_env(the_env);
} ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT {
if (!ok) {
/* INV: interrupts are already disabled through thread safe
* unwind-protect */
ecl_del_process(process);
process->process.phase = ECL_PROCESS_INACTIVE;
/* Alert possible waiting processes. */
ecl_cond_var_broadcast(&process->process.exit_barrier);
process->process.env = NULL;
if (process_env != NULL)
_ecl_dealloc_env(process_env);
}
/* Unleash the thread */
cl_env_ptr process_env = NULL;
/* Try to gain exclusive access to the process. This prevents two concurrent
* calls to process-enable from different threads on the same process */
ecl_mutex_lock(&process->process.start_stop_lock);
/* Ensure that the process is inactive. */
if (process->process.phase != ECL_PROCESS_INACTIVE) {
ecl_mutex_unlock(&process->process.start_stop_lock);
} ECL_UNWIND_PROTECT_THREAD_SAFE_END;
@(return (ok? process : ECL_NIL));
FEerror("Cannot enable the running process ~A.", 1, process);
}
process->process.phase = ECL_PROCESS_BOOTING;
/* Spawn the thread (allocates the environment)*/
process_env = ecl_spawn_cpu(process);
if (process_env == NULL) {
process->process.phase = ECL_PROCESS_INACTIVE;
ecl_cond_var_broadcast(&process->process.exit_barrier);
}
/* Unleash the thread */
ecl_mutex_unlock(&process->process.start_stop_lock);
ecl_return1(the_env, (process_env ? process : ECL_NIL));
}
cl_object
mp_exit_process(void)
{
/* We simply undo the whole of the frame stack. This brings up
back to the thread entry point, going through all possible
UNWIND-PROTECT.
*/
/* We simply undo the whole of the frame stack. This brings up back to the
thread entry point, going through all possible UNWIND-PROTECT. */
const cl_env_ptr the_env = ecl_process_env();
ecl_unwind(the_env, the_env->frs_stack.org);
/* Never reached */
@ -647,11 +482,9 @@ init_threads()
{
cl_env_ptr the_env = ecl_process_env();
cl_object process, _env = ecl_cast_ptr(cl_object,the_env);
ecl_thread_t main_thread;
/* We have to set the environment before any allocation takes place,
* so that the interrupt handling code works. */
ecl_cs_init(the_env);
ecl_set_process_self(main_thread);
process = ecl_alloc_object(t_process);
process->process.phase = ECL_PROCESS_ACTIVE;
process->process.name = @'si::top-level';
@ -660,7 +493,6 @@ init_threads()
process->process.env = the_env;
ecl_mutex_init(&process->process.start_stop_lock, TRUE);
ecl_cond_var_init(&process->process.exit_barrier);
the_env->thread = main_thread;
the_env->own_process = process;
ecl_stack_push(ecl_core.threads, _env);
}

View file

@ -36,9 +36,7 @@ struct ecl_core_struct {
/* process.c */
cl_env_ptr ecl_adopt_cpu();
cl_env_ptr ecl_spawn_cpu();
void ecl_add_process(cl_object process);
void ecl_del_process(cl_object process);
void ecl_disown_cpu();
/* control.c */
cl_object ecl_escape(cl_object continuation) ecl_attr_noreturn;