diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index 8957586b3..a36428c9c 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -463,8 +463,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl MAYBE_MARK(o->semaphore.name); break; case t_barrier: - MAYBE_MARK(o->barrier.queue_list); - MAYBE_MARK(o->barrier.queue_spinlock); MAYBE_MARK(o->barrier.name); break; case t_mailbox: @@ -1030,9 +1028,7 @@ init_alloc(void) to_bitmap(&o, &(o.semaphore.queue_list)) | to_bitmap(&o, &(o.semaphore.queue_spinlock)); type_info[t_barrier].descriptor = - to_bitmap(&o, &(o.barrier.name)) | - to_bitmap(&o, &(o.barrier.queue_list)) | - to_bitmap(&o, &(o.barrier.queue_spinlock)); + to_bitmap(&o, &(o.barrier.name)); type_info[t_mailbox].descriptor = to_bitmap(&o, &(o.mailbox.name)) | to_bitmap(&o, &(o.mailbox.data)) | @@ -1127,6 +1123,14 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } + case t_barrier: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->barrier.mutex); + ecl_cond_var_destroy(&o->barrier.cv); + ecl_enable_interrupts_env(the_env); + break; + } # ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); @@ -1176,6 +1180,7 @@ register_finalizer(cl_object o, void *finalized_object, #if defined(ECL_THREADS) case t_lock: case t_condition_variable: + case t_barrier: # if defined(ECL_RWLOCK) case t_rwlock: # endif diff --git a/src/c/threads/barrier.d b/src/c/threads/barrier.d index c5512d83c..3aaaf39d3 100755 --- a/src/c/threads/barrier.d +++ b/src/c/threads/barrier.d @@ -5,6 +5,7 @@ * barrier.d - wait barriers * * Copyright (c) 2012 Juan Jose Garcia Ripoll + * Copyright (c) 2020 Marius Gerbershagen * * See file 'LICENSE' for the copyright details. * @@ -13,21 +14,21 @@ #include #include -static ECL_INLINE void -FEerror_not_a_barrier(cl_object barrier) -{ - FEwrong_type_argument(@'mp::barrier', barrier); -} - cl_object ecl_make_barrier(cl_object name, cl_index count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_barrier); + output->barrier.disabled = FALSE; + output->barrier.wakeup = 0; output->barrier.name = name; - output->barrier.arrivers_count = count; + output->barrier.arrivers_count = 0; output->barrier.count = count; - output->barrier.queue_list = ECL_NIL; - output->barrier.queue_spinlock = ECL_NIL; + ecl_disable_interrupts_env(env); + ecl_cond_var_init(&output->barrier.cv); + ecl_mutex_init(&output->barrier.mutex, FALSE); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -43,7 +44,7 @@ mp_barrier_name(cl_object barrier) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-name], barrier, @[mp::barrier]); } ecl_return1(env, barrier->barrier.name); } @@ -53,7 +54,7 @@ mp_barrier_count(cl_object barrier) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-count], barrier, @[mp::barrier]); } ecl_return1(env, ecl_make_fixnum(barrier->barrier.count)); } @@ -61,104 +62,111 @@ mp_barrier_count(cl_object barrier) cl_object mp_barrier_arrivers_count(cl_object barrier) { - cl_fixnum arrivers, count; cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-arrivers_count], barrier, @[mp::barrier]); } - arrivers = barrier->barrier.arrivers_count; - count = barrier->barrier.count; - if (arrivers < 0) - arrivers = 0; /* Disabled barrier */ - else - arrivers = count - arrivers; - ecl_return1(env, ecl_make_fixnum(arrivers)); + ecl_return1(env, ecl_make_fixnum(barrier->barrier.arrivers_count)); } +/* INV: locking the mutex in mp_barrier_unblock and mp_barrier_wait + * will always succeed since the functions are not reentrant and only + * lock/unlock the mutex while interrupts are disabled, therefore + * deadlocks can't happen. */ + @(defun mp::barrier-unblock (barrier &key reset_count disable kill_waiting) - int ping_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ALL; - int kill_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL | ECL_WAKEUP_ALL; @ unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_nth_arg(@[mp::barrier-unblock], 1, barrier, @[mp::barrier]); } - if (!Null(reset_count)) + ecl_disable_interrupts_env(the_env); + AGAIN: + ecl_mutex_lock(&barrier->barrier.mutex); + if (barrier->barrier.wakeup) { + /* we are currently waking up blocked threads; loop until all + * threads have woken up */ + ecl_mutex_unlock(&barrier->barrier.mutex); + goto AGAIN; + } + if (!Null(reset_count)) { barrier->barrier.count = fixnnint(reset_count); - if (!Null(disable)) - barrier->barrier.arrivers_count = -1; - else - barrier->barrier.arrivers_count = barrier->barrier.count; - ecl_wakeup_waiters(the_env, barrier, - Null(kill_waiting)? ping_flags : kill_flags); + } + if (!Null(disable)) { + barrier->barrier.disabled = TRUE; + } else { + barrier->barrier.disabled = FALSE; + } + if (barrier->barrier.arrivers_count > 0) { + if (!Null(kill_waiting)) { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_KILL; + } else { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL; + } + ecl_cond_var_broadcast(&barrier->barrier.cv); + } + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_enable_interrupts_env(the_env); @(return); @) -static cl_object -barrier_wait_condition(cl_env_ptr env, cl_object barrier) -{ - /* We were signaled */ - if (env->own_process->process.woken_up != ECL_NIL) - return ECL_T; - /* Disabled barrier */ - else if (barrier->barrier.arrivers_count < 0) - return ECL_T; - else - return ECL_NIL; -} - -static cl_fixnum -decrement_counter(cl_fixnum *counter) -{ - /* The logic is as follows: - * - If the counter is negative, we abort. This is a way of - * disabling the counter. - * - Otherwise, we decrease the counter only if it is positive - * - If the counter is currently zero, then we block. This - * situation implies that some other thread is unblocking. - */ - cl_fixnum c; - do { - c = *counter; - if (c < 0) { - return c; - } else if (c > 0) { - if (AO_compare_and_swap_full((AO_t*)counter, - (AO_t)c, (AO_t)(c-1))) - return c; - } - } while (1); -} - cl_object -mp_barrier_wait(cl_object barrier) -{ - cl_object output; - cl_fixnum counter; +mp_barrier_wait(cl_object barrier) { cl_env_ptr the_env = ecl_process_env(); - + volatile int wakeup = 0; unlikely_if (ecl_t_of(barrier) != t_barrier) { - FEerror_not_a_barrier(barrier); + FEwrong_type_only_arg(@[mp::barrier-wait], barrier, @[mp::barrier]); } - ecl_disable_interrupts_env(the_env); - counter = decrement_counter(&barrier->barrier.arrivers_count); - if (counter == 1) { - print_lock("barrier %p saturated", barrier, barrier); - /* There are (count-1) threads in the queue and we - * are the last one. We thus unblock all threads and - * proceed. */ - ecl_enable_interrupts_env(the_env); - mp_barrier_unblock(1, barrier); - output = @':unblocked'; - } else if (counter > 1) { - print_lock("barrier %p waiting", barrier, barrier); - ecl_enable_interrupts_env(the_env); - ecl_wait_on(the_env, barrier_wait_condition, barrier); - output = ECL_T; - } else { - print_lock("barrier %p pass-through", barrier, barrier); - ecl_enable_interrupts_env(the_env); - /* Barrier disabled */ - output = ECL_NIL; + ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + /* check if the barrier is disabled */ + do { + ecl_mutex_lock(&barrier->barrier.mutex); + if (barrier->barrier.disabled) { + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return ECL_NIL; + } + if (barrier->barrier.wakeup) { + /* We are currently waking up blocked threads; loop until all threads have + * woken up. */ + ecl_mutex_unlock(&barrier->barrier.mutex); + } else { + break; + } + } while(1); + /* check if we have reached the maximum count */ + if ((barrier->barrier.arrivers_count+1) == barrier->barrier.count) { + if (barrier->barrier.arrivers_count > 0) { + barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL; + ecl_cond_var_broadcast(&barrier->barrier.cv); + } + ecl_mutex_unlock(&barrier->barrier.mutex); + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return @':unblocked'; } - return output; + /* barrier is neither disabled nor unblocked, start waiting */ + barrier->barrier.arrivers_count++; + ECL_UNWIND_PROTECT_BEGIN(the_env) { + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + ecl_cond_var_wait(&barrier->barrier.cv, &barrier->barrier.mutex); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + } while(!barrier->barrier.wakeup); + wakeup = barrier->barrier.wakeup; + if (barrier->barrier.arrivers_count - 1 == 0) { + /* we are the last thread to wake up, reset the barrier */ + barrier->barrier.wakeup = 0; + } + } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { + --barrier->barrier.arrivers_count; + ecl_mutex_unlock(&barrier->barrier.mutex); + if (wakeup == ECL_BARRIER_WAKEUP_KILL) { + mp_exit_process(); + } + } ECL_UNWIND_PROTECT_THREAD_SAFE_END; + ecl_bds_unwind1(the_env); + ecl_check_pending_interrupts(the_env); + return ECL_T; } + diff --git a/src/h/object.h b/src/h/object.h index 11fe20fc2..df1bd271d 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -993,13 +993,16 @@ struct ecl_semaphore { cl_fixnum counter; }; +#define ECL_BARRIER_WAKEUP_NORMAL 1 +#define ECL_BARRIER_WAKEUP_KILL 2 + struct ecl_barrier { - _ECL_HDR; - cl_object queue_list; - cl_object queue_spinlock; + _ECL_HDR2(disabled,wakeup); cl_object name; - cl_fixnum count; - cl_fixnum arrivers_count; + cl_index count; + cl_index arrivers_count; + ecl_mutex_t mutex; + ecl_cond_var_t cv; }; struct ecl_lock {