From b9cebb0cf7e62901f57c4794bccd484f6abe73f9 Mon Sep 17 00:00:00 2001 From: Juan Jose Garcia Ripoll Date: Thu, 29 Mar 2012 12:45:22 +0200 Subject: [PATCH] print_lock() is now more like printf(). New implementation of the queue. --- src/c/file.d | 2 +- src/c/threads/mutex.d | 3 ++ src/c/threads/queue.d | 94 ++++++++++++++++++++++++++----------------- src/h/internal.h | 2 +- 4 files changed, 63 insertions(+), 38 deletions(-) diff --git a/src/c/file.d b/src/c/file.d index 69a907188..d660cfffd 100755 --- a/src/c/file.d +++ b/src/c/file.d @@ -5392,7 +5392,7 @@ init_file(void) /* We choose C streams by default only when _not_ using threads. * The reason is that C streams block on I/O operations. */ -#ifndef ECL_THREADS +#if 1 /*ndef ECL_THREADS*/ standard_input = ecl_make_stream_from_FILE(make_constant_base_string("stdin"), stdin, smm_input, 8, flags, external_format); standard_output = ecl_make_stream_from_FILE(make_constant_base_string("stdout"), diff --git a/src/c/threads/mutex.d b/src/c/threads/mutex.d index 4987d2692..75143dc20 100644 --- a/src/c/threads/mutex.d +++ b/src/c/threads/mutex.d @@ -116,6 +116,8 @@ mp_giveup_lock(cl_object lock) ecl_return1(env, Ct); } +#define print_lock(a,b,...) (void)0 + static cl_object get_lock_inner(cl_env_ptr env, cl_object lock) { @@ -126,6 +128,7 @@ get_lock_inner(cl_env_ptr env, cl_object lock) (AO_t)Cnil, (AO_t)own_process)) { lock->lock.counter = 1; output = Ct; + print_lock("acquiring\t", lock, lock); } else if (lock->lock.owner == own_process) { unlikely_if (!lock->lock.recursive) { FEerror_not_a_recursive_lock(lock); diff --git a/src/c/threads/queue.d b/src/c/threads/queue.d index 7777efd3f..2122ff0ca 100644 --- a/src/c/threads/queue.d +++ b/src/c/threads/queue.d @@ -17,9 +17,12 @@ #ifdef HAVE_SCHED_H #include #endif +#include #include #include +#define print_lock(a,b,...) (void)0 + void ECL_INLINE ecl_get_spinlock(cl_env_ptr the_env, cl_object *lock) { @@ -169,31 +172,40 @@ ecl_wait_on(cl_object (*condition)(cl_env_ptr, cl_object), cl_object o) { cl_env_ptr the_env = ecl_process_env(); cl_object own_process = the_env->own_process; - cl_object queue = o->lock.waiter; - cl_fixnum iteration = 1; - struct ecl_timeval start; - ecl_get_internal_real_time(&start); -#if 0 - for (iteration = 0; iteration < 10; iteration++) { - if (condition(the_env, o) != Cnil) - return; - } -#endif - ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', Cnil); + sigset_t original, empty; + + /* 1) First we block all signals. */ + sigemptyset(&empty); + pthread_sigmask(SIG_SETMASK, &original, &empty); + CL_UNWIND_PROTECT_BEGIN(the_env) { + /* 2) Now we add ourselves to the queue. */ ecl_atomic_queue_nconc(o->lock.waiter, own_process); own_process->process.waiting_for = o; - ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', Ct); - ecl_check_pending_interrupts(the_env); - do { - ecl_musleep(waiting_time(iteration++, &start), 1); - } while (condition(the_env, o) == Cnil); - ecl_bds_unwind1(the_env); + + /* 3) At this point we may receive signals, but we + * might have missed the wakeup one that happened + * before 1), which is why we start with the check*/ + if (cl_second(o->lock.waiter) != own_process || + condition(the_env, o) == Cnil) { + do { + /* This will wait until we get a signal that + * demands some code being executed. Note that + * this includes our communication signals and + * the signals used by the GC. Note also that + * as a consequence we might throw / return + * which is why need to protect it all with + * UNWIND-PROTECT. */ + sigsuspend(&original); + } while (condition(the_env, o) == Cnil); + } } CL_UNWIND_PROTECT_EXIT { + /* 4) At this point we wrap up. We remove ourselves + from the queue and restore signals, which were */ own_process->process.waiting_for = Cnil; ecl_atomic_queue_delete(o->lock.waiter, own_process); + pthread_sigmask(SIG_SETMASK, NULL, &original); } CL_UNWIND_PROTECT_END; - ecl_bds_unwind1(the_env); } static void @@ -201,6 +213,7 @@ wakeup_this(cl_object p, int flags) { if (flags & ECL_WAKEUP_RESET_FLAG) p->process.waiting_for = Cnil; + print_lock("awaking\t\t%d", Cnil, fix(p->process.name)); ecl_interrupt_process(p, Cnil); } @@ -231,28 +244,11 @@ wakeup_one(cl_object waiter, int flags) } while (1); } -void -print_lock(char *prefix, cl_object l, cl_object x) -{ - static cl_object lock = Cnil; - if (l == Cnil || l->lock.name == MAKE_FIXNUM(0)) { - cl_env_ptr env = ecl_process_env(); - ecl_get_spinlock(env, &lock); - cl_terpri(0); - ecl_princ_str(prefix, cl_core.standard_output); - cl_princ(1,env->own_process->process.name); - ecl_princ_str("\t", cl_core.standard_output); - cl_princ(1,(x)); cl_force_output(0); - fflush(stdout); - ecl_giveup_spinlock(&lock); - } -} -/*#define print_lock(a,b,c) (void)0*/ - void ecl_wakeup_waiters(cl_object o, int flags) { cl_object waiter = o->lock.waiter; + print_lock("releasing\t", o); if (ECL_CONS_CDR(waiter) != Cnil) { if (flags & ECL_WAKEUP_ALL) { wakeup_all(waiter, flags); @@ -262,3 +258,29 @@ ecl_wakeup_waiters(cl_object o, int flags) } sched_yield(); } + +#undef print_lock + +void +print_lock(char *prefix, cl_object l, ...) +{ + static cl_object lock = Cnil; + va_list args; + va_start(args, lock); + if (l == Cnil || l->lock.name == MAKE_FIXNUM(0)) { + cl_env_ptr env = ecl_process_env(); + ecl_get_spinlock(env, &lock); + printf("\n%d\t", fix(env->own_process->process.name)); + vprintf(prefix, args); + if (l != Cnil) { + cl_object p = ECL_CONS_CDR(l->lock.waiter); + while (p != Cnil) { + printf(" %d", fix(ECL_CONS_CAR(p)->process.name)); + p = ECL_CONS_CDR(p); + } + } + fflush(stdout); + ecl_giveup_spinlock(&lock); + } +} +/*#define print_lock(a,b,c) (void)0*/ diff --git a/src/h/internal.h b/src/h/internal.h index 44f7ad840..ac1fe216a 100644 --- a/src/h/internal.h +++ b/src/h/internal.h @@ -463,7 +463,7 @@ extern cl_fixnum ecl_runtime(void); /* threads/mutex.d */ #ifdef ECL_THREADS -extern void print_lock(char *s, cl_object l, cl_object x); +extern void print_lock(char *s, cl_object lock, ...); extern void ecl_get_spinlock(cl_env_ptr env, cl_object *lock); extern void ecl_giveup_spinlock(cl_object *lock);