diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index 144748e6c..03bacf67a 100644 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -466,8 +466,6 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl case t_mailbox: MAYBE_MARK(o->mailbox.data); MAYBE_MARK(o->mailbox.name); - MAYBE_MARK(o->mailbox.reader_semaphore); - MAYBE_MARK(o->mailbox.writer_semaphore); break; # endif case t_codeblock: @@ -1027,9 +1025,7 @@ init_alloc(void) to_bitmap(&o, &(o.barrier.name)); type_info[t_mailbox].descriptor = to_bitmap(&o, &(o.mailbox.name)) | - to_bitmap(&o, &(o.mailbox.data)) | - to_bitmap(&o, &(o.mailbox.reader_semaphore)) | - to_bitmap(&o, &(o.mailbox.writer_semaphore)); + to_bitmap(&o, &(o.mailbox.data)); # endif type_info[t_codeblock].descriptor = to_bitmap(&o, &(o.cblock.data)) | @@ -1135,6 +1131,15 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } + case t_mailbox: { + const cl_env_ptr the_env = ecl_process_env(); + ecl_disable_interrupts_env(the_env); + ecl_mutex_destroy(&o->mailbox.mutex); + ecl_cond_var_destroy(&o->mailbox.reader_cv); + ecl_cond_var_destroy(&o->mailbox.writer_cv); + ecl_enable_interrupts_env(the_env); + break; + } # ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); @@ -1186,6 +1191,7 @@ register_finalizer(cl_object o, void *finalized_object, case t_condition_variable: case t_barrier: case t_semaphore: + case t_mailbox: # if defined(ECL_RWLOCK) case t_rwlock: # endif diff --git a/src/c/threads/mailbox.d b/src/c/threads/mailbox.d index 3d89f516e..556166613 100755 --- a/src/c/threads/mailbox.d +++ b/src/c/threads/mailbox.d @@ -13,22 +13,13 @@ #include #include -static ECL_INLINE void -FEerror_not_a_mailbox(cl_object mailbox) -{ - FEwrong_type_argument(@'mp::mailbox', mailbox); -} +/* NOTE: The mailbox functions are not interrupt safe. */ cl_object ecl_make_mailbox(cl_object name, cl_fixnum count) { + cl_env_ptr env = ecl_process_env(); cl_object output = ecl_alloc_object(t_mailbox); - cl_fixnum mask; - for (mask = 1; mask < count; mask <<= 1) {} - if (mask == 1) - mask = 63; - count = mask; - mask = count - 1; output->mailbox.name = name; output->mailbox.data = si_make_vector(ECL_T, /* element type */ ecl_make_fixnum(count), /* size */ @@ -36,13 +27,15 @@ ecl_make_mailbox(cl_object name, cl_fixnum count) ECL_NIL, /* fill pointer */ ECL_NIL, /* displaced to */ ECL_NIL); /* displacement */ - output->mailbox.reader_semaphore = - ecl_make_semaphore(name, 0); - output->mailbox.writer_semaphore = - ecl_make_semaphore(name, count); + output->mailbox.message_count = 0; output->mailbox.read_pointer = 0; output->mailbox.write_pointer = 0; - output->mailbox.mask = mask; + ecl_disable_interrupts_env(env); + ecl_mutex_init(&output->mailbox.mutex, FALSE); + ecl_cond_var_init(&output->mailbox.reader_cv); + ecl_cond_var_init(&output->mailbox.writer_cv); + ecl_set_finalizer_unprotected(output, ECL_T); + ecl_enable_interrupts_env(env); return output; } @@ -56,7 +49,7 @@ mp_mailbox_name(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-name], mailbox, @[mp::mailbox]); } ecl_return1(env, mailbox->mailbox.name); } @@ -66,7 +59,7 @@ mp_mailbox_count(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-count], mailbox, @[mp::mailbox]); } ecl_return1(env, ecl_make_fixnum(mailbox->mailbox.data->vector.dim)); } @@ -76,27 +69,41 @@ mp_mailbox_empty_p(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-empty-p], mailbox, @[mp::mailbox]); } - ecl_return1(env, mailbox->mailbox.reader_semaphore->semaphore.counter? ECL_NIL : ECL_T); + ecl_return1(env, mailbox->mailbox.message_count? ECL_NIL : ECL_T); +} + +static cl_object +read_message(cl_object mailbox) +{ + cl_object output; + cl_fixnum ndx = mailbox->mailbox.read_pointer++; + if (mailbox->mailbox.read_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.read_pointer = 0; + } + output = mailbox->mailbox.data->vector.self.t[ndx]; + mailbox->mailbox.message_count--; + ecl_cond_var_signal(&mailbox->mailbox.writer_cv); + return output; } cl_object mp_mailbox_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-read], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; + while (mailbox->mailbox.message_count == 0) { + ecl_cond_var_wait(&mailbox->mailbox.reader_cv, &mailbox->mailbox.mutex); + } + output = read_message(mailbox); } - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } @@ -104,37 +111,50 @@ cl_object mp_mailbox_try_read(cl_object mailbox) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; cl_object output; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-read], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.reader_semaphore); - if (output != ECL_NIL) { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.read_pointer) & - mailbox->mailbox.mask; - output = mailbox->mailbox.data->vector.self.t[ndx]; - mp_signal_semaphore(1, mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == 0) { + output = ECL_NIL; + } else { + output = read_message(mailbox); + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } +static void +store_message(cl_object mailbox, cl_object msg) +{ + cl_fixnum ndx = mailbox->mailbox.write_pointer++; + if (mailbox->mailbox.write_pointer >= mailbox->mailbox.data->vector.dim) { + mailbox->mailbox.write_pointer = 0; + } + mailbox->mailbox.data->vector.self.t[ndx] = msg; + mailbox->mailbox.message_count++; + ecl_cond_var_signal(&mailbox->mailbox.reader_cv); +} + cl_object mp_mailbox_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-send], mailbox, @[mp::mailbox]); } - mp_wait_on_semaphore(mailbox->mailbox.writer_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); { - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; + while (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + ecl_cond_var_wait(&mailbox->mailbox.writer_cv, &mailbox->mailbox.mutex); + } + store_message(mailbox, msg); } - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); - ecl_return0(env); + ecl_mutex_unlock(&mailbox->mailbox.mutex); + ecl_return1(env, msg); } cl_object @@ -142,18 +162,19 @@ mp_mailbox_try_send(cl_object mailbox, cl_object msg) { cl_env_ptr env = ecl_process_env(); cl_object output; - cl_fixnum ndx; unlikely_if (ecl_t_of(mailbox) != t_mailbox) { - FEerror_not_a_mailbox(mailbox); + FEwrong_type_only_arg(@[mp::mailbox-try-send], mailbox, @[mp::mailbox]); } - output = mp_try_get_semaphore(mailbox->mailbox.writer_semaphore); - if (output != ECL_NIL) { - output = msg; - ndx = AO_fetch_and_add1((AO_t*)&mailbox->mailbox.write_pointer) & - mailbox->mailbox.mask; - mailbox->mailbox.data->vector.self.t[ndx] = msg; - mp_signal_semaphore(1, mailbox->mailbox.reader_semaphore); + ecl_mutex_lock(&mailbox->mailbox.mutex); + { + if (mailbox->mailbox.message_count == mailbox->mailbox.data->vector.dim) { + output = ECL_NIL; + } else { + store_message(mailbox, msg); + output = msg; + } } + ecl_mutex_unlock(&mailbox->mailbox.mutex); ecl_return1(env, output); } diff --git a/src/h/ecl_atomics.h b/src/h/ecl_atomics.h index eb9243f92..ee1c6c86c 100644 --- a/src/h/ecl_atomics.h +++ b/src/h/ecl_atomics.h @@ -29,9 +29,6 @@ # if !defined(AO_HAVE_compare_and_swap) # error "ECL needs AO_compare_and_swap or an equivalent" # endif -# if !defined(AO_HAVE_fetch_and_add1) -# error "Cannot implement mailboxs without AO_fetch_and_add1" -# endif # if !defined(AO_HAVE_fetch_and_add) # error "ECL needs AO_fetch_and_add or an equivalent" # endif diff --git a/src/h/object.h b/src/h/object.h index ec969fa6f..6d6b58120 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -1018,11 +1018,12 @@ struct ecl_mailbox { _ECL_HDR; cl_object name; cl_object data; - cl_object reader_semaphore; - cl_object writer_semaphore; + ecl_mutex_t mutex; + ecl_cond_var_t reader_cv; + ecl_cond_var_t writer_cv; + cl_index message_count; cl_index read_pointer; cl_index write_pointer; - cl_index mask; }; struct ecl_rwlock {