From a5eb54dd27d7a2ef5339864547d208e257edf934 Mon Sep 17 00:00:00 2001 From: Juan Jose Garcia Ripoll Date: Fri, 6 Apr 2012 00:18:04 +0200 Subject: [PATCH] Implemented mp:barrier and fixed unixsys.d to use spinlocks and not real locks. --- src/aclocal.m4 | 2 +- src/c/alloc_2.d | 48 ++++++------ src/c/symbols_list.h | 14 +++- src/c/symbols_list2.h | 12 +++ src/c/threads/barrier.d | 163 ++++++++++++++++++++++++++++++++++++++++ src/c/threads/process.d | 1 - src/c/threads/queue.d | 11 +-- src/c/typespec.d | 4 +- src/c/unixint.d | 2 +- src/c/unixsys.d | 60 +++++++-------- src/clos/builtin.lsp | 3 +- src/configure | 4 +- src/h/external.h | 10 +++ src/h/internal.h | 2 +- src/h/object.h | 16 +++- src/lsp/predlib.lsp | 3 + 16 files changed, 285 insertions(+), 70 deletions(-) create mode 100644 src/c/threads/barrier.d diff --git a/src/aclocal.m4 b/src/aclocal.m4 index b5de79c4f..3857ea520 100644 --- a/src/aclocal.m4 +++ b/src/aclocal.m4 @@ -248,7 +248,7 @@ THREAD_CFLAGS='' THREAD_LIBS='' THREAD_GC_FLAGS='--enable-threads=posix' INSTALL_TARGET='install' -THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore" +THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier" clibs='' SONAME='' SONAME_LDFLAGS='' diff --git a/src/c/alloc_2.d b/src/c/alloc_2.d index fb90cede2..c652619b9 100755 --- a/src/c/alloc_2.d +++ b/src/c/alloc_2.d @@ -469,6 +469,16 @@ cl_object_mark_proc(void *addr, struct GC_ms_entry *msp, struct GC_ms_entry *msl MAYBE_MARK(o->rwlock.mutex); break; # endif + case t_semaphore: + MAYBE_MARK(o->semaphore.queue_list); + MAYBE_MARK(o->semaphore.queue_spinlock); + MAYBE_MARK(o->semaphore.name); + break; + case t_barrier: + MAYBE_MARK(o->barrier.queue_list); + MAYBE_MARK(o->barrier.queue_spinlock); + MAYBE_MARK(o->barrier.name); + break; # endif case t_codeblock: MAYBE_MARK(o->cblock.source); @@ -574,9 +584,8 @@ ecl_alloc_object(cl_type t) case t_lock: case t_rwlock: case t_condition_variable: -#endif -#ifdef ECL_SEMAPHORES - case t_semaphores: + case t_semaphore: + case t_barrier: #endif case t_foreign: case t_codeblock: { @@ -850,9 +859,8 @@ init_alloc(void) init_tm(t_rwlock, "LOCK", sizeof(struct ecl_rwlock), 0); init_tm(t_condition_variable, "CONDITION-VARIABLE", sizeof(struct ecl_condition_variable), 0); -# ifdef ECL_SEMAPHORES - init_tm(t_semaphore, "SEMAPHORES", sizeof(struct ecl_semaphores), 0); -# endif + init_tm(t_semaphore, "SEMAPHORES", sizeof(struct ecl_semaphore), 0); + init_tm(t_barrier, "BARRIER", sizeof(struct ecl_barrier), 0); #endif init_tm(t_codeblock, "CODEBLOCK", sizeof(struct ecl_codeblock), -1); init_tm(t_foreign, "FOREIGN", sizeof(struct ecl_foreign), 2); @@ -1001,9 +1009,14 @@ init_alloc(void) to_bitmap(&o, &(o.condition_variable.lock)) | to_bitmap(&o, &(o.condition_variable.queue_list)) | to_bitmap(&o, &(o.condition_variable.queue_spinlock)); -# ifdef ECL_SEMAPHORES - type_info[t_semaphore].descriptor = 0; -# endif + type_info[t_semaphore].descriptor = + to_bitmap(&o, &(o.semaphore.name)) | + to_bitmap(&o, &(o.semaphore.queue_list)) | + to_bitmap(&o, &(o.semaphore.queue_spinlock)); + type_info[t_barrier].descriptor = + to_bitmap(&o, &(o.barrier.name)) | + to_bitmap(&o, &(o.barrier.queue_list)) | + to_bitmap(&o, &(o.barrier.queue_spinlock)); # endif type_info[t_codeblock].descriptor = to_bitmap(&o, &(o.cblock.data)) | @@ -1076,7 +1089,7 @@ standard_finalizer(cl_object o) GC_unregister_disappearing_link((void**)&(o->weak.value)); break; #ifdef ECL_THREADS -#ifdef ECL_RWLOCK +# ifdef ECL_RWLOCK case t_rwlock: { const cl_env_ptr the_env = ecl_process_env(); ecl_disable_interrupts_env(the_env); @@ -1084,23 +1097,12 @@ standard_finalizer(cl_object o) ecl_enable_interrupts_env(the_env); break; } -#endif -#endif -#ifdef ECL_SEMAPHORES - case t_semaphore: { - const cl_env_ptr the_env = ecl_process_env(); - ecl_disable_interrupts_env(the_env); - mp_semaphore_close(o); - ecl_enable_interrupts_env(the_env); - break; - } -#endif -#ifdef ECL_THREADS +# endif case t_symbol: { ecl_atomic_push(&cl_core.reused_indices, MAKE_FIXNUM(o->symbol.binding)); } -#endif +#endif /* ECL_THREADS */ default:; } } diff --git a/src/c/symbols_list.h b/src/c/symbols_list.h index 811d418c2..fb12360a6 100755 --- a/src/c/symbols_list.h +++ b/src/c/symbols_list.h @@ -1229,7 +1229,7 @@ cl_symbols[] = { {SYS_ "REPLACE-ARRAY", SI_ORDINARY, si_replace_array, 2, OBJNULL}, {SYS_ "ROW-MAJOR-ASET", SI_ORDINARY, si_row_major_aset, 3, OBJNULL}, {EXT_ "RUN-PROGRAM", EXT_ORDINARY, si_run_program, -1, OBJNULL}, -{SYS_ "WAIT-FOR-ALL-PROCESSES", SI_ORDINARY, si_wait_for_all_processes, -1, OBJNULL}, +{SYS_ "WAIT-FOR-ALL-PROCESSES", SI_ORDINARY, si_wait_for_all_processes, 0, OBJNULL}, {EXT_ "SAFE-EVAL", EXT_ORDINARY, si_safe_eval, -1, OBJNULL}, {SYS_ "SCH-FRS-BASE", SI_ORDINARY, si_sch_frs_base, 2, OBJNULL}, {SYS_ "SCHAR-SET", SI_ORDINARY, si_char_set, 3, OBJNULL}, @@ -1611,6 +1611,18 @@ cl_symbols[] = { {MP_ "SEMAPHORE-NAME", MP_ORDINARY, IF_MP(mp_semaphore_name), 1, OBJNULL}, {MP_ "SEMAPHORE-WAIT-COUNT", MP_ORDINARY, IF_MP(mp_semaphore_wait_count), 1, OBJNULL}, {KEY_ "COUNT", KEYWORD, NULL, -1, OBJNULL}, + +{MP_ "BARRIER", MP_ORDINARY, NULL, -1, OBJNULL}, +{MP_ "MAKE-BARRIER", MP_ORDINARY, IF_MP(mp_make_barrier), -1, OBJNULL}, +{MP_ "BARRIER-UNBLOCK", MP_ORDINARY, IF_MP(mp_barrier_unblock), -1, OBJNULL}, +{MP_ "BARRIER-WAIT", MP_ORDINARY, IF_MP(mp_barrier_wait), -1, OBJNULL}, +{MP_ "BARRIER-COUNT", MP_ORDINARY, IF_MP(mp_barrier_count), 1, OBJNULL}, +{MP_ "BARRIER-NAME", MP_ORDINARY, IF_MP(mp_barrier_name), 1, OBJNULL}, +{MP_ "BARRIER-ARRIVERS-COUNT", MP_ORDINARY, IF_MP(mp_barrier_arrivers_count), 1, OBJNULL}, +{KEY_ "DISABLE", KEYWORD, NULL, -1, OBJNULL}, +{KEY_ "RESET-COUNT", KEYWORD, NULL, -1, OBJNULL}, +{KEY_ "KILL-WAITING", KEYWORD, NULL, -1, OBJNULL}, +{KEY_ "UNBLOCKED", KEYWORD, NULL, -1, OBJNULL}, /* #endif defined(ECL_THREADS) */ {SYS_ "WHILE", SI_ORDINARY, NULL, -1, OBJNULL}, diff --git a/src/c/symbols_list2.h b/src/c/symbols_list2.h index e8eb1e1cc..a0959c7cb 100644 --- a/src/c/symbols_list2.h +++ b/src/c/symbols_list2.h @@ -1611,6 +1611,18 @@ cl_symbols[] = { {MP_ "SEMAPHORE-NAME",IF_MP("mp_semaphore_name")}, {MP_ "SEMAPHORE-WAIT-COUNT",IF_MP("mp_semaphore_wait_count")}, {KEY_ "COUNT",NULL}, + +{MP_ "BARRIER",NULL}, +{MP_ "MAKE-BARRIER",IF_MP("mp_make_barrier")}, +{MP_ "BARRIER-UNBLOCK",IF_MP("mp_barrier_unblock")}, +{MP_ "BARRIER-WAIT",IF_MP("mp_barrier_wait")}, +{MP_ "BARRIER-COUNT",IF_MP("mp_barrier_count")}, +{MP_ "BARRIER-NAME",IF_MP("mp_barrier_name")}, +{MP_ "BARRIER-ARRIVERS-COUNT",IF_MP("mp_barrier_arrivers_count")}, +{KEY_ "DISABLE",NULL}, +{KEY_ "RESET-COUNT",NULL}, +{KEY_ "KILL-WAITING",NULL}, +{KEY_ "UNBLOCKED",NULL}, /* #endif defined(ECL_THREADS) */ {SYS_ "WHILE",NULL}, diff --git a/src/c/threads/barrier.d b/src/c/threads/barrier.d new file mode 100644 index 000000000..c227b3ce1 --- /dev/null +++ b/src/c/threads/barrier.d @@ -0,0 +1,163 @@ +/* -*- mode: c; c-basic-offset: 8 -*- */ +/* + barrier.d -- wait barriers +*/ +/* + Copyright (c) 2012, Juan Jose Garcia Ripoll. + + ECL is free software; you can redistribute it and/or + modify it under the terms of the GNU Library General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + See file '../Copyright' for full details. +*/ + +#define AO_ASSUME_WINDOWS98 /* We need this for CAS */ +#include +#include + +#if !defined(AO_HAVE_fetch_and_add_full) +#error "Cannot implement barriers without AO_fetch_and_add_full" +#endif + +static ECL_INLINE void +FEerror_not_a_barrier(cl_object barrier) +{ + FEwrong_type_argument(@'mp::barrier', barrier); +} + +cl_object +ecl_make_barrier(cl_object name, cl_index count) +{ + cl_object output = ecl_alloc_object(t_barrier); + output->barrier.name = name; + output->barrier.arrivers_count = count; + output->barrier.count = count; + output->barrier.queue_list = Cnil; + output->barrier.queue_spinlock = Cnil; + return output; +} + +@(defun mp::make-barrier (count &key name) +@ + if (count == Ct) + count = MAKE_FIXNUM(MOST_POSITIVE_FIXNUM); + @(return ecl_make_barrier(name, fixnnint(count))) +@) + +cl_object +mp_barrier_name(cl_object barrier) +{ + cl_env_ptr env = ecl_process_env(); + unlikely_if (type_of(barrier) != t_barrier) { + FEerror_not_a_barrier(barrier); + } + ecl_return1(env, barrier->barrier.name); +} + +cl_object +mp_barrier_count(cl_object barrier) +{ + cl_env_ptr env = ecl_process_env(); + unlikely_if (type_of(barrier) != t_barrier) { + FEerror_not_a_barrier(barrier); + } + ecl_return1(env, MAKE_FIXNUM(barrier->barrier.count)); +} + +cl_object +mp_barrier_arrivers_count(cl_object barrier) +{ + cl_fixnum arrivers, count; + cl_env_ptr env = ecl_process_env(); + unlikely_if (type_of(barrier) != t_barrier) { + FEerror_not_a_barrier(barrier); + } + arrivers = barrier->barrier.arrivers_count; + count = barrier->barrier.count; + if (arrivers < 0) + arrivers = 0; /* Disabled barrier */ + else + arrivers = count - arrivers; + ecl_return1(env, MAKE_FIXNUM(arrivers)); +} + +@(defun mp::barrier-unblock (barrier &key reset_count disable kill_waiting) + int ping_flags = ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG; + int kill_flags = ECL_WAKEUP_ALL | ECL_WAKEUP_RESET_FLAG | ECL_WAKEUP_KILL; +@ + unlikely_if (type_of(barrier) != t_barrier) { + FEerror_not_a_barrier(barrier); + } + if (!Null(reset_count)) + barrier->barrier.count = fixnnint(reset_count); + if (!Null(disable)) + barrier->barrier.arrivers_count = -1; + else + barrier->barrier.arrivers_count = barrier->barrier.count; + ecl_wakeup_waiters(the_env, barrier, Null(kill_waiting)? ping_flags : kill_flags); + @(return) +@) + +static cl_object +barrier_wait_condition(cl_env_ptr env, cl_object barrier) +{ + cl_object output; + if (env->own_process->process.waiting_for != barrier) + return Ct; + else + return Cnil; +} + +static cl_fixnum +decrement_counter(cl_fixnum *counter) +{ + /* The logic is as follows: + * - If the counter is negative, we abort. This is a way of + * disabling the counter. + * - Otherwise, we decrease the counter only if it is positive + * - If the counter is currently zero, then we block. This + * situation implies that some other thread is unblocking. + */ + cl_fixnum c; + do { + c = *counter; + if (c < 0) { + return c; + } else if (c > 0) { + if (AO_compare_and_swap_full((AO_t*)counter, + (AO_t)c, (AO_t)(c-1))) + return c; + } + } while (1); +} + +@(defun mp::barrier-wait (barrier &key) + cl_object output; + cl_fixnum counter; +@ +{ + cl_object own_process = the_env->own_process; + + unlikely_if (type_of(barrier) != t_barrier) { + FEerror_not_a_barrier(barrier); + } + ecl_disable_interrupts_env(the_env); + own_process->process.waiting_for = barrier; + if (decrement_counter(&barrier->barrier.arrivers_count) == 0) { + /* There are (count-1) threads in the queue and we + * are the last one. We thus unblock all threads and + * proceed. */ + own_process->process.waiting_for = Cnil; + mp_barrier_unblock(1, barrier); + ecl_enable_interrupts_env(the_env); + output = @':unblocked'; + } else { + ecl_enable_interrupts_env(the_env); + ecl_wait_on(the_env, barrier_wait_condition, barrier); + output = Ct; + } + @(return output) +} +@) diff --git a/src/c/threads/process.d b/src/c/threads/process.d index 8e4992af2..2e67fa428 100644 --- a/src/c/threads/process.d +++ b/src/c/threads/process.d @@ -736,7 +736,6 @@ init_threads(cl_env_ptr env) v->vector.fillp = 1; cl_core.processes = v; cl_core.global_lock = ecl_make_lock(@'mp::global-lock', 1); - cl_core.external_processes_lock = ecl_make_lock(@'ext::run-program', 1); cl_core.error_lock = ecl_make_lock(@'mp::error-lock', 1); cl_core.global_env_lock = ecl_make_rwlock(@'ext::package-lock'); } diff --git a/src/c/threads/queue.d b/src/c/threads/queue.d index e1e9a280d..1ed657571 100644 --- a/src/c/threads/queue.d +++ b/src/c/threads/queue.d @@ -27,7 +27,7 @@ void ECL_INLINE ecl_process_yield() { -#if defined(HAVE_SCHED_YIELD) +#if defined(HAVE_SCHED_H) sched_yield(); #elif defined(ECL_WINDOWS_THREADS) Sleep(0); @@ -49,7 +49,7 @@ ecl_get_spinlock(cl_env_ptr the_env, cl_object *lock) void ECL_INLINE ecl_giveup_spinlock(cl_object *lock) { - *lock = Cnil; + AO_store((AO_t*)lock, (AO_t)Cnil); } static ECL_INLINE void @@ -158,7 +158,6 @@ ecl_wait_on_timed(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), /* 2) Now we add ourselves to the queue. In order to * avoid a call to the GC, we try to reuse records. */ wait_queue_nconc(the_env, o, record); - own_process->process.waiting_for = o; ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', Ct); ecl_check_pending_interrupts(the_env); @@ -215,7 +214,6 @@ ecl_wait_on(cl_env_ptr env, cl_object (*condition)(cl_env_ptr, cl_object), cl_ob /* 2) Now we add ourselves to the queue. In order to avoid a * call to the GC, we try to reuse records. */ wait_queue_nconc(the_env, o, record); - own_process->process.waiting_for = o; CL_UNWIND_PROTECT_BEGIN(the_env) { /* 3) At this point we may receive signals, but we @@ -271,7 +269,10 @@ wakeup_this(cl_object p, int flags) if (flags & ECL_WAKEUP_RESET_FLAG) p->process.waiting_for = Cnil; print_lock("awaking\t\t%d", Cnil, fix(p->process.name)); - ecl_interrupt_process(p, Cnil); + if (flags & ECL_WAKEUP_KILL) + mp_process_kill(p); + else + ecl_interrupt_process(p, Cnil); } static void diff --git a/src/c/typespec.d b/src/c/typespec.d index 8d1f834a4..3152f8676 100644 --- a/src/c/typespec.d +++ b/src/c/typespec.d @@ -163,10 +163,10 @@ ecl_type_to_symbol(cl_type t) return @'mp::lock'; case t_condition_variable: return @'mp::condition-variable'; -#endif -#ifdef ECL_SEMAPHORES case t_semaphore: return @'mp::semaphore'; + case t_barrier: + return @'mp::barrier'; #endif case t_codeblock: return @'si::code-block'; diff --git a/src/c/unixint.d b/src/c/unixint.d index 7864c5f31..36b79d83e 100644 --- a/src/c/unixint.d +++ b/src/c/unixint.d @@ -1040,7 +1040,7 @@ asynchronous_signal_servicing_thread() #endif #ifdef SIGCHLD if (signo == SIGCHLD) { - si_wait_for_all_processes(0); + si_wait_for_all_processes(); continue; } #endif diff --git a/src/c/unixsys.d b/src/c/unixsys.d index cb818af2b..5088e8412 100755 --- a/src/c/unixsys.d +++ b/src/c/unixsys.d @@ -182,10 +182,12 @@ add_external_process(cl_env_ptr env, cl_object process) { cl_object l = ecl_list1(process); ecl_disable_interrupts_env(env); - ECL_WITH_LOCK_BEGIN(env, cl_core.external_processes_lock) { + ecl_get_spinlock(env, &cl_core.external_processes_lock); + { ECL_RPLACD(l, cl_core.external_processes); cl_core.external_processes = l; - } ECL_WITH_LOCK_END; + } + ecl_giveup_spinlock(&cl_core.external_processes_lock); ecl_enable_interrupts_env(env); } @@ -193,24 +195,34 @@ static void remove_external_process(cl_env_ptr env, cl_object process) { ecl_disable_interrupts_env(env); - ECL_WITH_LOCK_BEGIN(env, cl_core.external_processes_lock) { + ecl_get_spinlock(env, &cl_core.external_processes_lock); + { cl_core.external_processes = ecl_delete_eq(process, cl_core.external_processes); - } ECL_WITH_LOCK_END; + } + ecl_giveup_spinlock(&cl_core.external_processes_lock); ecl_enable_interrupts_env(env); } static cl_object -find_external_process(cl_object pid) +find_external_process(cl_env_ptr env, cl_object pid) { - cl_object p; - for (p = cl_core.external_processes; p != Cnil; p = ECL_CONS_CDR(p)) { - cl_object process = ECL_CONS_CAR(p); - if (external_process_pid(process) == pid) { - return process; - } - } - return Cnil; + cl_object output = Cnil; + ecl_disable_interrupts_env(env); + ecl_get_spinlock(env, &cl_core.external_processes_lock); + { + cl_object p; + for (p = cl_core.external_processes; p != Cnil; p = ECL_CONS_CDR(p)) { + cl_object process = ECL_CONS_CAR(p); + if (external_process_pid(process) == pid) { + output = process; + break; + } + } + } + ecl_giveup_spinlock(&cl_core.external_processes_lock); + ecl_enable_interrupts_env(env); + return output; } #else #define add_external_process(env,p) @@ -277,20 +289,11 @@ ecl_waitpid(cl_object pid, cl_object wait) @(return status code pid) } -@(defun si::wait-for-all-processes (&optional unsafep) -@ +cl_object +si_wait_for_all_processes() { #if defined(SIGCHLD) && !defined(ECL_WINDOWS_HOST) const cl_env_ptr env = ecl_process_env(); -# ifdef ECL_THREADS - if (Null(unsafep)) { - /* We come from the parallel thread, must lock */ - ECL_WITH_LOCK_BEGIN(env, cl_core.external_processes_lock) { - si_wait_for_all_processes(1, Ct); - } ECL_WITH_LOCK_END(env, cl_core.external_processes_lock); - return; - } -# endif do { cl_object status = ecl_waitpid(MAKE_FIXNUM(-1), Cnil); cl_object code = env->values[1]; @@ -299,13 +302,13 @@ ecl_waitpid(cl_object pid, cl_object wait) if (status != @':abort') break; } else { - cl_object p = find_external_process(pid); + cl_object p = find_external_process(env, pid); if (!Null(p)) { + set_external_process_pid(p, Cnil); update_process_status(p, status, code); } if (status != @':running') { - cl_core.external_processes = - ecl_delete_eq(p, cl_core.external_processes); + remove_external_process(env, p); ecl_delete_eq(p, cl_core.external_processes); } } } while (1); @@ -313,7 +316,6 @@ ecl_waitpid(cl_object pid, cl_object wait) @(return); #endif } -@) #if defined(ECL_MS_WINDOWS_HOST) || defined(cygwin) cl_object @@ -696,7 +698,6 @@ make_windows_handle(HANDLE h) /* We have to protect this, to avoid the signal being delivered or handled * before we set the process pid */ ecl_bds_bind(the_env, @'ext::*interrupts-enabled*', Cnil); - ECL_WITH_LOCK_BEGIN(the_env, cl_core.external_processes_lock) { child_pid = fork(); if (child_pid == 0) { /* Child */ @@ -733,7 +734,6 @@ make_windows_handle(HANDLE h) pid = MAKE_FIXNUM(child_pid); } set_external_process_pid(process, pid); - } ECL_WITH_LOCK_END; ecl_bds_unwind1(the_env); ecl_check_pending_interrupts(the_env); close(child_stdin); diff --git a/src/clos/builtin.lsp b/src/clos/builtin.lsp index d042fa607..3025265bc 100644 --- a/src/clos/builtin.lsp +++ b/src/clos/builtin.lsp @@ -88,7 +88,8 @@ #+threads (mp::lock) #+threads (mp::rwlock) #+threads (mp::condition-variable) - #+semaphores (mp::semaphore) + #+threads (mp::semaphore) + #+threads (mp::barrier) #+sse2 (ext::sse-pack)))) (loop for (name . rest) in '#.+builtin-classes-list+ diff --git a/src/configure b/src/configure index 0351f2398..05e0c58a7 100755 --- a/src/configure +++ b/src/configure @@ -4524,7 +4524,7 @@ THREAD_CFLAGS='' THREAD_LIBS='' THREAD_GC_FLAGS='--enable-threads=posix' INSTALL_TARGET='install' -THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore" +THREAD_OBJ="$THREAD_OBJ threads/process threads/queue threads/mutex threads/condition_variable threads/semaphore threads/barrier" clibs='' SONAME='' SONAME_LDFLAGS='' @@ -15683,7 +15683,7 @@ do cat >>$CONFIG_STATUS <<_ACEOF # First, check the format of the line: cat >"\$tmp/defines.sed" <<\\CEOF -/^[ ]*#[ ]*undef[ ][ ]*$ac_word_re[ ]*/b def +/^[ ]*#[ ]*undef[ ][ ]*$ac_word_re[ ]*\$/b def /^[ ]*#[ ]*define[ ][ ]*$ac_word_re[( ]/b def b :def diff --git a/src/h/external.h b/src/h/external.h index babc02294..3b3b6a2e0 100755 --- a/src/h/external.h +++ b/src/h/external.h @@ -1735,6 +1735,16 @@ extern ECL_API cl_object mp_wait_on_semaphore(cl_object); extern ECL_API cl_object mp_signal_semaphore _ARGS((cl_narg, cl_object, ...)); extern ECL_API cl_object ecl_make_semaphore(cl_object name, cl_fixnum count); +/* threads/barrier.d */ + +extern ECL_API cl_object ecl_make_barrier(cl_object name, cl_index count); +extern ECL_API cl_object mp_make_barrier _ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_barrier_count(cl_object); +extern ECL_API cl_object mp_barrier_name(cl_object); +extern ECL_API cl_object mp_barrier_arrivers_count(cl_object); +extern ECL_API cl_object mp_barrier_wait _ARGS((cl_narg, cl_object, ...)); +extern ECL_API cl_object mp_barrier_unblock _ARGS((cl_narg, cl_object, ...)); + /* threads/atomic.c */ extern ECL_API cl_object ecl_atomic_get(cl_object *slot); diff --git a/src/h/internal.h b/src/h/internal.h index 0b5209517..d62c93f83 100644 --- a/src/h/internal.h +++ b/src/h/internal.h @@ -493,7 +493,7 @@ extern cl_object mp_get_rwlock_write_wait(cl_object lock); extern void ecl_interrupt_process(cl_object process, cl_object function); /* unixsys.d */ -extern cl_object si_wait_for_all_processes(cl_narg,...); +extern cl_object si_wait_for_all_processes(); /* * Fake several ISO C99 mathematical functions diff --git a/src/h/object.h b/src/h/object.h index c2bae1b5d..94da8b270 100644 --- a/src/h/object.h +++ b/src/h/object.h @@ -82,6 +82,7 @@ typedef enum { t_rwlock, t_condition_variable, t_semaphore, + t_barrier, #endif t_codeblock, t_foreign, @@ -900,9 +901,10 @@ struct ecl_process { #define ECL_WAKEUP_ONE 0 #define ECL_WAKEUP_ALL 1 #define ECL_WAKEUP_RESET_FLAG 2 +#define ECL_WAKEUP_KILL 4 struct ecl_queue { - HEADER1(recursive); + HEADER; cl_object list; cl_object spinlock; }; @@ -915,6 +917,15 @@ struct ecl_semaphore { cl_fixnum counter; }; +struct ecl_barrier { + HEADER; + cl_object queue_list; + cl_object queue_spinlock; + cl_object name; + cl_fixnum count; + cl_fixnum arrivers_count; +}; + struct ecl_lock { HEADER1(recursive); cl_object queue_list; @@ -1046,8 +1057,9 @@ union cl_lispunion { struct ecl_lock lock; /* lock */ struct ecl_rwlock rwlock; /* read/write lock */ struct ecl_condition_variable condition_variable; /* condition-variable */ -#endif struct ecl_semaphore semaphore; /* semaphore */ + struct ecl_barrier barrier; /* barrier */ +#endif struct ecl_codeblock cblock; /* codeblock */ struct ecl_foreign foreign; /* user defined data type */ struct ecl_stack_frame frame; /* stack frame */ diff --git a/src/lsp/predlib.lsp b/src/lsp/predlib.lsp index 59ed9abd7..47a88fb41 100644 --- a/src/lsp/predlib.lsp +++ b/src/lsp/predlib.lsp @@ -1230,6 +1230,9 @@ if not possible." #+threads (MP::PROCESS) #+threads (MP::LOCK) #+threads (MP::RWLOCK) + #+threads (MP::CONDITION-VARIABLE) + #+threads (MP::SEMAPHORE) + #+threads (MP::BARRIER) #+ffi (FOREIGN-DATA) #+sse2 (EXT:SSE-PACK (OR EXT:INT-SSE-PACK EXT:FLOAT-SSE-PACK