multithreading: implement optional timeout for mp:get-lock

This commit is contained in:
Marius Gerbershagen 2020-10-11 20:50:38 +02:00
parent 806336ed2e
commit 5f65deea5b
12 changed files with 201 additions and 7 deletions

3
src/aclocal.m4 vendored
View file

@ -928,12 +928,13 @@ fi
dnl ----------------------------------------------------------------------
dnl Check whether we have POSIX read/write locks are available
AC_DEFUN([ECL_POSIX_RWLOCK],[
AC_DEFUN([ECL_PTHREAD_EXTENSIONS],[
AC_CHECK_FUNC( [pthread_rwlock_init], [
AC_CHECK_TYPES([pthread_rwlock_t], [
AC_DEFINE([HAVE_POSIX_RWLOCK], [], [HAVE_POSIX_RWLOCK])
], [])
], [])
AC_CHECK_FUNCS([pthread_mutex_timedlock])
])

View file

@ -1609,6 +1609,7 @@ cl_symbols[] = {
{MP_ "LOCK-COUNT" ECL_FUN("mp_lock_count", IF_MP(mp_lock_count), 1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "GET-LOCK" ECL_FUN("mp_get_lock", IF_MP(mp_get_lock), -2) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "GIVEUP-LOCK" ECL_FUN("mp_giveup_lock", IF_MP(mp_giveup_lock), 1) ECL_VAR(MP_ORDINARY, OBJNULL)},
{SYS_ "MUTEX-TIMEOUT" ECL_FUN("si_mutex_timeout", IF_MP(si_mutex_timeout), 3) ECL_VAR(SI_SPECIAL, OBJNULL)},
{MP_ "MAKE-CONDITION-VARIABLE" ECL_FUN("mp_make_condition_variable", IF_MP(mp_make_condition_variable), 0) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "CONDITION-VARIABLE-WAIT" ECL_FUN("mp_condition_variable_wait", IF_MP(mp_condition_variable_wait), 2) ECL_VAR(MP_ORDINARY, OBJNULL)},
{MP_ "CONDITION-VARIABLE-TIMEDWAIT" ECL_FUN("mp_condition_variable_timedwait", IF_MP(mp_condition_variable_timedwait), 3) ECL_VAR(MP_ORDINARY, OBJNULL)},

View file

@ -199,6 +199,7 @@ mp_get_lock_wait(cl_object lock)
#endif
rc = ecl_mutex_lock(&lock->lock.mutex);
if (ecl_likely(rc == ECL_MUTEX_SUCCESS)) {
ecl_disable_interrupts_env(env);
lock->lock.counter++;
lock->lock.owner = own_process;
ecl_enable_interrupts_env(env);
@ -208,7 +209,104 @@ mp_get_lock_wait(cl_object lock)
FEerror_not_a_recursive_lock(lock);
#endif
} else {
FEunknown_lock_error(lock);
}
}
static cl_object
si_abort_wait_on_mutex(cl_narg narg, ...)
{
const cl_env_ptr the_env = ecl_process_env();
cl_object env = the_env->function->cclosure.env;
cl_object lock = CAR(env);
if (ECL_SYM_VAL(the_env, @'si::mutex-timeout') == lock) {
ECL_SETQ(the_env, @'si::mutex-timeout', ECL_T);
cl_throw(@'si::mutex-timeout');
}
@(return)
}
cl_object
si_mutex_timeout(cl_object process, cl_object lock, cl_object timeout)
{
const cl_env_ptr the_env = ecl_process_env();
if (cl_plusp(timeout)) {
cl_sleep(timeout);
}
ECL_WITH_NATIVE_LOCK_BEGIN(the_env, &process->process.start_stop_lock) {
if (ecl_likely(mp_process_active_p(process) != ECL_NIL)) {
ecl_interrupt_process(process,
ecl_make_cclosure_va(si_abort_wait_on_mutex,
cl_list(1, lock),
@'si::mutex-timeout',
0));
}
} ECL_WITH_NATIVE_LOCK_END;
@(return)
}
cl_object
mp_get_lock_timedwait(cl_object lock, cl_object timeout)
{
cl_env_ptr env = ecl_process_env();
cl_object own_process = env->own_process;
if (ecl_unlikely(ecl_t_of(lock) != t_lock)) {
FEwrong_type_nth_arg(@[mp::get-lock], 1, lock, @[mp::lock]);
}
#if !defined(ECL_MUTEX_DEADLOCK)
if (ecl_unlikely(lock->lock.owner == own_process && !lock->lock.recursive)) {
/* INV: owner != nil only if the mutex is locked */
FEerror_not_a_recursive_lock(lock);
}
#endif
#if defined(ECL_WINDOWS_THREADS) || defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK)
int rc = ecl_mutex_timedlock(&lock->lock.mutex, ecl_to_double(timeout));
#else
/* If we don't have pthread_mutex_timedlock available, we create a
* timer thread which interrupts our thread after the specified
* timeout. si::mutex-timeout serves a dual purpose below: the
* symbol itself denotes a catchpoint and its value is used to
* determine a) if the catchpoint is active and b) if the timer has
* fired. */
volatile int rc;
volatile cl_object timer_thread;
ecl_bds_bind(env, @'si::mutex-timeout', lock);
ECL_CATCH_BEGIN(env, @'si::mutex-timeout') {
timer_thread = mp_process_run_function(5, @'si::mutex-timeout',
@'si::mutex-timeout',
env->own_process,
lock,
timeout);
rc = ecl_mutex_lock(&lock->lock.mutex);
ECL_SETQ(env, @'si::mutex-timeout', ECL_NIL);
} ECL_CATCH_END;
ECL_WITH_NATIVE_LOCK_BEGIN(env, &timer_thread->process.start_stop_lock) {
if (mp_process_active_p(timer_thread)) {
ecl_interrupt_process(timer_thread, @'mp::exit-process');
}
} ECL_WITH_NATIVE_LOCK_END;
if (ECL_SYM_VAL(env, @'si::mutex-timeout') == ECL_T) {
rc = ECL_MUTEX_TIMEOUT;
/* The mutex might have been locked before we could kill the timer
* thread. Therefore, we unconditionally try to unlock the mutex
* again and treat the operation as having timed out. */
ecl_mutex_unlock(&lock->lock.mutex);
}
ecl_bds_unwind1(env);
#endif
if (rc == ECL_MUTEX_SUCCESS) {
ecl_disable_interrupts_env(env);
lock->lock.counter++;
lock->lock.owner = own_process;
ecl_enable_interrupts_env(env);
ecl_return1(env, lock);
} else if (rc == ECL_MUTEX_TIMEOUT) {
ecl_return1(env,ECL_NIL);
#if defined(ECL_MUTEX_DEADLOCK)
} else if (ecl_unlikely(rc == ECL_MUTEX_DEADLOCK)) {
FEerror_not_a_recursive_lock(lock);
#endif
} else {
FEunknown_lock_error(lock);
}
}
@ -217,6 +315,8 @@ mp_get_lock_wait(cl_object lock)
@
if (Null(wait)) {
return mp_get_lock_nowait(lock);
} else if (ecl_realp(wait)) {
return mp_get_lock_timedwait(lock, wait);
} else {
return mp_get_lock_wait(lock);
}

11
src/configure vendored
View file

@ -6233,6 +6233,17 @@ fi
fi
for ac_func in pthread_mutex_timedlock
do :
ac_fn_c_check_func "$LINENO" "pthread_mutex_timedlock" "ac_cv_func_pthread_mutex_timedlock"
if test "x$ac_cv_func_pthread_mutex_timedlock" = xyes; then :
cat >>confdefs.h <<_ACEOF
#define HAVE_PTHREAD_MUTEX_TIMEDLOCK 1
_ACEOF
fi
done
boehm_configure_flags="${boehm_configure_flags} ${THREAD_GC_FLAGS}"
for k in $THREAD_OBJ; do EXTRA_OBJS="$EXTRA_OBJS ${k}.${OBJEXT}"; done

View file

@ -584,7 +584,7 @@ if test "${enable_threads}" = "yes" ; then
else
LIBS="${THREAD_LIBS} ${LIBS}"
CFLAGS="${CFLAGS} ${THREAD_CFLAGS}"
ECL_POSIX_RWLOCK
ECL_PTHREAD_EXTENSIONS
boehm_configure_flags="${boehm_configure_flags} ${THREAD_GC_FLAGS}"
for k in $THREAD_OBJ; do EXTRA_OBJS="$EXTRA_OBJS ${k}.${OBJEXT}"; done
AC_MSG_CHECKING([for thread object files])

View file

@ -101,7 +101,9 @@ returns @code{ECL_NIL}, otherwise @code{ECL_T}.
@defun mp:get-lock lock &optional (wait t)
Tries to acquire a lock. @var{wait} indicates whether function should
block or give up if @var{lock} is already taken. If @var{wait} is
@code{nil} and @var{lock} can't be acquired returns
@code{nil}, immediately return, if @var{wait} is a real number
@var{wait} specifies a timeout in seconds and otherwise block until the
lock becomes available. If @var{lock} can't be acquired return
@code{nil}. Succesful operation returns @code{t}. Will signal an error
if the mutex is non-recursive and current thread already owns the lock.
@end defun

View file

@ -436,6 +436,9 @@
/* Define to 1 if you have the `powf' function. */
#undef HAVE_POWF
/* Define to 1 if you have the `pthread_mutex_timedlock' function. */
#undef HAVE_PTHREAD_MUTEX_TIMEDLOCK
/* Define to 1 if the system has the type `pthread_rwlock_t'. */
#undef HAVE_PTHREAD_RWLOCK_T

View file

@ -131,6 +131,8 @@
#undef HAVE_SEM_INIT
/* whether we have read/write locks */
#undef HAVE_POSIX_RWLOCK
/* whether we have mutex lock operations with timeout */
#undef HAVE_PTHREAD_MUTEX_TIMEDLOCK
/* uname() for system identification */
#undef HAVE_UNAME
#undef HAVE_UNISTD_H

View file

@ -311,6 +311,10 @@ extern cl_fixnum ecl_option_values[ECL_OPT_LIMIT+1];
extern void ecl_init_bignum_registers(cl_env_ptr env);
extern void ecl_clear_bignum_registers(cl_env_ptr env);
/* threads/mutex.d */
extern cl_object si_mutex_timeout();
/* print.d */
extern cl_object _ecl_stream_or_default_output(cl_object stream);

View file

@ -101,6 +101,18 @@ add_timeout_delta(struct timespec *ts, double seconds)
}
}
static inline int
ecl_mutex_timedlock(ecl_mutex_t *mutex, double seconds)
{
#if defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK)
struct timespec ts;
add_timeout_delta(&ts, seconds);
return pthread_mutex_timedlock(mutex, &ts);
#else
/* Not implemented, see mutex.d for alternative implementation using interrupts */
return -1;
#endif
}
static inline void
ecl_cond_var_init(ecl_cond_var_t *cv)
@ -359,6 +371,24 @@ remaining_milliseconds(double seconds, DWORD start_ticks)
return (ret < 0) ? 0 : ret;
}
static inline int
ecl_mutex_timedlock(ecl_mutex_t *mutex, double seconds)
{
DWORD start_ticks = GetTickCount();
AGAIN:
switch (WaitForSingleObjectEx(*mutex, remaining_milliseconds(seconds, start_ticks), TRUE)) {
case WAIT_OBJECT_0:
case WAIT_ABANDONED:
return ECL_MUTEX_SUCCESS;
case WAIT_IO_COMPLETION:
goto AGAIN;
case WAIT_TIMEOUT:
return ECL_MUTEX_TIMEOUT;
default:
return GetLastError();
}
}
/* CONDITION VARIABLE */
/* IMPLEMENTATION

View file

@ -112,7 +112,7 @@ by ALLOW-WITH-INTERRUPTS."
;;;
;;; Convenience macros for locks
;;;
(defmacro with-lock ((lock-form &rest options) &body body)
(defmacro with-lock ((lock-form &key (wait-form t)) &body body)
#-threads
`(progn ,@body)
;; We do our best to make sure that the lock is released if we are
@ -121,9 +121,10 @@ by ALLOW-WITH-INTERRUPTS."
;; mutex is locked but before we store the return value of
;; mp:get-lock.
#+threads
(ext:with-unique-names (lock)
`(let ((,lock ,lock-form))
(when (mp:get-lock ,lock)
(ext:with-unique-names (lock wait)
`(let ((,lock ,lock-form)
(,wait ,wait-form))
(when (mp:get-lock ,lock ,wait)
(without-interrupts
(unwind-protect
(with-restored-interrupts

View file

@ -147,6 +147,45 @@
(is (eq (mp:lock-owner mutex) mp:*current-process*)))
(mp:giveup-lock mutex)))
(test-with-timeout (mp.mutex.timedlock-timeout 30)
(let ((mutex (mp:make-lock :name "mutex.timedlock-timeout"))
(flag 0))
(mp:get-lock mutex)
(setf flag 1)
(let ((waiting-process
(mp:process-run-function
"mutex.timedlock-timeout"
(lambda ()
(when (mp:get-lock mutex 1)
(error "Grabbing the mutex shouldn't have succeeded"))
(when (eq (mp:lock-owner mutex) mp:*current-process*)
(error "Wrong lock owner"))
(setf flag 2)))))
(mp:process-join waiting-process)
(is (eq mp:*current-process* (mp:lock-owner mutex)))
(is (= flag 2)))))
(test-with-timeout (mp.mutex.timedlock-acquire 30)
(let ((mutex (mp:make-lock :name "mutex.timedlock-acquire"))
(flag 0))
(mp:get-lock mutex)
(setf flag 1)
(let ((waiting-process
(mp:process-run-function
"mutex.timedlock-acquire"
(lambda ()
(setf flag 2)
(unless (mp:get-lock mutex 60)
(error "Grabbing the mutex should have succeeded"))
(when (not (eq (mp:lock-owner mutex) mp:*current-process*))
(error "Wrong lock owner"))
(setf flag 3)
(mp:giveup-lock mutex)))))
(loop until (> flag 1))
(sleep 1)
(mp:giveup-lock mutex)
(mp:process-join waiting-process)
(is (= flag 3)))))
;; Semaphores