From a20b8b9218d325f748bd4d641fb5516ec4aafae3 Mon Sep 17 00:00:00 2001 From: Juan Jose Garcia Ripoll Date: Sun, 15 Apr 2012 11:09:42 +0200 Subject: [PATCH] ecl_wakeup_waiters now take an argument with the number of processes to awake --- src/c/threads/barrier.d | 8 +++++--- src/c/threads/condition_variable.d | 8 ++++---- src/c/threads/mutex.d | 2 +- src/c/threads/queue.d | 9 ++++----- src/c/threads/semaphore.d | 6 +++--- src/h/internal.h | 2 +- src/h/object.h | 8 ++++---- 7 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/c/threads/barrier.d b/src/c/threads/barrier.d index cf6174e43..bb9c4e37e 100644 --- a/src/c/threads/barrier.d +++ b/src/c/threads/barrier.d @@ -84,8 +84,8 @@ mp_barrier_arrivers_count(cl_object barrier) } @(defun mp::barrier-unblock (barrier &key reset_count disable kill_waiting) - int ping_flags = ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG; - int kill_flags = ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL; + int ping_flags = ECL_WAKEUP_RESET_FLAG; + int kill_flags = ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL; @ unlikely_if (type_of(barrier) != t_barrier) { FEerror_not_a_barrier(barrier); @@ -96,7 +96,9 @@ mp_barrier_arrivers_count(cl_object barrier) barrier->barrier.arrivers_count = -1; else barrier->barrier.arrivers_count = barrier->barrier.count; - ecl_wakeup_waiters(the_env, barrier, Null(kill_waiting)? ping_flags : kill_flags); + ecl_wakeup_waiters(the_env, barrier, + Null(kill_waiting)? ping_flags : kill_flags, + ECL_WAKEUP_ALL); @(return) @) diff --git a/src/c/threads/condition_variable.d b/src/c/threads/condition_variable.d index efec08546..4698659f1 100644 --- a/src/c/threads/condition_variable.d +++ b/src/c/threads/condition_variable.d @@ -87,15 +87,15 @@ mp_condition_variable_timedwait(cl_object cv, cl_object lock, cl_object seconds) cl_object mp_condition_variable_signal(cl_object cv) { - ecl_wakeup_waiters(ecl_process_env(), cv, - ECL_WAKEUP_ONE | ECL_WAKEUP_RESET_FLAG); + ecl_wakeup_waiters(ecl_process_env(), cv, ECL_WAKEUP_RESET_FLAG, + ECL_WAKEUP_ONE); @(return Ct) } cl_object mp_condition_variable_broadcast(cl_object cv) { - ecl_wakeup_waiters(ecl_process_env(), cv, - ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG); + ecl_wakeup_waiters(ecl_process_env(), cv, ECL_WAKEUP_RESET_FLAG, + ECL_WAKEUP_ALL); @(return Ct) } diff --git a/src/c/threads/mutex.d b/src/c/threads/mutex.d index a8dcb2dc5..5209220cd 100644 --- a/src/c/threads/mutex.d +++ b/src/c/threads/mutex.d @@ -113,7 +113,7 @@ mp_giveup_lock(cl_object lock) if (--lock->lock.counter == 0) { lock->lock.owner = Cnil; print_lock("releasing %p\t", lock, lock); - ecl_wakeup_waiters(env, lock, ECL_WAKEUP_ONE); + ecl_wakeup_waiters(env, lock, 0, ECL_WAKEUP_ONE); } else { print_lock("released %p\t", lock, lock); } diff --git a/src/c/threads/queue.d b/src/c/threads/queue.d index 825d362f6..8bff36e28 100644 --- a/src/c/threads/queue.d +++ b/src/c/threads/queue.d @@ -278,7 +278,7 @@ ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_ob * first processes are always allowed to check for the * condition */ if (Null(output) && (firstone == record)) { - ecl_wakeup_waiters(the_env, o, 0); + ecl_wakeup_waiters(the_env, o, 0, ECL_WAKEUP_ONE); } /* 7) Restoring signals is done last, to ensure that @@ -292,7 +292,7 @@ ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_ob } void -ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags) +ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags, cl_fixnum n) { ecl_disable_interrupts_env(the_env); ecl_get_spinlock(the_env, &q->queue.spinlock); @@ -303,7 +303,7 @@ ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags) * because of the UNWIND-PROTECT in ecl_wait_on(), but * sometimes shit happens */ cl_object *tail, l; - for (tail = &q->queue.list; (l = *tail) != Cnil; ) { + for (tail = &q->queue.list; n && ((l = *tail) != Cnil); ) { cl_object p = ECL_CONS_CAR(l); if (p->process.phase == ECL_PROCESS_INACTIVE || p->process.phase == ECL_PROCESS_EXITING) { @@ -319,9 +319,8 @@ ecl_wakeup_waiters(cl_env_ptr the_env, cl_object q, int flags) mp_process_kill(p); else ecl_interrupt_process(p, Cnil); - if ((flags & ECL_WAKEUP_ALL) == 0) - break; tail = &ECL_CONS_CDR(l); + --n; } } } diff --git a/src/c/threads/semaphore.d b/src/c/threads/semaphore.d index 848754bf0..7233539fd 100644 --- a/src/c/threads/semaphore.d +++ b/src/c/threads/semaphore.d @@ -83,9 +83,9 @@ mp_semaphore_wait_count(cl_object semaphore) unlikely_if (type_of(semaphore) != t_semaphore) { FEerror_not_a_semaphore(semaphore); } - AO_fetch_and_add((AO_t*)&semaphore->semaphore.counter, 1); - while (n-- && semaphore->semaphore.queue_list != Cnil) { - ecl_wakeup_waiters(env, semaphore, ECL_WAKEUP_ONE); + AO_fetch_and_add((AO_t*)&semaphore->semaphore.counter, n); + if (semaphore->semaphore.queue_list != Cnil) { + ecl_wakeup_waiters(env, semaphore, 0, n); } @(return) } diff --git a/src/h/internal.h b/src/h/internal.h index 76063174d..efb36a2c9 100644 --- a/src/h/internal.h +++ b/src/h/internal.h @@ -480,7 +480,7 @@ extern void print_lock(char *s, cl_object lock, ...); extern void ecl_get_spinlock(cl_env_ptr env, cl_object *lock); extern void ecl_giveup_spinlock(cl_object *lock); extern cl_object ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_object o); -extern void ecl_wakeup_waiters(cl_env_ptr the_env, cl_object o, bool all); +extern void ecl_wakeup_waiters(cl_env_ptr the_env, cl_object o, int flags, cl_fixnum n); #endif /* threads/rwlock.d */ diff --git a/src/h/object.h b/src/h/object.h index 9db0eee1d..024931231 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -899,10 +899,10 @@ struct ecl_process { int trap_fpe_bits; }; -#define ECL_WAKEUP_ONE 0 -#define ECL_WAKEUP_ALL 1 -#define ECL_WAKEUP_RESET_FLAG 2 -#define ECL_WAKEUP_KILL 4 +#define ECL_WAKEUP_ONE 1 +#define ECL_WAKEUP_ALL MOST_POSITIVE_FIXNUM +#define ECL_WAKEUP_RESET_FLAG 1 +#define ECL_WAKEUP_KILL 2 struct ecl_queue { HEADER;