multithreading: implement barriers using native mutexes

Marius Gerbershagen 2020-09-05 20:53:00 +02:00
parent 579a8d4380
commit b332f2c592
3 changed files with 119 additions and 103 deletions
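The commit title names the core technique: instead of the old queue/spinlock machinery, barriers are now built on a native mutex plus condition variable. For orientation, a minimal generic version of that pattern, written against plain POSIX threads with invented names (toy_barrier), looks roughly like the sketch below; the real code in the diff uses ECL's own ecl_mutex_t/ecl_cond_var_t wrappers and a wakeup flag instead of a generation counter, because mp::barrier-unblock must also be able to disable the barrier or kill the waiting threads.

/* Minimal sketch: a cyclic barrier built from one mutex and one
 * condition variable (plain pthreads, illustrative names only). */
#include <pthread.h>
#include <stddef.h>

typedef struct {
  pthread_mutex_t mutex;
  pthread_cond_t cv;
  size_t count;        /* threads that must arrive per round */
  size_t arrived;      /* threads blocked in wait so far */
  unsigned generation; /* bumped every time the barrier opens */
} toy_barrier;

static void
toy_barrier_init(toy_barrier *b, size_t count)
{
  pthread_mutex_init(&b->mutex, NULL);
  pthread_cond_init(&b->cv, NULL);
  b->count = count;
  b->arrived = 0;
  b->generation = 0;
}

/* Returns 1 in the thread that opened the barrier, 0 in the others. */
static int
toy_barrier_wait(toy_barrier *b)
{
  pthread_mutex_lock(&b->mutex);
  unsigned gen = b->generation;
  if (++b->arrived == b->count) {
    /* last arriver: start a new round and wake everyone up */
    b->arrived = 0;
    b->generation++;
    pthread_cond_broadcast(&b->cv);
    pthread_mutex_unlock(&b->mutex);
    return 1;
  }
  /* the generation check protects against spurious wakeups */
  while (gen == b->generation)
    pthread_cond_wait(&b->cv, &b->mutex);
  pthread_mutex_unlock(&b->mutex);
  return 0;
}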


@@ -463,8 +463,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl
MAYBE_MARK(o->semaphore.name);
break;
case t_barrier:
MAYBE_MARK(o->barrier.queue_list);
MAYBE_MARK(o->barrier.queue_spinlock);
MAYBE_MARK(o->barrier.name);
break;
case t_mailbox:
@@ -1030,9 +1028,7 @@ init_alloc(void)
to_bitmap(&o, &(o.semaphore.queue_list)) |
to_bitmap(&o, &(o.semaphore.queue_spinlock));
type_info[t_barrier].descriptor =
to_bitmap(&o, &(o.barrier.name)) |
to_bitmap(&o, &(o.barrier.queue_list)) |
to_bitmap(&o, &(o.barrier.queue_spinlock));
to_bitmap(&o, &(o.barrier.name));
type_info[t_mailbox].descriptor =
to_bitmap(&o, &(o.mailbox.name)) |
to_bitmap(&o, &(o.mailbox.data)) |
@@ -1127,6 +1123,14 @@ standard_finalizer(cl_object o)
ecl_enable_interrupts_env(the_env);
break;
}
case t_barrier: {
const cl_env_ptr the_env = ecl_process_env();
ecl_disable_interrupts_env(the_env);
ecl_mutex_destroy(&o->barrier.mutex);
ecl_cond_var_destroy(&o->barrier.cv);
ecl_enable_interrupts_env(the_env);
break;
}
# ifdef ECL_RWLOCK
case t_rwlock: {
const cl_env_ptr the_env = ecl_process_env();
@@ -1176,6 +1180,7 @@ register_finalizer(cl_object o, void *finalized_object,
#if defined(ECL_THREADS)
case t_lock:
case t_condition_variable:
case t_barrier:
# if defined(ECL_RWLOCK)
case t_rwlock:
# endif
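The new finalizer case above exists because the native mutex and condition variable stored inside a barrier are operating-system resources rather than plain collectable memory, so they must be destroyed explicitly when the barrier object dies. A stand-alone sketch of the same idea, using the Boehm GC finalizer API directly instead of ECL's standard_finalizer/register_finalizer machinery (the names native_pair and native_pair_make are invented for the example):

/* Sketch: releasing native synchronization handles from a Boehm GC
 * finalizer. Illustrative only; ECL routes this through the
 * standard_finalizer/register_finalizer code shown above. */
#include <gc/gc.h>
#include <pthread.h>

typedef struct {
  pthread_mutex_t mutex;
  pthread_cond_t cv;
} native_pair;

static void
native_pair_finalize(void *obj, void *client_data)
{
  native_pair *p = obj;
  (void)client_data;
  pthread_cond_destroy(&p->cv);
  pthread_mutex_destroy(&p->mutex);
}

static native_pair *
native_pair_make(void)
{
  native_pair *p = GC_MALLOC(sizeof(*p));
  pthread_mutex_init(&p->mutex, NULL);
  pthread_cond_init(&p->cv, NULL);
  /* run native_pair_finalize once the collector finds p unreachable */
  GC_REGISTER_FINALIZER(p, native_pair_finalize, NULL, NULL, NULL);
  return p;
}

The barrier.d diff that follows rebuilds the waiting logic on top of these native handles.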


@@ -5,6 +5,7 @@
* barrier.d - wait barriers
*
* Copyright (c) 2012 Juan Jose Garcia Ripoll
* Copyright (c) 2020 Marius Gerbershagen
*
* See file 'LICENSE' for the copyright details.
*
@@ -13,21 +14,21 @@
#include <ecl/ecl.h>
#include <ecl/internal.h>
static ECL_INLINE void
FEerror_not_a_barrier(cl_object barrier)
{
FEwrong_type_argument(@'mp::barrier', barrier);
}
cl_object
ecl_make_barrier(cl_object name, cl_index count)
{
cl_env_ptr env = ecl_process_env();
cl_object output = ecl_alloc_object(t_barrier);
output->barrier.disabled = FALSE;
output->barrier.wakeup = 0;
output->barrier.name = name;
output->barrier.arrivers_count = count;
output->barrier.arrivers_count = 0;
output->barrier.count = count;
output->barrier.queue_list = ECL_NIL;
output->barrier.queue_spinlock = ECL_NIL;
ecl_disable_interrupts_env(env);
ecl_cond_var_init(&output->barrier.cv);
ecl_mutex_init(&output->barrier.mutex, FALSE);
ecl_set_finalizer_unprotected(output, ECL_T);
ecl_enable_interrupts_env(env);
return output;
}
@@ -43,7 +44,7 @@ mp_barrier_name(cl_object barrier)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (ecl_t_of(barrier) != t_barrier) {
FEerror_not_a_barrier(barrier);
FEwrong_type_only_arg(@[mp::barrier-name], barrier, @[mp::barrier]);
}
ecl_return1(env, barrier->barrier.name);
}
@@ -53,7 +54,7 @@ mp_barrier_count(cl_object barrier)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (ecl_t_of(barrier) != t_barrier) {
FEerror_not_a_barrier(barrier);
FEwrong_type_only_arg(@[mp::barrier-count], barrier, @[mp::barrier]);
}
ecl_return1(env, ecl_make_fixnum(barrier->barrier.count));
}
@@ -61,104 +62,111 @@ mp_barrier_count(cl_object barrier)
cl_object
mp_barrier_arrivers_count(cl_object barrier)
{
cl_fixnum arrivers, count;
cl_env_ptr env = ecl_process_env();
unlikely_if (ecl_t_of(barrier) != t_barrier) {
FEerror_not_a_barrier(barrier);
FEwrong_type_only_arg(@[mp::barrier-arrivers-count], barrier, @[mp::barrier]);
}
arrivers = barrier->barrier.arrivers_count;
count = barrier->barrier.count;
if (arrivers < 0)
arrivers = 0; /* Disabled barrier */
else
arrivers = count - arrivers;
ecl_return1(env, ecl_make_fixnum(arrivers));
ecl_return1(env, ecl_make_fixnum(barrier->barrier.arrivers_count));
}
/* INV: locking the mutex in mp_barrier_unblock and mp_barrier_wait
* will always succeed since the functions are not reentrant and only
* lock/unlock the mutex while interrupts are disabled, therefore
* deadlocks can't happen. */
@(defun mp::barrier-unblock (barrier &key reset_count disable kill_waiting)
int ping_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_ALL;
int kill_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL | ECL_WAKEUP_ALL;
@
unlikely_if (ecl_t_of(barrier) != t_barrier) {
FEerror_not_a_barrier(barrier);
FEwrong_type_nth_arg(@[mp::barrier-unblock], 1, barrier, @[mp::barrier]);
}
if (!Null(reset_count))
ecl_disable_interrupts_env(the_env);
AGAIN:
ecl_mutex_lock(&barrier->barrier.mutex);
if (barrier->barrier.wakeup) {
/* we are currently waking up blocked threads; loop until all
* threads have woken up */
ecl_mutex_unlock(&barrier->barrier.mutex);
goto AGAIN;
}
if (!Null(reset_count)) {
barrier->barrier.count = fixnnint(reset_count);
if (!Null(disable))
barrier->barrier.arrivers_count = -1;
else
barrier->barrier.arrivers_count = barrier->barrier.count;
ecl_wakeup_waiters(the_env, barrier,
Null(kill_waiting)? ping_flags : kill_flags);
}
if (!Null(disable)) {
barrier->barrier.disabled = TRUE;
} else {
barrier->barrier.disabled = FALSE;
}
if (barrier->barrier.arrivers_count > 0) {
if (!Null(kill_waiting)) {
barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_KILL;
} else {
barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL;
}
ecl_cond_var_broadcast(&barrier->barrier.cv);
}
ecl_mutex_unlock(&barrier->barrier.mutex);
ecl_enable_interrupts_env(the_env);
@(return);
@)
static cl_object
barrier_wait_condition(cl_env_ptr env, cl_object barrier)
{
/* We were signaled */
if (env->own_process->process.woken_up != ECL_NIL)
return ECL_T;
/* Disabled barrier */
else if (barrier->barrier.arrivers_count < 0)
return ECL_T;
else
return ECL_NIL;
}
static cl_fixnum
decrement_counter(cl_fixnum *counter)
{
/* The logic is as follows:
* - If the counter is negative, we abort. This is a way of
* disabling the counter.
* - Otherwise, we decrease the counter only if it is positive
* - If the counter is currently zero, then we block. This
* situation implies that some other thread is unblocking.
*/
cl_fixnum c;
do {
c = *counter;
if (c < 0) {
return c;
} else if (c > 0) {
if (AO_compare_and_swap_full((AO_t*)counter,
(AO_t)c, (AO_t)(c-1)))
return c;
}
} while (1);
}
cl_object
mp_barrier_wait(cl_object barrier)
{
cl_object output;
cl_fixnum counter;
mp_barrier_wait(cl_object barrier) {
cl_env_ptr the_env = ecl_process_env();
volatile int wakeup = 0;
unlikely_if (ecl_t_of(barrier) != t_barrier) {
FEerror_not_a_barrier(barrier);
FEwrong_type_only_arg(@[mp::barrier-wait], barrier, @[mp::barrier]);
}
ecl_disable_interrupts_env(the_env);
counter = decrement_counter(&barrier->barrier.arrivers_count);
if (counter == 1) {
print_lock("barrier %p saturated", barrier, barrier);
/* There are (count-1) threads in the queue and we
* are the last one. We thus unblock all threads and
* proceed. */
ecl_enable_interrupts_env(the_env);
mp_barrier_unblock(1, barrier);
output = @':unblocked';
} else if (counter > 1) {
print_lock("barrier %p waiting", barrier, barrier);
ecl_enable_interrupts_env(the_env);
ecl_wait_on(the_env, barrier_wait_condition, barrier);
output = ECL_T;
} else {
print_lock("barrier %p pass-through", barrier, barrier);
ecl_enable_interrupts_env(the_env);
/* Barrier disabled */
output = ECL_NIL;
ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL);
/* check if the barrier is disabled */
do {
ecl_mutex_lock(&barrier->barrier.mutex);
if (barrier->barrier.disabled) {
ecl_mutex_unlock(&barrier->barrier.mutex);
ecl_bds_unwind1(the_env);
ecl_check_pending_interrupts(the_env);
return ECL_NIL;
}
if (barrier->barrier.wakeup) {
/* We are currently waking up blocked threads; loop until all threads have
* woken up. */
ecl_mutex_unlock(&barrier->barrier.mutex);
} else {
break;
}
} while(1);
/* check if we have reached the maximum count */
if ((barrier->barrier.arrivers_count+1) == barrier->barrier.count) {
if (barrier->barrier.arrivers_count > 0) {
barrier->barrier.wakeup = ECL_BARRIER_WAKEUP_NORMAL;
ecl_cond_var_broadcast(&barrier->barrier.cv);
}
ecl_mutex_unlock(&barrier->barrier.mutex);
ecl_bds_unwind1(the_env);
ecl_check_pending_interrupts(the_env);
return @':unblocked';
}
return output;
/* barrier is neither disabled nor unblocked, start waiting */
barrier->barrier.arrivers_count++;
ECL_UNWIND_PROTECT_BEGIN(the_env) {
do {
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T);
ecl_cond_var_wait(&barrier->barrier.cv, &barrier->barrier.mutex);
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL);
} while(!barrier->barrier.wakeup);
wakeup = barrier->barrier.wakeup;
if (barrier->barrier.arrivers_count - 1 == 0) {
/* we are the last thread to wake up, reset the barrier */
barrier->barrier.wakeup = 0;
}
} ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT {
--barrier->barrier.arrivers_count;
ecl_mutex_unlock(&barrier->barrier.mutex);
if (wakeup == ECL_BARRIER_WAKEUP_KILL) {
mp_exit_process();
}
} ECL_UNWIND_PROTECT_THREAD_SAFE_END;
ecl_bds_unwind1(the_env);
ecl_check_pending_interrupts(the_env);
return ECL_T;
}
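Compared with the textbook pattern, mp_barrier_wait and mp::barrier-unblock coordinate through the wakeup field: a release starts by setting wakeup and broadcasting, waiters sleep until wakeup becomes non-zero, the last waiter to leave clears it again, and both the unblock path and new arrivals refuse to start a new round while a previous wake-up is still draining, which is what the INV comment above relies on. Distilled to plain pthreads, with invented names and without ECL's interrupt handling, :kill-waiting support and unwind-protect cleanup, the handshake is roughly:

/* Sketch of the wakeup-flag handshake (invented names, no interrupt
 * handling, no kill semantics, no reset of the count). */
#include <pthread.h>
#include <sched.h>
#include <stddef.h>

typedef struct {
  pthread_mutex_t mutex;
  pthread_cond_t cv;
  size_t count;     /* arrivals needed to open the barrier */
  size_t arrivers;  /* threads currently blocked in wait */
  int wakeup;       /* non-zero while a release is draining */
  int disabled;
} flag_barrier;

static void
flag_barrier_wait(flag_barrier *b)
{
  pthread_mutex_lock(&b->mutex);
  for (;;) {
    if (b->disabled) {          /* disabled barriers let everyone through */
      pthread_mutex_unlock(&b->mutex);
      return;
    }
    if (!b->wakeup)
      break;
    /* a previous release is still waking threads up; retry */
    pthread_mutex_unlock(&b->mutex);
    sched_yield();              /* the ECL code simply relocks in a loop */
    pthread_mutex_lock(&b->mutex);
  }
  if (b->arrivers + 1 == b->count) {
    /* we complete the round: release the others and pass through */
    if (b->arrivers > 0) {
      b->wakeup = 1;
      pthread_cond_broadcast(&b->cv);
    }
    pthread_mutex_unlock(&b->mutex);
    return;
  }
  b->arrivers++;
  while (!b->wakeup)
    pthread_cond_wait(&b->cv, &b->mutex);
  if (b->arrivers - 1 == 0)
    b->wakeup = 0;              /* last waiter out resets the flag */
  b->arrivers--;
  pthread_mutex_unlock(&b->mutex);
}

static void
flag_barrier_unblock(flag_barrier *b, int disable)
{
  pthread_mutex_lock(&b->mutex);
  while (b->wakeup) {           /* let the previous round drain first */
    pthread_mutex_unlock(&b->mutex);
    sched_yield();
    pthread_mutex_lock(&b->mutex);
  }
  b->disabled = disable;
  if (b->arrivers > 0) {
    b->wakeup = 1;
    pthread_cond_broadcast(&b->cv);
  }
  pthread_mutex_unlock(&b->mutex);
}

The header change below adds the mutex, condition variable, disabled and wakeup fields this protocol needs and drops the old queue_list/queue_spinlock slots.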


@@ -993,13 +993,16 @@ struct ecl_semaphore {
cl_fixnum counter;
};
#define ECL_BARRIER_WAKEUP_NORMAL 1
#define ECL_BARRIER_WAKEUP_KILL 2
struct ecl_barrier {
_ECL_HDR;
cl_object queue_list;
cl_object queue_spinlock;
_ECL_HDR2(disabled,wakeup);
cl_object name;
cl_fixnum count;
cl_fixnum arrivers_count;
cl_index count;
cl_index arrivers_count;
ecl_mutex_t mutex;
ecl_cond_var_t cv;
};
struct ecl_lock {