From bf62cd9d40862c2bcf98acf42e21a29a80d532a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kochma=C5=84ski?= Date: Tue, 19 Oct 2021 11:16:18 +0200 Subject: [PATCH 1/2] mp: semaphores: add a new function semaphore-wait This function offers a functionality similar to sbcl, namely allows specifying the timeout and the resource count. --- src/c/symbols_list.h | 5 ++- src/c/threads/semaphore.d | 87 ++++++++++++++++++++++++++------------- src/cmp/proclamations.lsp | 5 ++- src/h/external.h | 7 ++-- 4 files changed, 68 insertions(+), 36 deletions(-) diff --git a/src/c/symbols_list.h b/src/c/symbols_list.h index c46ad787b..92361f796 100755 --- a/src/c/symbols_list.h +++ b/src/c/symbols_list.h @@ -1640,8 +1640,9 @@ cl_symbols[] = { {MP_ "SEMAPHORE" ECL_FUN(NULL, NULL, -1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "MAKE-SEMAPHORE" ECL_FUN("mp_make_semaphore", IF_MP(mp_make_semaphore), -1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SIGNAL-SEMAPHORE" ECL_FUN("mp_signal_semaphore", IF_MP(mp_signal_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, -{MP_ "WAIT-ON-SEMAPHORE" ECL_FUN("mp_wait_on_semaphore", IF_MP(mp_wait_on_semaphore), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, -{MP_ "TRY-GET-SEMAPHORE" ECL_FUN("mp_try_get_semaphore", IF_MP(mp_try_get_semaphore), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, +{MP_ "SEMAPHORE-WAIT" ECL_FUN("mp_semaphore_wait", IF_MP(mp_semaphore_wait), 3) ECL_VAR(MP_ORDINARY, OBJNULL)}, +{MP_ "WAIT-ON-SEMAPHORE" ECL_FUN("mp_wait_on_semaphore", IF_MP(mp_wait_on_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, +{MP_ "TRY-GET-SEMAPHORE" ECL_FUN("mp_try_get_semaphore", IF_MP(mp_try_get_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SEMAPHORE-COUNT" ECL_FUN("mp_semaphore_count", IF_MP(mp_semaphore_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SEMAPHORE-NAME" ECL_FUN("mp_semaphore_name", IF_MP(mp_semaphore_name), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, {MP_ "SEMAPHORE-WAIT-COUNT" ECL_FUN("mp_semaphore_wait_count", IF_MP(mp_semaphore_wait_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)}, diff --git a/src/c/threads/semaphore.d b/src/c/threads/semaphore.d index 86c191ca8..69f32dbbb 100644 --- a/src/c/threads/semaphore.d +++ b/src/c/threads/semaphore.d @@ -82,31 +82,67 @@ mp_semaphore_wait_count(cl_object semaphore) @(return); } @) +static inline void +semaphore_wait_unprotected(cl_object semaphore, cl_object count, cl_object timeout) +{ + int rc; + cl_env_ptr the_env = ecl_process_env(); + cl_fixnum counter = fixnnint(count); + ecl_mutex_t *mutex = &semaphore->semaphore.mutex; + ecl_cond_var_t *cv = &semaphore->semaphore.cv; + if (timeout == ECL_NIL) { + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + ecl_cond_var_wait(cv, mutex); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + } while (semaphore->semaphore.counter < counter); + } else { + cl_object deadline = ecl_plus(cl_get_internal_real_time(), + ecl_times(timeout, ecl_make_fixnum(1000))); + double seconds = ecl_to_double(timeout); + do { + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); + rc = ecl_cond_var_timedwait(cv, mutex, seconds); + ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); + timeout = ecl_minus(deadline, cl_get_internal_real_time()); + seconds = ecl_to_double(timeout); + } while(semaphore->semaphore.counter < counter + && rc != ECL_MUTEX_TIMEOUT + && seconds >= 0); + } +} + cl_object -mp_wait_on_semaphore(cl_object semaphore) +mp_semaphore_wait(cl_object semaphore, cl_object count, cl_object timeout) { cl_env_ptr the_env = ecl_process_env(); + cl_fixnum counter = fixnnint(count); volatile cl_object output; unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEwrong_type_only_arg(@[mp::wait-on-semaphore], semaphore, @[mp::semaphore]); + FEwrong_type_only_arg(@[mp::semaphore-wait], semaphore, @[mp::semaphore]); } ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); ecl_mutex_lock(&semaphore->semaphore.mutex); - if (semaphore->semaphore.counter == 0) { + if (semaphore->semaphore.counter >= counter) { + output = ecl_make_fixnum(semaphore->semaphore.counter); + semaphore->semaphore.counter -= counter; + ecl_mutex_unlock(&semaphore->semaphore.mutex); + } else if (timeout == ECL_NIL || ecl_plusp(timeout)) { semaphore->semaphore.wait_count++; ECL_UNWIND_PROTECT_BEGIN(the_env) { - do { - ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T); - ecl_cond_var_wait(&semaphore->semaphore.cv, &semaphore->semaphore.mutex); - ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL); - } while (semaphore->semaphore.counter == 0); - output = ecl_make_fixnum(semaphore->semaphore.counter--); + semaphore_wait_unprotected(semaphore, count, timeout); + if (semaphore->semaphore.counter >= counter) { + output = ecl_make_fixnum(semaphore->semaphore.counter); + semaphore->semaphore.counter -= counter; + } else { + output = ECL_NIL; + } } ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT { semaphore->semaphore.wait_count--; ecl_mutex_unlock(&semaphore->semaphore.mutex); } ECL_UNWIND_PROTECT_THREAD_SAFE_END; } else { - output = ecl_make_fixnum(semaphore->semaphore.counter--); + output = ECL_NIL; ecl_mutex_unlock(&semaphore->semaphore.mutex); } ecl_bds_unwind1(the_env); @@ -114,23 +150,16 @@ mp_wait_on_semaphore(cl_object semaphore) ecl_return1(the_env, output); } -cl_object -mp_try_get_semaphore(cl_object semaphore) -{ - cl_env_ptr the_env = ecl_process_env(); - cl_object output; - unlikely_if (ecl_t_of(semaphore) != t_semaphore) { - FEwrong_type_only_arg(@[mp::try-get-semaphore], semaphore, @[mp::semaphore]); - } - ecl_disable_interrupts_env(the_env); - ecl_mutex_lock(&semaphore->semaphore.mutex); - if (semaphore->semaphore.counter > 0) { - output = ecl_make_fixnum(semaphore->semaphore.counter--); - } else { - output = ECL_NIL; - } - ecl_mutex_unlock(&semaphore->semaphore.mutex); - ecl_enable_interrupts_env(the_env); - ecl_return1(the_env, output); -} +@(defun mp::wait-on-semaphore (semaphore &key (count ecl_make_fixnum(1)) + (timeout ECL_NIL)) + @ { + cl_object output = mp_semaphore_wait(semaphore, count, timeout); + @(return output); + } @) +@(defun mp::try-get-semaphore (semaphore &optional (count ecl_make_fixnum(1))) + @ { + cl_object timeout = ecl_make_fixnum(0); + cl_object output = mp_semaphore_wait(semaphore, count, timeout); + @(return output); + } @) diff --git a/src/cmp/proclamations.lsp b/src/cmp/proclamations.lsp index 2a52a1eff..15a8ccfb3 100644 --- a/src/cmp/proclamations.lsp +++ b/src/cmp/proclamations.lsp @@ -1513,8 +1513,9 @@ #+threads (proclamation mp:semaphore-name (mp:semaphore) t :reader) #+threads (proclamation mp:semaphore-count (mp:semaphore) fixnum :reader) #+threads (proclamation mp:semaphore-wait-count (mp:semaphore) natural :reader) -#+threads (proclamation mp:wait-on-semaphore (mp:semaphore) fixnum) -#+threads (proclamation mp:try-get-semaphore (mp:semaphore) t) +#+threads (proclamation mp:semaphore-wait (mp:semaphore fixnum real) t) +#+threads (proclamation mp:wait-on-semaphore (mp:semaphore &key) t) +#+threads (proclamation mp:try-get-semaphore (mp:semaphore &optional fixnum) t) #+threads (proclamation mp:signal-semaphore (mp:semaphore &optional fixnum) t) ;;; diff --git a/src/h/external.h b/src/h/external.h index 4a1a8692d..8066d7827 100755 --- a/src/h/external.h +++ b/src/h/external.h @@ -1816,9 +1816,10 @@ extern ECL_API cl_object mp_make_semaphore _ECL_ARGS((cl_narg, ...)); extern ECL_API cl_object mp_semaphore_count(cl_object); extern ECL_API cl_object mp_semaphore_name(cl_object); extern ECL_API cl_object mp_semaphore_wait_count(cl_object); -extern ECL_API cl_object mp_wait_on_semaphore(cl_object); -extern ECL_API cl_object mp_try_get_semaphore(cl_object); -extern ECL_API cl_object mp_signal_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_semaphore_wait(cl_object, cl_object, cl_object); +extern ECL_API cl_object mp_wait_on_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_try_get_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_signal_semaphore _ECL_ARGS((cl_narg, cl_object, ...)); extern ECL_API cl_object ecl_make_semaphore(cl_object name, cl_fixnum count); /* threads/barrier.d */ From 0660996c37a988a62f3bbb6f088aaeb6c24de5ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kochma=C5=84ski?= Date: Tue, 19 Oct 2021 11:16:34 +0200 Subject: [PATCH 2/2] mp: semaphores: add tests and the documentation Functions wait-on-semaphore and try-get-semaphore are deprecated in favour of the new function. --- CHANGELOG | 9 +++++ src/doc/manual/extensions/mp_ref_sem.txi | 43 +++++++++++++++++----- src/tests/normal-tests/multiprocessing.lsp | 19 ++++++++++ 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 612a3d398..821b26afe 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -37,6 +37,15 @@ (Windows always uses the utf-16 format provided by the OS). - Add hook functions for cl:ed via ext:*ed-functions* list) - Signal type-error in condition readers for WSCL conformance. +- Remove homegrown mutex implementation - all multithreading + synchronization objects are now implemented on top of mutex and condition + variable primitives from the operating system (no api changes) +- Timeouts for condition variables and mutex locking and semaphore wait + operations +** API changes +- A new function ~(mp:semaphore-wait semaphore count timeout)~ +- Deprecate Functions ~mp:wait-on-semaphore~ and ~mp:try-get-semphore~ + * 21.2.1 changes since 20.4.24 ** Announcement Dear Community, diff --git a/src/doc/manual/extensions/mp_ref_sem.txi b/src/doc/manual/extensions/mp_ref_sem.txi index 8c60602c4..068181698 100644 --- a/src/doc/manual/extensions/mp_ref_sem.txi +++ b/src/doc/manual/extensions/mp_ref_sem.txi @@ -61,29 +61,54 @@ Returns the resource count of @var{semaphore}. Returns the number of threads waiting on @var{semaphore}. @end defun + +@cppdef mp_semaphore_wait +@lspdef mp:semaphore-wait + +@deftypefun cl_object mp_sempahore_wait(cl_object semaphore, cl_object count, cl_object timeout) +@end deftypefun + +@defun mp:semaphore-wait semaphore count timeout +Decrement the count of @var{semaphore} by @var{count} if the count +would not be negative. + +Else blocks until the semaphore can be decremented. Returns the old +count of @var{semaphore} on success. + +If timeout is not @code{niL}, it is the maximum number of seconds to +wait. If the count cannot be decremented in that time, returns +@code{nil} without decrementing the count. +@end defun + @cppdef mp_wait_on_semaphore @lspdef mp:wait-on-semaphore -@deftypefun cl_object mp_wait_on_semaphore (cl_object semaphore) +@deftypefun cl_object mp_wait_on_semaphore (cl_narg n, cl_object sem, ...) @end deftypefun -@defun mp:wait-on-semaphore semaphore -Waits on semaphore until it can grab the resource (blocking). Returns -resource count before semaphore was acquired. +@defun mp:wait-on-semaphore semaphore &key count timeout +Waits on semaphore until it can grab @var{count} resources. + +Returns resource count before semaphore was acquired. + +This function is equivalent to @code{(mp:semaphore-wait semaphore count timeout)} @end defun @cppdef mp_try_get_semaphore @lspdef mp:try-get-semaphore -@deftypefun cl_object mp_try_get_semaphore (cl_object semaphore) +@deftypefun cl_object mp_try_get_semaphore (cl_narg n, cl_object sem, ...) @end deftypefun -@defun mp:try-get-semaphore semaphore -Tries to get a semaphore (non-blocking). If there is no resource left -returns @code{nil}, otherwise returns resource count before semaphore -was acquired. +@defun mp:try-get-semaphore semaphore &optional count +Tries to get a semaphore (non-blocking). + +If there is no enough resource returns @code{nil}, otherwise returns +resource count before semaphore was acquired. + +This function is equivalent to @code{(mp:semaphore-wait semaphore count 0)} @end defun diff --git a/src/tests/normal-tests/multiprocessing.lsp b/src/tests/normal-tests/multiprocessing.lsp index c2ca95ec4..0afd998c9 100644 --- a/src/tests/normal-tests/multiprocessing.lsp +++ b/src/tests/normal-tests/multiprocessing.lsp @@ -411,6 +411,25 @@ (is (zerop (mp:semaphore-count sem))) (is (zerop (mp:semaphore-wait-count sem))))) +;;; Date: 2021-10-19 +;;; +;;; A smoke test for the new function wait-semaphore. +;;; +(test-with-timeout mp.sem.semaphore-wait/smoke + (let ((sem (mp:make-semaphore :name "sem.semaphore-wait" :count 3))) + (flet ((signal-after-fn (count seconds) + (lambda () + (sleep seconds) + (mp:signal-semaphore sem count)))) + (is (null (mp:semaphore-wait sem 4 0))) + (is (null (mp:semaphore-wait sem 4 0.1))) + (is (= 3 (mp:semaphore-wait sem 2 nil))) + (mp:process-run-function nil (signal-after-fn 1 0.2)) + (is (null (mp:semaphore-wait sem 2 0.1))) + (is (= 2 (mp:semaphore-wait sem 2 0.2))) + (mp:process-run-function nil (signal-after-fn 2 0.2)) + (is (= 2 (mp:semaphore-wait sem 1 nil)))))) + ;; Mailbox