diff --git a/src/c/symbols_list.h b/src/c/symbols_list.h index c46ad787b..92361f796 100755 --- a/src/c/symbols_list.h +++ b/src/c/symbols_list.h @@ -1640,8 +1640,9 @@ cl_symbols[] = { {MP_ "SEMAPHORE" ECL_FUN(NULL, NULL, -1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "MAKE-SEMAPHORE" ECL_FUN("mp_make_semaphore", IF_MP(mp_make_semaphore), -1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SIGNAL-SEMAPHORE" ECL_FUN("mp_signal_semaphore", IF_MP(mp_signal_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, -{MP_ "WAIT-ON-SEMAPHORE" ECL_FUN("mp_wait_on_semaphore", IF_MP(mp_wait_on_semaphore), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, -{MP_ "TRY-GET-SEMAPHORE" ECL_FUN("mp_try_get_semaphore", IF_MP(mp_try_get_semaphore), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, +{MP_ "SEMAPHORE-WAIT" ECL_FUN("mp_semaphore_wait", IF_MP(mp_semaphore_wait), 3) ECL_VAR(MP_ORDINARY, OBJNULL)}, +{MP_ "WAIT-ON-SEMAPHORE" ECL_FUN("mp_wait_on_semaphore", IF_MP(mp_wait_on_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, +{MP_ "TRY-GET-SEMAPHORE" ECL_FUN("mp_try_get_semaphore", IF_MP(mp_try_get_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SEMAPHORE-COUNT" ECL_FUN("mp_semaphore_count", IF_MP(mp_semaphore_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SEMAPHORE-NAME" ECL_FUN("mp_semaphore_name", IF_MP(mp_semaphore_name), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SEMAPHORE-WAIT-COUNT" ECL_FUN("mp_semaphore_wait_count", IF_MP(mp_semaphore_wait_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, diff --git a/src/c/threads/semaphore.d b/src/c/threads/semaphore.d index 86c191ca8..69f32dbbb 100644 --- a/src/c/threads/semaphore.d +++ b/src/c/threads/semaphore.d @@ -82,31 +82,67 @@ mp_semaphore_wait_count(cl_object semaphore) @(return); } @) +static inline void +semaphore_wait_unprotected(cl_object semaphore, cl_object count, cl_object timeout) +{ + int rc; + cl_env_ptr the_env = ecl_process_env(); + cl_fixnum counter = fixnnint(count); + ecl_mutex_t *mutex = &semaphore->semaphore.mutex; + ecl_cond_var_t *cv = &semaphore->semaphore.cv; + if (timeout == ECL_NIL) { + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + ecl_cond_var_wait(cv, mutex); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + } while (semaphore->semaphore.counter < counter); + } else { + cl_object deadline = ecl_plus(cl_get_internal_real_time(), + ecl_times(timeout, ecl_make_fixnum(1000))); + double seconds = ecl_to_double(timeout); + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + rc = ecl_cond_var_timedwait(cv, mutex, seconds); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + timeout = ecl_minus(deadline, cl_get_internal_real_time()); + seconds = ecl_to_double(timeout); + } while(semaphore->semaphore.counter < counter + && rc != ECL_MUTEX_TIMEOUT + && seconds >= 0); + } +} + cl_object -mp_wait_on_semaphore(cl_object semaphore) +mp_semaphore_wait(cl_object semaphore, cl_object count, cl_object timeout) { cl_env_ptr the_env = ecl_process_env(); + cl_fixnum counter = fixnnint(count); volatile cl_object output; unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEwrong_type_only_arg(@[mp::wait-on-semaphore], semaphore, @[mp::semaphore]); + FEwrong_type_only_arg(@[mp::semaphore-wait], semaphore, @[mp::semaphore]); } ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); ecl_mutex_lock(&semaphore->semaphore.mutex); - if (semaphore->semaphore.counter == 0) { + if (semaphore->semaphore.counter >= counter) { + output = ecl_make_fixnum(semaphore->semaphore.counter); + semaphore->semaphore.counter -= counter; + ecl_mutex_unlock(&semaphore->semaphore.mutex); + } else if (timeout == ECL_NIL || ecl_plusp(timeout)) { semaphore->semaphore.wait_count++; ECL_UNWIND_PROTECT_BEGIN(the_env) { - do { - ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); - ecl_cond_var_wait(&semaphore->semaphore.cv, &semaphore->semaphore.mutex); - ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); - } while (semaphore->semaphore.counter == 0); - output = ecl_make_fixnum(semaphore->semaphore.counter--); + semaphore_wait_unprotected(semaphore, count, timeout); + if (semaphore->semaphore.counter >= counter) { + output = ecl_make_fixnum(semaphore->semaphore.counter); + semaphore->semaphore.counter -= counter; + } else { + output = ECL_NIL; + } } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { semaphore->semaphore.wait_count--; ecl_mutex_unlock(&semaphore->semaphore.mutex); } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } else { - output = ecl_make_fixnum(semaphore->semaphore.counter--); + output = ECL_NIL; ecl_mutex_unlock(&semaphore->semaphore.mutex); } ecl_bds_unwind1(the_env); @@ -114,23 +150,16 @@ mp_wait_on_semaphore(cl_object semaphore) ecl_return1(the_env, output); } -cl_object -mp_try_get_semaphore(cl_object semaphore) -{ - cl_env_ptr the_env = ecl_process_env(); - cl_object output; - unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEwrong_type_only_arg(@[mp::try-get-semaphore], semaphore, @[mp::semaphore]); - } - ecl_disable_interrupts_env(the_env); - ecl_mutex_lock(&semaphore->semaphore.mutex); - if (semaphore->semaphore.counter > 0) { - output = ecl_make_fixnum(semaphore->semaphore.counter--); - } else { - output = ECL_NIL; - } - ecl_mutex_unlock(&semaphore->semaphore.mutex); - ecl_enable_interrupts_env(the_env); - ecl_return1(the_env, output); -} +@(defun mp::wait-on-semaphore (semaphore &key (count ecl_make_fixnum(1)) + (timeout ECL_NIL)) + @ { + cl_object output = mp_semaphore_wait(semaphore, count, timeout); + @(return output); + } @) +@(defun mp::try-get-semaphore (semaphore &optional (count ecl_make_fixnum(1))) + @ { + cl_object timeout = ecl_make_fixnum(0); + cl_object output = mp_semaphore_wait(semaphore, count, timeout); + @(return output); + } @) diff --git a/src/cmp/proclamations.lsp b/src/cmp/proclamations.lsp index 2a52a1eff..15a8ccfb3 100644 --- a/src/cmp/proclamations.lsp +++ b/src/cmp/proclamations.lsp @@ -1513,8 +1513,9 @@ #+threads (proclamation mp:semaphore-name (mp:semaphore) t :reader) #+threads (proclamation mp:semaphore-count (mp:semaphore) fixnum :reader) #+threads (proclamation mp:semaphore-wait-count (mp:semaphore) natural :reader) -#+threads (proclamation mp:wait-on-semaphore (mp:semaphore) fixnum) -#+threads (proclamation mp:try-get-semaphore (mp:semaphore) t) +#+threads (proclamation mp:semaphore-wait (mp:semaphore fixnum real) t) +#+threads (proclamation mp:wait-on-semaphore (mp:semaphore &key) t) +#+threads (proclamation mp:try-get-semaphore (mp:semaphore &optional fixnum) t) #+threads (proclamation mp:signal-semaphore (mp:semaphore &optional fixnum) t) ;;; diff --git a/src/h/external.h b/src/h/external.h index 4a1a8692d..8066d7827 100755 --- a/src/h/external.h +++ b/src/h/external.h @@ -1816,9 +1816,10 @@ extern ECL_API cl_object mp_make_semaphore _ECL_ARGS((cl_narg, ...)); extern ECL_API cl_object mp_semaphore_count(cl_object); extern ECL_API cl_object mp_semaphore_name(cl_object); extern ECL_API cl_object mp_semaphore_wait_count(cl_object); -extern ECL_API cl_object mp_wait_on_semaphore(cl_object); -extern ECL_API cl_object mp_try_get_semaphore(cl_object); -extern ECL_API cl_object mp_signal_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_semaphore_wait(cl_object, cl_object, cl_object); +extern ECL_API cl_object mp_wait_on_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_try_get_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_signal_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); extern ECL_API cl_object ecl_make_semaphore(cl_object name, cl_fixnum count); /* threads/barrier.d */