From 4ea1528b34592fc1971fa970722f92e12f30ce05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kochma=C5=84ski?= Date: Thu, 2 May 2024 15:25:26 +0200 Subject: [PATCH] nucleus: [2/n] move processing unit managament to nucleus --- src/c/process.d | 190 ++++++++++++++++++++--- src/c/threads/thread.d | 342 +++++++++++------------------------------ src/h/nucleus.h | 4 +- 3 files changed, 256 insertions(+), 280 deletions(-) diff --git a/src/c/process.d b/src/c/process.d index 5736a3451..fc26b971b 100644 --- a/src/c/process.d +++ b/src/c/process.d @@ -93,6 +93,32 @@ ecl_set_process_env(cl_env_ptr env) cl_env_ptr cl_env_p = NULL; #endif /* ECL_THREADS */ +/* -- Thread local bindings */ +static void +init_tl_bindings(cl_object process, cl_env_ptr env) +{ +#ifdef ECL_THREADS + cl_index bindings_size; + cl_object *bindings; + if (Null(process) || Null(process->process.inherit_bindings_p)) { + cl_index idx = 0, size = 256; + bindings_size = size; + bindings = (cl_object *)ecl_malloc(size*sizeof(cl_object*)); + for(idx=0; idx<256; idx++) { + bindings[idx] = ECL_NO_TL_BINDING; + } + } else { + cl_env_ptr parent_env = ecl_process_env(); + bindings_size = parent_env->bds_stack.tl_bindings_size; + bindings = (cl_object *)ecl_malloc(bindings_size*sizeof(cl_object*)); + ecl_copy(bindings, parent_env->bds_stack.tl_bindings, bindings_size*sizeof(cl_object*)); + } + env->bds_stack.tl_bindings_size = bindings_size; + env->bds_stack.tl_bindings = bindings; +#endif +} + + /* -- Managing the collection of processes ---------------------------------- */ #ifdef ECL_THREADS @@ -122,12 +148,13 @@ cl_env_ptr ecl_adopt_cpu() { struct cl_env_struct env_aux[1]; + struct ecl_interrupt_struct int_aux[1]; cl_env_ptr the_env = ecl_process_env_unsafe(); ecl_thread_t current; int registered; if (the_env != NULL) return the_env; - /* 1. Ensure that the thread is known to the GC. */ + /* Ensure that the thread is known to the GC. */ /* FIXME this should be executed with hooks. */ #ifdef GBC_BOEHM { @@ -153,7 +180,7 @@ ecl_adopt_cpu() * the gc there. */ memset(env_aux, 0, sizeof(*env_aux)); env_aux->disable_interrupts = 1; - env_aux->interrupt_struct = ecl_alloc_unprotected(sizeof(*env_aux->interrupt_struct)); + env_aux->interrupt_struct = int_aux; env_aux->interrupt_struct->pending_interrupt = ECL_NIL; ecl_mutex_init(&env_aux->interrupt_struct->signal_queue_lock, FALSE); env_aux->interrupt_struct->signal_queue = ECL_NIL; @@ -167,28 +194,144 @@ ecl_adopt_cpu() memcpy(the_env, env_aux, sizeof(*the_env)); ecl_set_process_env(the_env); add_env(the_env); + init_tl_bindings(ECL_NIL, the_env); return the_env; } +void +ecl_disown_cpu() +{ + int registered; + cl_env_ptr the_env = ecl_process_env_unsafe(); + if (the_env == NULL) + return; + registered = the_env->cleanup; + ecl_disable_interrupts_env(the_env); + /* FIXME this should be part of dealloc. */ + ecl_clear_bignum_registers(the_env); +#ifdef ECL_WINDOWS_THREADS + CloseHandle(the_env->thread); +#endif + ecl_set_process_env(NULL); + del_env(the_env); + _ecl_dealloc_env(the_env); + /* FIXME thsi should be executed with hooks. */ + if (registered) { + GC_unregister_my_thread(); + } +} + +#ifdef ECL_WINDOWS_THREADS +static DWORD WINAPI +#else +static void * +#endif +thread_entry_point(void *ptr) +{ + cl_env_ptr the_env = ecl_cast_ptr(cl_env_ptr, ptr); + cl_object process = the_env->own_process; + /* Setup the environment for the execution of the thread. */ + ecl_set_process_env(the_env); + ecl_cs_init(the_env); + + process->process.entry(0); + + /* This routine performs some cleanup before a thread is completely + * killed. For instance, it has to remove the associated process object from + * the list, an it has to dealloc some memory. + * + * NOTE: this cleanup does not provide enough "protection". In order to ensure + * that all UNWIND-PROTECT forms are properly executed, never use the function + * pthread_cancel() to kill a process, but rather use the lisp functions + * mp_interrupt_process() and mp_process_kill(). */ + + ecl_set_process_env(NULL); + the_env->own_process = ECL_NIL; + del_env(the_env); +#ifdef ECL_WINDOWS_THREADS + CloseHandle(the_env->thread); +#endif + _ecl_dealloc_env(the_env); + +#ifdef ECL_WINDOWS_THREADS + return 1; +#else + return NULL; +#endif +} + /* Run a process in a new system thread. */ cl_env_ptr -ecl_spawn_cpu() +ecl_spawn_cpu(cl_object process) { - return NULL; -} + cl_env_ptr the_env = ecl_process_env(); + cl_env_ptr new_env = NULL; + int ok = 1; + /* Allocate and initialize the new cpu env. */ + { + new_env = _ecl_alloc_env(the_env); + /* List the process such that its environment is marked by the GC when its + contents are allocated. */ + add_env(new_env); + /* Now we can safely allocate memory for the environment ocntents and store + pointers to it in the environment. */ + ecl_init_env(new_env); + /* Copy the parent env defaults. */ + new_env->trap_fpe_bits = the_env->trap_fpe_bits; + new_env->own_process = process; + init_tl_bindings(process, new_env); + process->process.env = new_env; + } + /* Spawn the thread */ + ecl_disable_interrupts_env(the_env); +#ifdef ECL_WINDOWS_THREADS + { + HANDLE code; + DWORD threadId; + code = (HANDLE)CreateThread(NULL, 0, thread_entry_point, new_env, 0, &threadId); + new_env->thread = code; + ok = code != NULL; + } +#else /* ECL_WINDOWS_THREADS */ + { + int code; + pthread_attr_t pthreadattr; -void -ecl_add_process(cl_object process) -{ - add_env(process->process.env); -} - -void -ecl_del_process(cl_object process) -{ - del_env(process->process.env); + pthread_attr_init(&pthreadattr); + pthread_attr_setdetachstate(&pthreadattr, PTHREAD_CREATE_DETACHED); + /* + * Block all asynchronous signals until the thread is completely + * set up. The synchronous signals SIGSEGV and SIGBUS are needed + * by the gc and thus can't be blocked. + */ +# ifdef HAVE_SIGPROCMASK + { + sigset_t new, previous; + sigfillset(&new); + sigdelset(&new, SIGSEGV); + sigdelset(&new, SIGBUS); + pthread_sigmask(SIG_BLOCK, &new, &previous); + code = pthread_create(&new_env->thread, &pthreadattr, + thread_entry_point, new_env); + pthread_sigmask(SIG_SETMASK, &previous, NULL); + } +# else + code = pthread_create(&new_env->thread, &pthreadattr, + thread_entry_point, new_env); +# endif + ok = (code == 0); + } +#endif /* ECL_WINDOWS_THREADS */ + /* Deal with the fallout of the thread creation. */ + if (!ok) { + del_env(new_env); + process->process.env = NULL; + _ecl_dealloc_env(new_env); + } + ecl_enable_interrupts_env(the_env); + return ok ? new_env : NULL; } #endif @@ -197,8 +340,11 @@ ecl_del_process(cl_object process) void init_process(void) { - cl_env_ptr env = ecl_core.first_env; + cl_env_ptr the_env = ecl_core.first_env; #ifdef ECL_THREADS + ecl_thread_t main_thread; + ecl_set_process_self(main_thread); + the_env->thread = main_thread; ecl_process_key_create(cl_env_key); ecl_mutex_init(&ecl_core.processes_lock, 1); ecl_mutex_init(&ecl_core.global_lock, 1); @@ -206,10 +352,10 @@ init_process(void) ecl_rwlock_init(&ecl_core.global_env_lock); ecl_core.threads = ecl_make_stack(16); #endif - ecl_set_process_env(env); - env->default_sigmask = NULL; - env->method_cache = NULL; - env->slot_cache = NULL; - env->interrupt_struct = NULL; - env->disable_interrupts = 1; + ecl_set_process_env(the_env); + the_env->default_sigmask = NULL; + the_env->method_cache = NULL; + the_env->slot_cache = NULL; + the_env->interrupt_struct = NULL; + the_env->disable_interrupts = 1; } diff --git a/src/c/threads/thread.d b/src/c/threads/thread.d index 2f6609246..40942832a 100644 --- a/src/c/threads/thread.d +++ b/src/c/threads/thread.d @@ -31,26 +31,6 @@ # include #endif -/* -- Macros ---------------------------------------------------------------- */ - -#ifdef ECL_WINDOWS_THREADS -# define ecl_process_eq(t1, t2) (GetThreadId(t1) == GetThreadId(t2)) -# define ecl_set_process_self(var) \ - { \ - HANDLE aux = GetCurrentThread(); \ - DuplicateHandle(GetCurrentProcess(), \ - aux, \ - GetCurrentProcess(), \ - &var, \ - 0, \ - FALSE, \ - DUPLICATE_SAME_ACCESS); \ - } -#else -# define ecl_process_eq(t1, t2) (t1 == t2) -# define ecl_set_process_self(var) (var = pthread_self()) -#endif /* ECL_WINDOWS_THREADS */ - /* -- Core ---------------------------------------------------------- */ static cl_object @@ -87,27 +67,58 @@ assert_type_process(cl_object o) FEwrong_type_argument(@[mp::process], o); } -static void -thread_cleanup(void *aux) +static cl_object +run_process(cl_narg narg, ...) { - /* This routine performs some cleanup before a thread is completely - * killed. For instance, it has to remove the associated process - * object from the list, an it has to dealloc some memory. + /* Upon entering this routine the process environment is set up, the process + * phase is ECL_PROCESS_BOOTING, signals are disabled in the environment and + * the communication interrupt is disabled (sigmasked). * - * NOTE: thread_cleanup() does not provide enough "protection". In - * order to ensure that all UNWIND-PROTECT forms are properly - * executed, never use pthread_cancel() to kill a process, but - * rather use the lisp functions mp_interrupt_process() and - * mp_process_kill(). + * This process will not receive signals that originate from other processes. + * Furthermore, we expect not to get any other interrupts (SIGSEGV, SIGFPE) if + * we do things right. */ - cl_object process = (cl_object)aux; - cl_env_ptr env = process->process.env; + cl_env_ptr the_env = ecl_process_env(); + cl_object process = the_env->own_process; + cl_object fun = process->process.function; + cl_object args = process->process.args; + cl_object output = ECL_NIL; + /* Entry barrier. enable_process releases this lock before exit. */ + ecl_mutex_lock(&process->process.start_stop_lock); + + /* Execute the code. The CATCH_ALL point is the destination provides us with + * an elegant way to exit the thread: we just do an unwind up to frs_top. */ + ECL_CATCH_ALL_BEGIN(the_env) { +#ifdef HAVE_SIGPROCMASK + { + sigset_t *new = (sigset_t*)the_env->default_sigmask; + pthread_sigmask(SIG_SETMASK, new, NULL); + } +#endif + process->process.phase = ECL_PROCESS_ACTIVE; + ecl_mutex_unlock(&process->process.start_stop_lock); + si_trap_fpe(@'last', ECL_T); + + ecl_enable_interrupts_env(the_env); + ecl_bds_bind(the_env, @'mp::*current-process*', process); + + ECL_RESTART_CASE_BEGIN(the_env, @'abort') { + the_env->values[0] = cl_apply(2, fun, args); + int i = the_env->nvalues; + while (i--) { + output = CONS(the_env->values[i], output); + } + process->process.exit_values = output; + } ECL_RESTART_CASE(1,args) { + /* ABORT restart. */ + process->process.exit_values = args; + } ECL_RESTART_CASE_END; + ecl_bds_unwind1(the_env); + } ECL_CATCH_ALL_END; + + ecl_disable_interrupts_env(the_env); + ecl_clear_bignum_registers(the_env); - /* The following flags will disable all interrupts. */ - if (env) { - ecl_disable_interrupts_env(env); - ecl_clear_bignum_registers(env); - } ecl_mutex_lock(&process->process.start_stop_lock); process->process.phase = ECL_PROCESS_EXITING; #ifdef HAVE_SIGPROCMASK @@ -119,123 +130,11 @@ thread_cleanup(void *aux) pthread_sigmask(SIG_BLOCK, new, NULL); } #endif - ecl_del_process(process); process->process.env = NULL; -#ifdef ECL_WINDOWS_THREADS - CloseHandle(env->thread); -#endif - ecl_set_process_env(NULL); - if (env) _ecl_dealloc_env(env); - process->process.phase = ECL_PROCESS_INACTIVE; ecl_cond_var_broadcast(&process->process.exit_barrier); ecl_mutex_unlock(&process->process.start_stop_lock); -} -#ifdef ECL_WINDOWS_THREADS -static DWORD WINAPI -#else -static void * -#endif -thread_entry_point(void *arg) -{ - cl_object process = (cl_object)arg; - cl_env_ptr env = process->process.env; - - /* - * Upon entering this routine - * process.env = our environment for lisp - * process.phase = ECL_PROCESS_BOOTING - * signals are disabled in the environment - * the communication interrupt is disabled (sigmasked) - * - * This process will not receive signals that originate from - * other processes. Furthermore, we expect not to get any - * other interrupts (SIGSEGV, SIGFPE) if we do things right. - */ - /* 1) Setup the environment for the execution of the thread */ - ecl_set_process_env(env = process->process.env); -#ifndef ECL_WINDOWS_THREADS - pthread_cleanup_push(thread_cleanup, (void *)process); -#endif - ecl_cs_init(env); - ecl_mutex_lock(&process->process.start_stop_lock); - - /* 2) Execute the code. The CATCH_ALL point is the destination - * provides us with an elegant way to exit the thread: we just - * do an unwind up to frs_top. - */ - ECL_CATCH_ALL_BEGIN(env) { -#ifdef HAVE_SIGPROCMASK - { - sigset_t *new = (sigset_t*)env->default_sigmask; - pthread_sigmask(SIG_SETMASK, new, NULL); - } -#endif - process->process.phase = ECL_PROCESS_ACTIVE; - ecl_mutex_unlock(&process->process.start_stop_lock); - si_trap_fpe(@'last', ECL_T); - ecl_enable_interrupts_env(env); - ecl_bds_bind(env, @'mp::*current-process*', process); - - ECL_RESTART_CASE_BEGIN(env, @'abort') { - process->process.entry(0); - } ECL_RESTART_CASE(1,args) { - /* ABORT restart. */ - process->process.exit_values = args; - } ECL_RESTART_CASE_END; - ecl_bds_unwind1(env); - } ECL_CATCH_ALL_END; - - /* 4) If everything went right, we should be exiting the thread - * through this point. thread_cleanup is automatically invoked - * marking the process as inactive. - */ -#ifdef ECL_WINDOWS_THREADS - thread_cleanup(process); - return 1; -#else - pthread_cleanup_pop(1); - return NULL; -#endif -} - -static void -init_tl_bindings(cl_object process, cl_env_ptr env) -{ - cl_index bindings_size; - cl_object *bindings; - if (Null(process->process.inherit_bindings_p)) { - cl_index idx = 0, size = 256; - bindings_size = size; - bindings = (cl_object *)ecl_malloc(size*sizeof(cl_object*)); - for(idx=0; idx<256; idx++) { - bindings[idx] = ECL_NO_TL_BINDING; - } - } else { - cl_env_ptr parent_env = ecl_process_env(); - bindings_size = parent_env->bds_stack.tl_bindings_size; - bindings = (cl_object *)ecl_malloc(bindings_size*sizeof(cl_object*)); - ecl_copy(bindings, parent_env->bds_stack.tl_bindings, bindings_size*sizeof(cl_object*)); - } - env->bds_stack.tl_bindings_size = bindings_size; - env->bds_stack.tl_bindings = bindings; -} - -static cl_object -run_process(cl_narg narg, ...) -{ - cl_env_ptr the_env = ecl_process_env(); - cl_object process = the_env->own_process; - cl_object fun = process->process.function; - cl_object args = process->process.args; - cl_object output = ECL_NIL; - the_env->values[0] = cl_apply(2, fun, args); - int i = the_env->nvalues; - while (i--) { - output = CONS(the_env->values[i], output); - } - process->process.exit_values = output; return the_env->values[0]; } @@ -273,10 +172,7 @@ ecl_import_current_thread(cl_object name, cl_object bindings) process = alloc_process(name, ECL_NIL); process->process.env = the_env; process->process.phase = ECL_PROCESS_BOOTING; - - init_tl_bindings(process, the_env); the_env->own_process = process; - process->process.phase = ECL_PROCESS_ACTIVE; ecl_bds_bind(the_env, @'mp::*current-process*', process); @@ -286,16 +182,27 @@ ecl_import_current_thread(cl_object name, cl_object bindings) void ecl_release_current_thread(void) { - cl_env_ptr env = ecl_process_env(); - - int cleanup = env->cleanup; - cl_object own_process = env->own_process; - thread_cleanup(own_process); -#ifdef GBC_BOEHM - if (cleanup) { - GC_unregister_my_thread(); + cl_object process; + cl_env_ptr the_env = ecl_process_env_unsafe(); + if (the_env == NULL) + return; + process = the_env->own_process; + ecl_mutex_lock(&process->process.start_stop_lock); + process->process.env = NULL; + process->process.phase = ECL_PROCESS_EXITING; + ecl_disown_cpu(); +#ifdef HAVE_SIGPROCMASK + /* ...but we might get stray signals. */ + { + sigset_t new[1]; + sigemptyset(new); + sigaddset(new, ecl_option_values[ECL_OPT_THREAD_INTERRUPT_SIGNAL]); + pthread_sigmask(SIG_BLOCK, new, NULL); } #endif + process->process.phase = ECL_PROCESS_INACTIVE; + ecl_cond_var_broadcast(&process->process.exit_barrier); + ecl_mutex_unlock(&process->process.start_stop_lock); } @(defun mp::make-process (&key name ((:initial-bindings initial_bindings_p) ECL_T)) @@ -388,105 +295,33 @@ mp_process_yield(void) cl_object mp_process_enable(cl_object process) { - /* process_env and ok are changed after the setjmp call in - * ECL_UNWIND_PROTECT_BEGIN, so they need to be declared volatile */ - volatile cl_env_ptr process_env = NULL; cl_env_ptr the_env = ecl_process_env(); - volatile int ok = 1; - ECL_UNWIND_PROTECT_BEGIN(the_env) { - /* Try to gain exclusive access to the process. This prevents two - * concurrent calls to process-enable from different threads on - * the same process */ - ecl_mutex_lock(&process->process.start_stop_lock); - /* Ensure that the process is inactive. */ - if (process->process.phase != ECL_PROCESS_INACTIVE) { - FEerror("Cannot enable the running process ~A.", 1, process); - } - ok = 0; - process->process.phase = ECL_PROCESS_BOOTING; - - /* Link environment and process together */ - process_env = _ecl_alloc_env(the_env); - process_env->own_process = process; - process->process.env = process_env; - - /* Immediately list the process such that its environment is - * marked by the gc when its contents are allocated */ - ecl_add_process(process); - - /* Now we can safely allocate memory for the environment contents - * and store pointers to it in the environment */ - ecl_init_env(process_env); - - process_env->trap_fpe_bits = the_env->trap_fpe_bits; - init_tl_bindings(process, process_env); - - ecl_disable_interrupts_env(the_env); -#ifdef ECL_WINDOWS_THREADS - { - HANDLE code; - DWORD threadId; - - code = (HANDLE)CreateThread(NULL, 0, thread_entry_point, process, 0, &threadId); - ok = (process_env->thread = code) != NULL; - } -#else - { - int code; - pthread_attr_t pthreadattr; - - pthread_attr_init(&pthreadattr); - pthread_attr_setdetachstate(&pthreadattr, PTHREAD_CREATE_DETACHED); - /* - * Block all asynchronous signals until the thread is completely - * set up. The synchronous signals SIGSEGV and SIGBUS are needed - * by the gc and thus can't be blocked. - */ -#ifdef HAVE_SIGPROCMASK - { - sigset_t new, previous; - sigfillset(&new); - sigdelset(&new, SIGSEGV); - sigdelset(&new, SIGBUS); - pthread_sigmask(SIG_BLOCK, &new, &previous); - code = pthread_create(&process_env->thread, &pthreadattr, - thread_entry_point, process); - pthread_sigmask(SIG_SETMASK, &previous, NULL); - } -#else - code = pthread_create(&process_env->thread, &pthreadattr, - thread_entry_point, process); -#endif - ok = (code == 0); - } -#endif - ecl_enable_interrupts_env(the_env); - } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { - if (!ok) { - /* INV: interrupts are already disabled through thread safe - * unwind-protect */ - ecl_del_process(process); - process->process.phase = ECL_PROCESS_INACTIVE; - /* Alert possible waiting processes. */ - ecl_cond_var_broadcast(&process->process.exit_barrier); - process->process.env = NULL; - if (process_env != NULL) - _ecl_dealloc_env(process_env); - } - /* Unleash the thread */ + cl_env_ptr process_env = NULL; + /* Try to gain exclusive access to the process. This prevents two concurrent + * calls to process-enable from different threads on the same process */ + ecl_mutex_lock(&process->process.start_stop_lock); + /* Ensure that the process is inactive. */ + if (process->process.phase != ECL_PROCESS_INACTIVE) { ecl_mutex_unlock(&process->process.start_stop_lock); - } ECL_UNWIND_PROTECT_THREAD_SAFE_END; - - @(return (ok? process : ECL_NIL)); + FEerror("Cannot enable the running process ~A.", 1, process); + } + process->process.phase = ECL_PROCESS_BOOTING; + /* Spawn the thread (allocates the environment)*/ + process_env = ecl_spawn_cpu(process); + if (process_env == NULL) { + process->process.phase = ECL_PROCESS_INACTIVE; + ecl_cond_var_broadcast(&process->process.exit_barrier); + } + /* Unleash the thread */ + ecl_mutex_unlock(&process->process.start_stop_lock); + ecl_return1(the_env, (process_env ? process : ECL_NIL)); } cl_object mp_exit_process(void) { - /* We simply undo the whole of the frame stack. This brings up - back to the thread entry point, going through all possible - UNWIND-PROTECT. - */ + /* We simply undo the whole of the frame stack. This brings up back to the + thread entry point, going through all possible UNWIND-PROTECT. */ const cl_env_ptr the_env = ecl_process_env(); ecl_unwind(the_env, the_env->frs_stack.org); /* Never reached */ @@ -647,11 +482,9 @@ init_threads() { cl_env_ptr the_env = ecl_process_env(); cl_object process, _env = ecl_cast_ptr(cl_object,the_env); - ecl_thread_t main_thread; /* We have to set the environment before any allocation takes place, * so that the interrupt handling code works. */ ecl_cs_init(the_env); - ecl_set_process_self(main_thread); process = ecl_alloc_object(t_process); process->process.phase = ECL_PROCESS_ACTIVE; process->process.name = @'si::top-level'; @@ -660,7 +493,6 @@ init_threads() process->process.env = the_env; ecl_mutex_init(&process->process.start_stop_lock, TRUE); ecl_cond_var_init(&process->process.exit_barrier); - the_env->thread = main_thread; the_env->own_process = process; ecl_stack_push(ecl_core.threads, _env); } diff --git a/src/h/nucleus.h b/src/h/nucleus.h index 08f553651..039fb5dd9 100644 --- a/src/h/nucleus.h +++ b/src/h/nucleus.h @@ -36,9 +36,7 @@ struct ecl_core_struct { /* process.c */ cl_env_ptr ecl_adopt_cpu(); cl_env_ptr ecl_spawn_cpu(); - -void ecl_add_process(cl_object process); -void ecl_del_process(cl_object process); +void ecl_disown_cpu(); /* control.c */ cl_object ecl_escape(cl_object continuation) ecl_attr_noreturn;