multithreading: implement mailboxes using native mutexes

The old implementation was not race condition free. If two threads (A
and B) were writing at the same time while one thread (C) was reading,
the following could happen:

1. thread A increases the write pointer (but does not store the
   message yet)
2. thread B increases the write pointer, stores the message and
   signals thread C
3. thread C tries to read from the location that thread A has not yet
   written to

The new implementation is a simple and obvious solution using a common
mutex and two condition variables for reading/writing. We don't bother
with a (complex) interrupt safe implementation.
This commit is contained in:
Marius Gerbershagen 2020-09-07 22:05:49 +02:00
parent 968083738a
commit 23a7ade20c
4 changed files with 88 additions and 63 deletions

View file

@ -466,8 +466,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl
case t_mailbox:
MAYBE_MARK(o->mailbox.data);
MAYBE_MARK(o->mailbox.name);
MAYBE_MARK(o->mailbox.reader_semaphore);
MAYBE_MARK(o->mailbox.writer_semaphore);
break;
# endif
case t_codeblock:
@ -1027,9 +1025,7 @@ init_alloc(void)
to_bitmap(&o, &(o.barrier.name));
type_info[t_mailbox].descriptor =
to_bitmap(&o, &(o.mailbox.name)) |
to_bitmap(&o, &(o.mailbox.data)) |
to_bitmap(&o, &(o.mailbox.reader_semaphore)) |
to_bitmap(&o, &(o.mailbox.writer_semaphore));
to_bitmap(&o, &(o.mailbox.data));
# endif
type_info[t_codeblock].descriptor =
to_bitmap(&o, &(o.cblock.data)) |
@ -1135,6 +1131,15 @@ standard_finalizer(cl_object o)
ecl_enable_interrupts_env(the_env);
break;
}
case t_mailbox: {
const cl_env_ptr the_env = ecl_process_env();
ecl_disable_interrupts_env(the_env);
ecl_mutex_destroy(&o->mailbox.mutex);
ecl_cond_var_destroy(&o->mailbox.reader_cv);
ecl_cond_var_destroy(&o->mailbox.writer_cv);
ecl_enable_interrupts_env(the_env);
break;
}
# ifdef ECL_RWLOCK
case t_rwlock: {
const cl_env_ptr the_env = ecl_process_env();
@ -1186,6 +1191,7 @@ register_finalizer(cl_object o, void *finalized_object,
case t_condition_variable:
case t_barrier:
case t_semaphore:
case t_mailbox:
# if defined(ECL_RWLOCK)
case t_rwlock:
# endif

View file

@ -13,22 +13,13 @@
#include <ecl/ecl.h>
#include <ecl/internal.h>
static ECL_INLINE void
FEerror_not_a_mailbox(cl_object mailbox)
{
FEwrong_type_argument(@'mp::mailbox', mailbox);
}
/* NOTE: The mailbox functions are not interrupt safe. */
cl_object
ecl_make_mailbox(cl_object name, cl_fixnum count)
{
cl_env_ptr env = ecl_process_env();
cl_object output = ecl_alloc_object(t_mailbox);
cl_fixnum mask;
for (mask = 1; mask < count; mask <<= 1) {}
if (mask == 1)
mask = 63;
count = mask;
mask = count - 1;
output->mailbox.name = name;
output->mailbox.data = si_make_vector(ECL_T, /* element type */
ecl_make_fixnum(count), /* size */
@ -36,13 +27,15 @@ ecl_make_mailbox(cl_object name, cl_fixnum count)
ECL_NIL, /* fill pointer */
ECL_NIL, /* displaced to */
ECL_NIL); /* displacement */
output->mailbox.reader_semaphore =
ecl_make_semaphore(name, 0);
output->mailbox.writer_semaphore =
ecl_make_semaphore(name, count);
output->mailbox.message_count = 0;
output->mailbox.read_pointer = 0;
output->mailbox.write_pointer = 0;
output->mailbox.mask = mask;
ecl_disable_interrupts_env(env);
ecl_mutex_init(&output->mailbox.mutex, FALSE);
ecl_cond_var_init(&output->mailbox.reader_cv);
ecl_cond_var_init(&output->mailbox.writer_cv);
ecl_set_finalizer_unprotected(output, ECL_T);
ecl_enable_interrupts_env(env);
return output;
}
@ -56,7 +49,7 @@ mp_mailbox_name(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (ecl_t_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
FEwrong_type_only_arg(@[mp::mailbox-name], mailbox, @[mp::mailbox]);
}
ecl_return1(env, mailbox->mailbox.name);
}
@ -66,7 +59,7 @@ mp_mailbox_count(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (ecl_t_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
FEwrong_type_only_arg(@[mp::mailbox-count], mailbox, @[mp::mailbox]);
}
ecl_return1(env, ecl_make_fixnum(mailbox->mailbox.data->vector.dim));
}
@ -76,27 +69,41 @@ mp_mailbox_empty_p(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
unlikely_if (ecl_t_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
FEwrong_type_only_arg(@[mp::mailbox-empty-p], mailbox, @[mp::mailbox]);
}
ecl_return1(env, mailbox->mailbox.reader_semaphore->semaphore.counter? ECL_NIL : ECL_T);
ecl_return1(env, mailbox->mailbox.message_count? ECL_NIL : ECL_T);
}
static cl_object
read_message(cl_object mailbox)
{
cl_object output;
cl_fixnum ndx = mailbox->mailbox.read_pointer++;
if (mailbox->mailbox.read_pointer >= mailbox->mailbox.data->vector.dim) {
mailbox->mailbox.read_pointer = 0;
}
output = mailbox->mailbox.data->vector.self.t[ndx];
mailbox->mailbox.message_count--;
ecl_cond_var_signal(&mailbox->mailbox.writer_cv);
return output;
}
cl_object
mp_mailbox_read(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
cl_fixnum ndx;
cl_object output;
unlikely_if (ecl_t_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
FEwrong_type_only_arg(@[mp::mailbox-read], mailbox, @[mp::mailbox]);
}
mp_wait_on_semaphore(mailbox->mailbox.reader_semaphore);
ecl_mutex_lock(&mailbox->mailbox.mutex);
{
ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) &
mailbox->mailbox.mask;
output = mailbox->mailbox.data->vector.self.t[ndx];
while (mailbox->mailbox.message_count == 0) {
ecl_cond_var_wait(&mailbox->mailbox.reader_cv, &mailbox->mailbox.mutex);
}
output = read_message(mailbox);
}
mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore);
ecl_mutex_unlock(&mailbox->mailbox.mutex);
ecl_return1(env, output);
}
@ -104,37 +111,50 @@ cl_object
mp_mailbox_try_read(cl_object mailbox)
{
cl_env_ptr env = ecl_process_env();
cl_fixnum ndx;
cl_object output;
unlikely_if (ecl_t_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
FEwrong_type_only_arg(@[mp::mailbox-try-read], mailbox, @[mp::mailbox]);
}
output = mp_try_get_semaphore(mailbox->mailbox.reader_semaphore);
if (output != ECL_NIL) {
ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) &
mailbox->mailbox.mask;
output = mailbox->mailbox.data->vector.self.t[ndx];
mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore);
ecl_mutex_lock(&mailbox->mailbox.mutex);
{
if (mailbox->mailbox.message_count == 0) {
output = ECL_NIL;
} else {
output = read_message(mailbox);
}
}
ecl_mutex_unlock(&mailbox->mailbox.mutex);
ecl_return1(env, output);
}
static void
store_message(cl_object mailbox, cl_object msg)
{
cl_fixnum ndx = mailbox->mailbox.write_pointer++;
if (mailbox->mailbox.write_pointer >= mailbox->mailbox.data->vector.dim) {
mailbox->mailbox.write_pointer = 0;
}
mailbox->mailbox.data->vector.self.t[ndx] = msg;
mailbox->mailbox.message_count++;
ecl_cond_var_signal(&mailbox->mailbox.reader_cv);
}
cl_object
mp_mailbox_send(cl_object mailbox, cl_object msg)
{
cl_env_ptr env = ecl_process_env();
cl_fixnum ndx;
unlikely_if (ecl_t_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
FEwrong_type_only_arg(@[mp::mailbox-send], mailbox, @[mp::mailbox]);
}
mp_wait_on_semaphore(mailbox->mailbox.writer_semaphore);
ecl_mutex_lock(&mailbox->mailbox.mutex);
{
ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) &
mailbox->mailbox.mask;
mailbox->mailbox.data->vector.self.t[ndx] = msg;
while (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) {
ecl_cond_var_wait(&mailbox->mailbox.writer_cv, &mailbox->mailbox.mutex);
}
store_message(mailbox, msg);
}
mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore);
ecl_return0(env);
ecl_mutex_unlock(&mailbox->mailbox.mutex);
ecl_return1(env, msg);
}
cl_object
@ -142,18 +162,19 @@ mp_mailbox_try_send(cl_object mailbox, cl_object msg)
{
cl_env_ptr env = ecl_process_env();
cl_object output;
cl_fixnum ndx;
unlikely_if (ecl_t_of(mailbox) != t_mailbox) {
FEerror_not_a_mailbox(mailbox);
FEwrong_type_only_arg(@[mp::mailbox-try-send], mailbox, @[mp::mailbox]);
}
output = mp_try_get_semaphore(mailbox->mailbox.writer_semaphore);
if (output != ECL_NIL) {
output = msg;
ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) &
mailbox->mailbox.mask;
mailbox->mailbox.data->vector.self.t[ndx] = msg;
mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore);
ecl_mutex_lock(&mailbox->mailbox.mutex);
{
if (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) {
output = ECL_NIL;
} else {
store_message(mailbox, msg);
output = msg;
}
}
ecl_mutex_unlock(&mailbox->mailbox.mutex);
ecl_return1(env, output);
}

View file

@ -29,9 +29,6 @@
# if !defined(AO_HAVE_compare_and_swap)
# error "ECL needs AO_compare_and_swap or an equivalent"
# endif
# if !defined(AO_HAVE_fetch_and_add1)
# error "Cannot implement mailboxs without AO_fetch_and_add1"
# endif
# if !defined(AO_HAVE_fetch_and_add)
# error "ECL needs AO_fetch_and_add or an equivalent"
# endif

View file

@ -1018,11 +1018,12 @@ struct ecl_mailbox {
_ECL_HDR;
cl_object name;
cl_object data;
cl_object reader_semaphore;
cl_object writer_semaphore;
ecl_mutex_t mutex;
ecl_cond_var_t reader_cv;
ecl_cond_var_t writer_cv;
cl_index message_count;
cl_index read_pointer;
cl_index write_pointer;
cl_index mask;
};
struct ecl_rwlock {