Implemented a wait queue for locks and other events.

This commit is contained in:
Juan Jose Garcia Ripoll 2012-02-19 01:22:45 +01:00
parent 632f584598
commit ce73d0a60a
9 changed files with 149 additions and 94 deletions

2
src/aclocal.m4 vendored
View file

@ -248,7 +248,7 @@ THREAD_CFLAGS=''
THREAD_LIBS=''
THREAD_GC_FLAGS='--enable-threads=posix'
INSTALL_TARGET='install'
THREAD_OBJ="$THREAD_OBJ threads/process threads/mutex threads/condition_variable"
THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable"
clibs=''
SONAME=''
SONAME_LDFLAGS=''

View file

@ -439,6 +439,7 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl
# endif
# ifdef ECL_THREADS
case t_process:
MAYBE_MARK(o->process.waiting_for);
MAYBE_MARK(o->process.exit_values);
MAYBE_MARK(o->process.exit_lock);
MAYBE_MARK(o->process.parent);
@ -972,7 +973,8 @@ init_alloc(void)
to_bitmap(&o, &(o.process.initial_bindings)) |
to_bitmap(&o, &(o.process.parent)) |
to_bitmap(&o, &(o.process.exit_lock)) |
to_bitmap(&o, &(o.process.exit_values));
to_bitmap(&o, &(o.process.exit_values)) |
to_bitmap(&o, &(o.process.waiting_for)));
type_info[t_lock].descriptor =
to_bitmap(&o, &(o.lock.name)) |
to_bitmap(&o, &(o.lock.owner));

View file

