Merge branch 'semaphore-timeout' into 'develop'

mp: semaphores: add a new function semaphore-wait

See merge request embeddable-common-lisp/ecl!260
This commit is contained in:
Marius Gerbershagen 2022-01-06 14:06:59 +00:00
commit 015cc15c08
7 changed files with 130 additions and 45 deletions

View file

@ -37,6 +37,15 @@
(Windows always uses the utf-16 format provided by the OS).
- Add hook functions for cl:ed via ext:*ed-functions* list)
- Signal type-error in condition readers for WSCL conformance.
- Remove homegrown mutex implementation - all multithreading
synchronization objects are now implemented on top of mutex and condition
variable primitives from the operating system (no api changes)
- Timeouts for condition variables and mutex locking and semaphore wait
operations
** API changes
- A new function ~(mp:semaphore-wait semaphore count timeout)~
- Deprecate Functions ~mp:wait-on-semaphore~ and ~mp:try-get-semphore~
* 21.2.1 changes since 20.4.24
** Announcement
Dear Community,

View file

@ -1640,8 +1640,9 @@ cl_symbols[] = {
{MP_ "SEMAPHORE" ECL_FUN(NULL, NULL, -1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "MAKE-SEMAPHORE" ECL_FUN("mp_make_semaphore", IF_MP(mp_make_semaphore), -1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "SIGNAL-SEMAPHORE" ECL_FUN("mp_signal_semaphore", IF_MP(mp_signal_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "WAIT-ON-SEMAPHORE" ECL_FUN("mp_wait_on_semaphore", IF_MP(mp_wait_on_semaphore), 1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "TRY-GET-SEMAPHORE" ECL_FUN("mp_try_get_semaphore", IF_MP(mp_try_get_semaphore), 1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "SEMAPHORE-WAIT" ECL_FUN("mp_semaphore_wait", IF_MP(mp_semaphore_wait), 3) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "WAIT-ON-SEMAPHORE" ECL_FUN("mp_wait_on_semaphore", IF_MP(mp_wait_on_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "TRY-GET-SEMAPHORE" ECL_FUN("mp_try_get_semaphore", IF_MP(mp_try_get_semaphore), -2) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "SEMAPHORE-COUNT" ECL_FUN("mp_semaphore_count", IF_MP(mp_semaphore_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "SEMAPHORE-NAME" ECL_FUN("mp_semaphore_name", IF_MP(mp_semaphore_name), 1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "SEMAPHORE-WAIT-COUNT" ECL_FUN("mp_semaphore_wait_count", IF_MP(mp_semaphore_wait_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)},

View file

@ -82,31 +82,67 @@ mp_semaphore_wait_count(cl_object semaphore)
@(return);
} @)
static inline void
semaphore_wait_unprotected(cl_object semaphore, cl_object count, cl_object timeout)
{
int rc;
cl_env_ptr the_env = ecl_process_env();
cl_fixnum counter = fixnnint(count);
ecl_mutex_t *mutex = &semaphore->semaphore.mutex;
ecl_cond_var_t *cv = &semaphore->semaphore.cv;
if (timeout == ECL_NIL) {
do {
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T);
ecl_cond_var_wait(cv, mutex);
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL);
} while (semaphore->semaphore.counter < counter);
} else {
cl_object deadline = ecl_plus(cl_get_internal_real_time(),
ecl_times(timeout, ecl_make_fixnum(1000)));
double seconds = ecl_to_double(timeout);
do {
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T);
rc = ecl_cond_var_timedwait(cv, mutex, seconds);
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL);
timeout = ecl_minus(deadline, cl_get_internal_real_time());
seconds = ecl_to_double(timeout);
} while(semaphore->semaphore.counter < counter
&& rc != ECL_MUTEX_TIMEOUT
&& seconds >= 0);
}
}
cl_object
mp_wait_on_semaphore(cl_object semaphore)
mp_semaphore_wait(cl_object semaphore, cl_object count, cl_object timeout)
{
cl_env_ptr the_env = ecl_process_env();
cl_fixnum counter = fixnnint(count);
volatile cl_object output;
unlikely_if (ecl_t_of(semaphore) != t_semaphore) {
FEwrong_type_only_arg(@[mp::wait-on-semaphore], semaphore, @[mp::semaphore]);
FEwrong_type_only_arg(@[mp::semaphore-wait], semaphore, @[mp::semaphore]);
}
ecl_bds_bind(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL);
ecl_mutex_lock(&semaphore->semaphore.mutex);
if (semaphore->semaphore.counter == 0) {
if (semaphore->semaphore.counter >= counter) {
output = ecl_make_fixnum(semaphore->semaphore.counter);
semaphore->semaphore.counter -= counter;
ecl_mutex_unlock(&semaphore->semaphore.mutex);
} else if (timeout == ECL_NIL || ecl_plusp(timeout)) {
semaphore->semaphore.wait_count++;
ECL_UNWIND_PROTECT_BEGIN(the_env) {
do {
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_T);
ecl_cond_var_wait(&semaphore->semaphore.cv, &semaphore->semaphore.mutex);
ecl_setq(the_env, ECL_INTERRUPTS_ENABLED, ECL_NIL);
} while (semaphore->semaphore.counter == 0);
output = ecl_make_fixnum(semaphore->semaphore.counter--);
semaphore_wait_unprotected(semaphore, count, timeout);
if (semaphore->semaphore.counter >= counter) {
output = ecl_make_fixnum(semaphore->semaphore.counter);
semaphore->semaphore.counter -= counter;
} else {
output = ECL_NIL;
}
} ECL_UNWIND_PROTECT_THREAD_SAFE_EXIT {
semaphore->semaphore.wait_count--;
ecl_mutex_unlock(&semaphore->semaphore.mutex);
} ECL_UNWIND_PROTECT_THREAD_SAFE_END;
} else {
output = ecl_make_fixnum(semaphore->semaphore.counter--);
output = ECL_NIL;
ecl_mutex_unlock(&semaphore->semaphore.mutex);
}
ecl_bds_unwind1(the_env);
@ -114,23 +150,16 @@ mp_wait_on_semaphore(cl_object semaphore)
ecl_return1(the_env, output);
}
cl_object
mp_try_get_semaphore(cl_object semaphore)
{
cl_env_ptr the_env = ecl_process_env();
cl_object output;
unlikely_if (ecl_t_of(semaphore) != t_semaphore) {
FEwrong_type_only_arg(@[mp::try-get-semaphore], semaphore, @[mp::semaphore]);
}
ecl_disable_interrupts_env(the_env);
ecl_mutex_lock(&semaphore->semaphore.mutex);
if (semaphore->semaphore.counter > 0) {
output = ecl_make_fixnum(semaphore->semaphore.counter--);
} else {
output = ECL_NIL;
}
ecl_mutex_unlock(&semaphore->semaphore.mutex);
ecl_enable_interrupts_env(the_env);
ecl_return1(the_env, output);
}
@(defun mp::wait-on-semaphore (semaphore &key (count ecl_make_fixnum(1))
(timeout ECL_NIL))
@ {
cl_object output = mp_semaphore_wait(semaphore, count, timeout);
@(return output);
} @)
@(defun mp::try-get-semaphore (semaphore &optional (count ecl_make_fixnum(1)))
@ {
cl_object timeout = ecl_make_fixnum(0);
cl_object output = mp_semaphore_wait(semaphore, count, timeout);
@(return output);
} @)

View file

@ -1513,8 +1513,9 @@
#+threads (proclamation mp:semaphore-name (mp:semaphore) t :reader)
#+threads (proclamation mp:semaphore-count (mp:semaphore) fixnum :reader)
#+threads (proclamation mp:semaphore-wait-count (mp:semaphore) natural :reader)
#+threads (proclamation mp:wait-on-semaphore (mp:semaphore) fixnum)
#+threads (proclamation mp:try-get-semaphore (mp:semaphore) t)
#+threads (proclamation mp:semaphore-wait (mp:semaphore fixnum real) t)
#+threads (proclamation mp:wait-on-semaphore (mp:semaphore &key) t)
#+threads (proclamation mp:try-get-semaphore (mp:semaphore &optional fixnum) t)
#+threads (proclamation mp:signal-semaphore (mp:semaphore &optional fixnum) t)
;;;

View file

@ -61,29 +61,54 @@ Returns the resource count of @var{semaphore}.
Returns the number of threads waiting on @var{semaphore}.
@end defun
@cppdef mp_semaphore_wait
@lspdef mp:semaphore-wait
@deftypefun cl_object mp_sempahore_wait(cl_object semaphore, cl_object count, cl_object timeout)
@end deftypefun
@defun mp:semaphore-wait semaphore count timeout
Decrement the count of @var{semaphore} by @var{count} if the count
would not be negative.
Else blocks until the semaphore can be decremented. Returns the old
count of @var{semaphore} on success.
If timeout is not @code{niL}, it is the maximum number of seconds to
wait. If the count cannot be decremented in that time, returns
@code{nil} without decrementing the count.
@end defun
@cppdef mp_wait_on_semaphore
@lspdef mp:wait-on-semaphore
@deftypefun cl_object mp_wait_on_semaphore (cl_object semaphore)
@deftypefun cl_object mp_wait_on_semaphore (cl_narg n, cl_object sem, ...)
@end deftypefun
@defun mp:wait-on-semaphore semaphore
Waits on semaphore until it can grab the resource (blocking). Returns
resource count before semaphore was acquired.
@defun mp:wait-on-semaphore semaphore &key count timeout
Waits on semaphore until it can grab @var{count} resources.
Returns resource count before semaphore was acquired.
This function is equivalent to @code{(mp:semaphore-wait semaphore count timeout)}
@end defun
@cppdef mp_try_get_semaphore
@lspdef mp:try-get-semaphore
@deftypefun cl_object mp_try_get_semaphore (cl_object semaphore)
@deftypefun cl_object mp_try_get_semaphore (cl_narg n, cl_object sem, ...)
@end deftypefun
@defun mp:try-get-semaphore semaphore
Tries to get a semaphore (non-blocking). If there is no resource left
returns @code{nil}, otherwise returns resource count before semaphore
was acquired.
@defun mp:try-get-semaphore semaphore &optional count
Tries to get a semaphore (non-blocking).
If there is no enough resource returns @code{nil}, otherwise returns
resource count before semaphore was acquired.
This function is equivalent to @code{(mp:semaphore-wait semaphore count 0)}
@end defun

View file

@ -1816,9 +1816,10 @@ extern ECL_API cl_object mp_make_semaphore _ECL_ARGS((cl_narg, ...));
extern ECL_API cl_object mp_semaphore_count(cl_object);
extern ECL_API cl_object mp_semaphore_name(cl_object);
extern ECL_API cl_object mp_semaphore_wait_count(cl_object);
extern ECL_API cl_object mp_wait_on_semaphore(cl_object);
extern ECL_API cl_object mp_try_get_semaphore(cl_object);
extern ECL_API cl_object mp_signal_semaphore _ECL_ARGS((cl_narg, cl_object, ...));
extern ECL_API cl_object mp_semaphore_wait(cl_object, cl_object, cl_object);
extern ECL_API cl_object mp_wait_on_semaphore _ECL_ARGS((cl_narg, cl_object, ...));
extern ECL_API cl_object mp_try_get_semaphore _ECL_ARGS((cl_narg, cl_object, ...));
extern ECL_API cl_object mp_signal_semaphore _ECL_ARGS((cl_narg, cl_object, ...));
extern ECL_API cl_object ecl_make_semaphore(cl_object name, cl_fixnum count);
/* threads/barrier.d */

View file

@ -411,6 +411,25 @@
(is (zerop (mp:semaphore-count sem)))
(is (zerop (mp:semaphore-wait-count sem)))))
;;; Date: 2021-10-19
;;;
;;; A smoke test for the new function wait-semaphore.
;;;
(test-with-timeout mp.sem.semaphore-wait/smoke
(let ((sem (mp:make-semaphore :name "sem.semaphore-wait" :count 3)))
(flet ((signal-after-fn (count seconds)
(lambda ()
(sleep seconds)
(mp:signal-semaphore sem count))))
(is (null (mp:semaphore-wait sem 4 0)))
(is (null (mp:semaphore-wait sem 4 0.1)))
(is (= 3 (mp:semaphore-wait sem 2 nil)))
(mp:process-run-function nil (signal-after-fn 1 0.2))
(is (null (mp:semaphore-wait sem 2 0.1)))
(is (= 2 (mp:semaphore-wait sem 2 0.2)))
(mp:process-run-function nil (signal-after-fn 2 0.2))
(is (= 2 (mp:semaphore-wait sem 1 nil))))))
;; Mailbox