From 23a7ade20ccf3b458e50337d4c47fb448da152aa Mon Sep 17 00:00:00 2001 From: Marius Gerbershagen Date: Mon, 7 Sep 2020 22:05:49 +0200 Subject: [PATCH] multithreading: implement mailboxes using native mutexes The old implementation was not race condition free. If two threads (A and B) were writing at the same time while one thread (C) was reading, the following could happen: 1. thread A increases the write pointer (but does not store the message yet) 2. thread B increases the write pointer, stores the message and signals thread C 3. thread C tries to read from the location that thread A has not yet written to The new implementation is a simple and obvious solution using a common mutex and two condition variables for reading/writing. We don't bother with a (complex) interrupt safe implementation. --- src/c/alloc_2.d | 16 +++-- src/c/threads/mailbox.d | 125 +++++++++++++++++++++++----------------- src/h/ecl_atomics.h | 3 - src/h/object.h | 7 ++- 4 files changed, 88 insertions(+), 63 deletions(-) diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index 144748e6c..03bacf67a 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -466,8 +466,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl case t_mailbox: MAYBE_MARK(o->mailbox.data); MAYBE_MARK(o->mailbox.name); - MAYBE_MARK(o->mailbox.reader_semaphore); - MAYBE_MARK(o->mailbox.writer_semaphore); break; # endif case t_codeblock: @@ -1027,9 +1025,7 @@ init_alloc(void) to_bitmap(&o, &(o.barrier.name)); type_info[t_mailbox].descriptor = to_bitmap(&o, &(o.mailbox.name)) | - to_bitmap(&o, &(o.mailbox.data)) | - to_bitmap(&o, &(o.mailbox.reader_semaphore)) | - to_bitmap(&o, &(o.mailbox.writer_semaphore)); + to_bitmap(&o, &(o.mailbox.data)); # endif type_info[t_codeblock].descriptor = to_bitmap(&o, &(o.cblock.data)) | @@ -1135,6 +1131,15 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } + case t_mailbox: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->mailbox.mutex); + ecl_cond_var_destroy(&o->mailbox.reader_cv); + ecl_cond_var_destroy(&o->mailbox.writer_cv); + ecl_enable_interrupts_env(the_env); + break; + } # ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); @@ -1186,6 +1191,7 @@ register_finalizer(cl_object o, void *finalized_object, case t_condition_variable: case t_barrier: case t_semaphore: + case t_mailbox: # if defined(ECL_RWLOCK) case t_rwlock: # endif diff --git a/src/c/threads/mailbox.d b/src/c/threads/mailbox.d index 3d89f516e..556166613 100755 --- a/src/c/threads/mailbox.d +++ b/src/c/threads/mailbox.d @@ -13,22 +13,13 @@ #include #include -static ECL_INLINE void -FEerror_not_a_mailbox(cl_object mailbox) -{ - FEwrong_type_argument(@'mp::mailbox', mailbox); -} +/* NOTE: The mailbox functions are not interrupt safe. */ cl_object ecl_make_mailbox(cl_object name, cl_fixnum count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_mailbox); - cl_fixnum mask; - for (mask = 1; mask < count; mask <<= 1) {} - if (mask == 1) - mask = 63; - count = mask; - mask = count - 1; output->mailbox.name = name; output->mailbox.data = si_make_vector(ECL_T, /* element type */ ecl_make_fixnum(count), /* size */ @@ -36,13 +27,15 @@ ecl_make_mailbox(cl_object name, cl_fixnum count) ECL_NIL, /* fill pointer */ ECL_NIL, /* displaced to */ ECL_NIL); /* displacement */ - output->mailbox.reader_semaphore = - ecl_make_semaphore(name, 0); - output->mailbox.writer_semaphore = - ecl_make_semaphore(name, count); + output->mailbox.message_count = 0; output->mailbox.read_pointer = 0; output->mailbox.write_pointer = 0; - output->mailbox.mask = mask; + ecl_disable_interrupts_env(env); + ecl_mutex_init(&output->mailbox.mutex, FALSE); + ecl_cond_var_init(&output->mailbox.reader_cv); + ecl_cond_var_init(&output->mailbox.writer_cv); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -56,7 +49,7 @@ mp_mailbox_name(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-name], mailbox, @[mp::mailbox]); } ecl_return1(env, mailbox->mailbox.name); } @@ -66,7 +59,7 @@ mp_mailbox_count(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-count], mailbox, @[mp::mailbox]); } ecl_return1(env, ecl_make_fixnum(mailbox->mailbox.data->vector.dim)); } @@ -76,27 +69,41 @@ mp_mailbox_empty_p(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-empty-p], mailbox, @[mp::mailbox]); } - ecl_return1(env, mailbox->mailbox.reader_semaphore->semaphore.counter? ECL_NIL : ECL_T); + ecl_return1(env, mailbox->mailbox.message_count? ECL_NIL : ECL_T); +} + +static cl_object +read_message(cl_object mailbox) +{ + cl_object output; + cl_fixnum ndx = mailbox->mailbox.read_pointer++; + if (mailbox->mailbox.read_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.read_pointer = 0; + } + output = mailbox->mailbox.data->vector.self.t[ndx]; + mailbox->mailbox.message_count--; + ecl_cond_var_signal(&mailbox->mailbox.writer_cv); + return output; } cl_object mp_mailbox_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-read], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; + while (mailbox->mailbox.message_count == 0) { + ecl_cond_var_wait(&mailbox->mailbox.reader_cv, &mailbox->mailbox.mutex); + } + output = read_message(mailbox); } - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } @@ -104,37 +111,50 @@ cl_object mp_mailbox_try_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-read], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.reader_semaphore); - if (output != ECL_NIL) { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == 0) { + output = ECL_NIL; + } else { + output = read_message(mailbox); + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } +static void +store_message(cl_object mailbox, cl_object msg) +{ + cl_fixnum ndx = mailbox->mailbox.write_pointer++; + if (mailbox->mailbox.write_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.write_pointer = 0; + } + mailbox->mailbox.data->vector.self.t[ndx] = msg; + mailbox->mailbox.message_count++; + ecl_cond_var_signal(&mailbox->mailbox.reader_cv); +} + cl_object mp_mailbox_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-send], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; + while (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + ecl_cond_var_wait(&mailbox->mailbox.writer_cv, &mailbox->mailbox.mutex); + } + store_message(mailbox, msg); } - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); - ecl_return0(env); + ecl_mutex_unlock(&mailbox->mailbox.mutex); + ecl_return1(env, msg); } cl_object @@ -142,18 +162,19 @@ mp_mailbox_try_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); cl_object output; - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-send], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.writer_semaphore); - if (output != ECL_NIL) { - output = msg; - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + output = ECL_NIL; + } else { + store_message(mailbox, msg); + output = msg; + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } diff --git a/src/h/ecl_atomics.h b/src/h/ecl_atomics.h index eb9243f92..ee1c6c86c 100644 --- a/src/h/ecl_atomics.h +++ b/src/h/ecl_atomics.h @@ -29,9 +29,6 @@ # if !defined(AO_HAVE_compare_and_swap) # error "ECL needs AO_compare_and_swap or an equivalent" # endif -# if !defined(AO_HAVE_fetch_and_add1) -# error "Cannot implement mailboxs without AO_fetch_and_add1" -# endif # if !defined(AO_HAVE_fetch_and_add) # error "ECL needs AO_fetch_and_add or an equivalent" # endif diff --git a/src/h/object.h b/src/h/object.h index ec969fa6f..6d6b58120 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -1018,11 +1018,12 @@ struct ecl_mailbox { _ECL_HDR; cl_object name; cl_object data; - cl_object reader_semaphore; - cl_object writer_semaphore; + ecl_mutex_t mutex; + ecl_cond_var_t reader_cv; + ecl_cond_var_t writer_cv; + cl_index message_count; cl_index read_pointer; cl_index write_pointer; - cl_index mask; }; struct ecl_rwlock {