@ -408,9 +408,6 @@ struct cl_core_struct cl_core = {
0, /* path_max */
#ifdef GBC_BOEHM
NULL, /* safety_region */
#endif
#ifdef ECL_THREADS
Cnil, /* signal_queue_lock */
#endif
Cnil, /* signal_queue */

View file

@ -1,6 +1,6 @@
/* -*- mode: c; c-basic-offset: 8 -*- */
/*
threads_mutex.d -- Native mutually exclusive locks.
mutex.d -- mutually exclusive locks.
*/
/*
Copyright (c) 2003, Juan Jose Garcia Ripoll.
@ -13,17 +13,8 @@
See file '../Copyright' for full details.
*/
#ifndef __sun__ /* See unixinit.d for this */
#define _XOPEN_SOURCE 600 /* For pthread mutex attributes */
#endif
#include <errno.h>
#define AO_ASSUME_WINDOWS98 /* We need this for CAS */
#include <ecl/ecl.h>
#ifdef ECL_WINDOWS_THREADS
# include <windows.h>
#else
# include <pthread.h>
#endif
#include <ecl/internal.h>
/*----------------------------------------------------------------------
@ -53,14 +44,12 @@ FEerror_not_owned(cl_object lock)
cl_object
ecl_make_lock(cl_object name, bool recursive)
{
cl_env_ptr the_env = ecl_process_env();
cl_object output = ecl_alloc_object(t_lock);
ecl_disable_interrupts_env(the_env);
output->lock.name = name;
output->lock.owner = Cnil;
output->lock.counter = 0;
output->lock.waiter = Cnil;
output->lock.recursive = recursive;
ecl_enable_interrupts_env(the_env);
return output;
}
@ -123,6 +112,10 @@ mp_giveup_lock(cl_object lock)
ecl_disable_interrupts_env(env);
if (--lock->lock.counter == 0) {
lock->lock.owner = Cnil;
if (lock->lock.waiter != Cnil) {
lock->lock.waiter = Cnil;
ecl_wakeup_waiters(lock, 1);
}
}
ecl_enable_interrupts_env(env);
ecl_return1(env, Ct);
@ -164,28 +157,10 @@ mp_get_lock_nowait(cl_object lock)
cl_object
mp_get_lock_wait(cl_object lock)
{
struct ecl_timeval start;
cl_env_ptr env = ecl_process_env();
cl_object own_process = env->own_process;
cl_fixnum code, iteration;
unlikely_if (type_of(lock) != t_lock) {
FEerror_not_a_lock(lock);
if (mp_get_lock_nowait(lock) == Cnil) {
ecl_wait_on(mp_get_lock_nowait, lock);
}
iteration = 0;
do {
int n;
ecl_disable_interrupts_env(env);
for (n = 0, code = 0; n < 100 && code == 0; n++)
code = get_lock_inner(lock, own_process);
ecl_enable_interrupts_env(env);
unlikely_if (code < 0)
FEerror_not_a_recursive_lock(lock);
if (code > 0)
@(return Ct);
if (!iteration)
ecl_get_internal_real_time(&start);
ecl_wait_for(++iteration, &start);
} while (1);
@(return Ct)
}
@(defun mp::get-lock (lock &optional (wait Ct))

121
src/c/threads/queue.d Normal file
View file

@ -0,0 +1,121 @@
/* -*- mode: c; c-basic-offset: 8 -*- */
/*
queue.d -- waiting queue for threads.
*/
/*
Copyright (c) 2011, Juan Jose Garcia Ripoll.
ECL is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
See file '../Copyright' for full details.
*/
#define AO_ASSUME_WINDOWS98 /* We need this for CAS */
#include <ecl/ecl.h>
#include <ecl/internal.h>
/*----------------------------------------------------------------------
* THREAD SCHEDULER & WAITING
*/
static cl_object
bignum_set_time(cl_object bignum, struct ecl_timeval *time)
{
_ecl_big_set_index(bignum, time->tv_sec);
_ecl_big_mul_ui(bignum, bignum, 1000);
_ecl_big_add_ui(bignum, bignum, (time->tv_usec + 999) / 1000);
return bignum;
}
static cl_object
elapsed_time(struct ecl_timeval *start)
{
cl_object delta_big = _ecl_big_register0();
cl_object aux_big = _ecl_big_register1();
struct ecl_timeval now;
ecl_get_internal_real_time(&now);
bignum_set_time(aux_big, start);
bignum_set_time(delta_big, &now);
_ecl_big_sub(delta_big, delta_big, aux_big);
_ecl_big_register_free(aux_big);
return delta_big;
}
static double
waiting_time(cl_index iteration, struct ecl_timeval *start)
{
/* Waiting time is smaller than 0.10 s */
double time;
cl_object top = MAKE_FIXNUM(10 * 1000);
cl_object delta_big = elapsed_time(start);
_ecl_big_div_ui(delta_big, delta_big, iteration);
if (ecl_number_compare(delta_big, top) < 0) {
time = ecl_to_double(delta_big);
} else {
time = 0.10;
}
_ecl_big_register_free(delta_big);
return time;
}
void
spinlock(cl_index iteration, struct ecl_timeval *start)
{
ecl_musleep((iteration > 3) ?
waiting_time(iteration, start) :
0.0,
1);
}
void
ecl_wait_on(cl_object (*condition)(cl_object), cl_object o)
{
cl_env_ptr env = ecl_process_env();
cl_object process = env->own_process;
struct ecl_timeval start;
ecl_bds_bind(env, @'ext::*interrupts-enabled*', Cnil);
ecl_get_internal_real_time(&start);
CL_UNWIND_PROTECT_BEGIN(env) {
cl_fixnum iteration = 0;
process->process.waiting_for = o;
o->lock.waiter = process;
ecl_bds_bind(env, @'ext::*interrupts-enabled*', Ct);
ecl_check_pending_interrupts();
do {
spinlock(iteration++, &start);
} while (condition(o) == Cnil);
ecl_bds_unwind1(env);
} CL_UNWIND_PROTECT_EXIT {
process->process.waiting_for = Cnil;
} CL_UNWIND_PROTECT_END;
ecl_bds_unwind1(env);
}
static void
wakeup_process(cl_object p)
{
mp_interrupt_process(p, @'+');
}
void
ecl_wakeup_waiters(cl_object o, bool all)
{
cl_object v = cl_core.processes;
cl_index size = v->vector.dim;
cl_index i = size;
cl_index ndx = rand() % size;
while (i--) {
cl_object p = v->vector.self.t[ndx];
if (!Null(p)) {
if (p->process.waiting_for == o && p->process.active == 1) {
wakeup_process(p);
if (!all) return;
}
}
if (++ndx >= size)
ndx = 0;
}
}

View file

@ -108,48 +108,8 @@ ecl_get_internal_run_time(struct ecl_timeval *tv)
#endif
}
static cl_object
bignum_set_time(cl_object bignum, struct ecl_timeval *time)
{
_ecl_big_set_index(bignum, time->tv_sec);
_ecl_big_mul_ui(bignum, bignum, 1000);
_ecl_big_add_ui(bignum, bignum, (time->tv_usec + 999) / 1000);
return bignum;
}
static cl_object
elapsed_time(struct ecl_timeval *start)
{
cl_object delta_big = _ecl_big_register0();
cl_object aux_big = _ecl_big_register1();
struct ecl_timeval now;
ecl_get_internal_real_time(&now);
bignum_set_time(aux_big, start);
bignum_set_time(delta_big, &now);
_ecl_big_sub(delta_big, delta_big, aux_big);
_ecl_big_register_free(aux_big);
return delta_big;
}
static double
waiting_time(cl_index iteration, struct ecl_timeval *start)
{
/* Waiting time is smaller than 0.10 s */
double time;
cl_object top = MAKE_FIXNUM(10 * 1000);
cl_object delta_big = elapsed_time(start);
_ecl_big_div_ui(delta_big, delta_big, iteration);
if (ecl_number_compare(delta_big, top) < 0) {
time = ecl_to_double(delta_big);
} else {
time = 0.10;
}
_ecl_big_register_free(delta_big);
return time;
}
static void
musleep(double time, bool alertable)
void
ecl_musleep(double time, bool alertable)
{
#ifdef HAVE_NANOSLEEP
struct timespec tm;
@ -211,15 +171,6 @@ musleep(double time, bool alertable)
#endif
}
void
ecl_wait_for(cl_index iteration, struct ecl_timeval *start)
{
musleep((iteration > 3) ?
waiting_time(iteration, start) :
0.0,
1);
}
cl_fixnum
ecl_runtime(void)
{
@ -247,7 +198,7 @@ cl_sleep(cl_object z)
time = 1e-9;
}
} ECL_WITHOUT_FPE_END;
musleep(time, 0);
ecl_musleep(time, 0);
@(return Cnil)
}

4
src/configure vendored
View file

@ -4524,7 +4524,7 @@ THREAD_CFLAGS=''
THREAD_LIBS=''
THREAD_GC_FLAGS='--enable-threads=posix'
INSTALL_TARGET='install'
THREAD_OBJ="$THREAD_OBJ threads/process threads/mutex threads/condition_variable"
THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable"
clibs=''
SONAME=''
SONAME_LDFLAGS=''
@ -15756,7 +15756,7 @@ do
cat >>$CONFIG_STATUS <<_ACEOF
# First, check the format of the line:
cat >"\$tmp/defines.sed" <<\\CEOF
/^[ ]*#[ ]*undef[ ][ ]*$ac_word_re[ ]*/b def
/^[ ]*#[ ]*undef[ ][ ]*$ac_word_re[ ]*\$/b def
/^[ ]*#[ ]*define[ ][ ]*$ac_word_re[( ]/b def
b
:def

View file

@ -450,11 +450,18 @@ struct ecl_timeval {
extern void ecl_get_internal_real_time(struct ecl_timeval *time);
extern void ecl_get_internal_run_time(struct ecl_timeval *time);
extern void ecl_wait_for(cl_index iteration, struct ecl_timeval *start);
extern void ecl_musleep(double time, bool alertable);
#define UTC_time_to_universal_time(x) ecl_plus(ecl_make_integer(x),cl_core.Jan1st1970UT)
extern cl_fixnum ecl_runtime(void);
/* threads/mutex.d */
#ifdef ECL_THREADS
extern void ecl_wait_on(cl_object (*condition)(cl_object), cl_object o);
extern void ecl_wakeup_waiters(cl_object o, bool all);
#endif
/* threads/rwlock.d */
#ifdef ECL_RWLOCK

View file

@ -895,12 +895,14 @@ struct ecl_process {
cl_object exit_lock;
int trap_fpe_bits;
cl_object exit_values;
cl_object waiting_for;
};
struct ecl_lock {
HEADER1(recursive);
cl_object name;
cl_object owner; /* thread holding the lock or NIL */
cl_object waiter;
cl_fixnum counter;
};