ecl/contrib/thread.patch
2001-06-26 17:14:44 +00:00

2062 lines
52 KiB
Diff
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

--- src/c/error.c Mon Jun 24 04:19:09 1996
+++ zsrc/c/error.c Mon Jul 22 18:13:48 1996
@@ -103,10 +103,44 @@
object siSterminal_interrupt;
+/* This gets _hard_ in threaded systems... */
+/* remembering that we may be in any thread when we get this call... */
+/* we may also _not_ be in a thread. Fortunately we can tell which */
+/* thread we are in by examining 'active'. */
+/* First determine where we are, if we are scheduled, or descheduled */
+/* if descheduled, then we need to be rescheduled... */
+/* then we can */
+
+#ifdef THREADS
+static bool ti_corr = 0;
+extern void *override_redirect_fun;
+extern pd *override_redirect_process;
+extern pd main_pd;
+
+terminal_interrupt2()
+{
+ funcall(2, siSterminal_interrupt, ti_corr? Ct : Cnil);
+}
+
+terminal_interrupt(bool correctable)
+{
+ ti_corr = correctable;
+
+ start_critical_section();
+
+ override_redirect_process = &main_pd;
+ override_redirect_fun = terminal_interrupt2;
+
+ force_resumption(&main_pd);
+ end_critical_section();
+}
+
+#else
terminal_interrupt(bool correctable)
{
funcall(2, siSterminal_interrupt, correctable? Ct : Cnil);
}
+#endif /* THREADS */
object
ihs_function_name(object x)
diff --recursive --unified=3 src/c/file.d zsrc/c/file.d
--- src/c/file.d Tue Mar 12 20:38:01 1996
+++ zsrc/c/file.d Wed Jul 17 22:16:17 1996
@@ -22,6 +22,11 @@
#include "config.h"
+#ifdef THREADS
+# include <fcntl.h>
+#endif
+
+
#if defined(BSD) && !defined(MSDOS)
#include <sys/ioctl.h>
#endif
@@ -347,6 +352,9 @@
x->sm.sm_object0 = Sstring_char;
x->sm.sm_object1 = fn;
x->sm.sm_int0 = x->sm.sm_int1 = 0;
+#ifdef THREADS
+ fcntl(fileno(fp), F_SETFL, O_NONBLOCK);
+#endif
setbuf(fp, alloc_contblock(BUFSIZ));
return(x);
}
@@ -509,6 +517,11 @@
#ifdef TK
bool no_input = FALSE;
+#ifdef THREADS
+# define PUTC(c, fp) lwpputc(c, fp)
+#else
+# define PUTC(c, fp) putc(c, fp)
+#endif
StdinEnableEvents()
{
@@ -521,11 +534,24 @@
}
# define GETC(c, fp) { if (fp == stdin) \
while (no_input) Tk_DoOneEvent(0); \
+#ifdef THREADS
+ c = lwpgetc(fp); \
+#else
c = getc(fp); \
+#endif /* THREADS */
no_input = !FILE_CNT(fp); }
# define UNGETC(c, fp) { if (fp == stdin) no_input = FALSE; ungetc(c, fp); }
#else
+#ifdef THREADS
+# define PUTC(c, fp) lwpputc(c, fp)
+#else
+# define PUTC(c, fp) putc(c, fp)
+#endif
+#ifdef THREADS
+# define GETC(c, fp) c = lwpgetc(fp)
+#else
# define GETC(c, fp) c = getc(fp)
+#endif /* THREADS */
# define UNGETC(c, fp) ungetc(c, fp)
#endif
@@ -544,8 +570,11 @@
if (fp == NULL)
closed_stream(strm);
GETC(c, fp);
-/* c &= 0377; */
- if (feof(fp))
+/* c &= 0377; */
+/* if (feof(fp)) */
+ /*c &= 0x7f;
+ printf("<%d:%c>", c, c); fflush(stdout);*/
+ if (c == EOF)
end_of_stream(strm);
/* strm->sm.sm_int0++; useless in smm_io, Beppe */
return(c);
@@ -612,6 +641,7 @@
if (fp == NULL)
closed_stream(strm);
UNGETC(c, fp);
+ /* c &= 0x7f; /* hmm? */
/* --strm->sm.sm_int0; useless in smm_io, Beppe */
break;
@@ -678,7 +708,7 @@
strm->sm.sm_int1++;
if (fp == NULL)
closed_stream(strm);
- putc(c, fp);
+ PUTC(c, fp);
break;
case smm_synonym:
@@ -921,7 +951,8 @@
if (fp == NULL)
closed_stream(strm);
GETC(c, fp);
- if (feof(fp))
+/* if (feof(fp)) */
+ if (c == EOF)
return(TRUE);
else {
UNGETC(c, fp);
diff --recursive --unified=3 src/c/gbc.c zsrc/c/gbc.c
--- src/c/gbc.c Wed Jul 3 02:15:49 1996
+++ zsrc/c/gbc.c Mon Jul 22 18:04:37 1996
@@ -530,7 +530,7 @@
break;
#endif CLOS
default:
- if (debug)
+ if (1 || debug)
printf("\ttype = %d\n", type_of(x));
error("mark botch");
}
@@ -588,10 +588,14 @@
#ifdef THREADS
{
- pd *pdp;
+ pd *pdp, *queue;
lpd *old_clwp = clwp;
- for (pdp = running_head; pdp != (pd *)NULL; pdp = pdp->pd_next) {
+ queue = running_queue;
+ do {
+ pdp = queue;
+ do {
+ /*for (pdp = running_head; pdp != (pd *)NULL; pdp = pdp->pd_next) <*/
clwp = pdp->pd_lpd;
#endif THREADS
@@ -620,7 +624,7 @@
mark_object(clwp->lwp_gensym_prefix);
mark_object(clwp->lwp_gentemp_prefix);
mark_object(clwp->lwp_token);
-
+
/* (current-thread) can return it at any time
*/
mark_object(clwp->lwp_thread);
@@ -654,7 +658,48 @@
mark_stack_conservative(cs_org, where);
}
#ifdef THREADS
- }
+ pdp = pdp->pd_next;
+ } while(pdp != queue);
+
+
+ /* Now I have to wonder why I didn't use an array of queues... :] */
+
+ if (queue == running_queue) {
+ if (blocking_queue) queue = blocking_queue;
+ else if (delayed_queue) queue = delayed_queue;
+ else if (dead_queue) queue = dead_queue;
+ else if (stopped_queue) queue = stopped_queue;
+ else if (suspended_queue) queue = suspended_queue;
+ else if (waiting_queue) queue = waiting_queue;
+ else queue = NULL;
+ } else if (queue == blocking_queue) {
+ if (delayed_queue) queue = delayed_queue;
+ else if (dead_queue) queue = dead_queue;
+ else if (stopped_queue) queue = stopped_queue;
+ else if (suspended_queue) queue = suspended_queue;
+ else if (waiting_queue) queue = waiting_queue;
+ else queue = NULL;
+ } else if (queue == delayed_queue) {
+ if (dead_queue) queue = dead_queue;
+ else if (stopped_queue) queue = stopped_queue;
+ else if (suspended_queue) queue = suspended_queue;
+ else if (waiting_queue) queue = waiting_queue;
+ else queue = NULL;
+ } else if (queue == dead_queue) {
+ if (stopped_queue) queue = stopped_queue;
+ else if (suspended_queue) queue = suspended_queue;
+ else if (waiting_queue) queue = waiting_queue;
+ else queue = NULL;
+ } else if (queue == stopped_queue) {
+ if (suspended_queue) queue = suspended_queue;
+ else if (waiting_queue) queue = waiting_queue;
+ else queue = NULL;
+ } else if (queue == suspended_queue) {
+ if (waiting_queue) queue = waiting_queue;
+ else queue = NULL;
+ } else if (queue == waiting_queue)
+ queue = NULL;
+ } while(queue != NULL);
clwp = old_clwp;
}
#endif THREADS
@@ -853,9 +898,9 @@
if (val == 0) {
/* informations used by the garbage collector need to be updated */
# ifdef __linux
- running_head->pd_env[0].__jmpbuf[0].__sp = old_env[0].__jmpbuf[0].__sp;
+ running_queue->pd_env[0].__jmpbuf[0].__sp = old_env[0].__jmpbuf[0].__sp;
# else
- running_head->pd_env[JB_SP] = old_env[JB_SP];
+ running_queue->pd_env[JB_SP] = old_env[JB_SP];
# endif
old_clwp = clwp;
Values = main_lpd.lwp_Values;
Only in zsrc/c: gbc.my
diff --recursive --unified=3 src/c/load.d zsrc/c/load.d
--- src/c/load.d Tue Mar 12 20:40:01 1996
+++ zsrc/c/load.d Wed Jul 17 17:55:13 1996
@@ -31,7 +31,9 @@
extern object Kwild;
extern object Vdefault_pathname_defaults;
extern object Vpackage;
+#ifndef THREADS
extern object Vstandard_output;
+#endif
extern object readc();
/******************************* ------- ******************************/
diff --recursive --unified=3 src/c/lwp.d zsrc/c/lwp.d
--- src/c/lwp.d Thu Jun 27 17:43:28 1996
+++ zsrc/c/lwp.d Mon Jul 22 18:14:34 1996
@@ -3,6 +3,7 @@
*/
/*
Copyright (c) 1990, Giuseppe Attardi.
+ Copyright (c) 1996, Brian Spilsbury.
ECoLisp is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
@@ -12,18 +13,43 @@
See file '../Copyright' for full details.
*/
+/*
+ Rewritten to use a multiple queue scheme to reduce time, and
+ facilitate simulated blocking io, and sleeping without
+ busy-looping.
+ Changed timing to be millisecond standard.
+ Made (sleep) equivelent to (%delay).
+ added lwpgetc(), lwpputc(), lwpread(), lwpwrite() to provide
+ for transparent blocking character and sequence io. lwpread, and
+ lwpwrite aren't used yet.
+ Made all streams non-blocking.
+
+ Brian Spilsbury, 1996.
+*/
#include "config.h"
+#include <sys/timeb.h>
+
/******************************* EXPORTS ******************************/
lpd main_lpd;
lpd *clwp = &main_lpd;
int critical_level = 0;
-pd *running_head; /* front of running pd's */
-pd *running_tail; /* back of running pd's */
+
pd main_pd;
+pd *active = &main_pd; /* the pd that is attached to clwp */
+
+ /* circular queues, no tails */
+pd *running_queue; /* running pd's */
+pd *blocking_queue; /* blocking pd's */
+pd *delayed_queue; /* delaying pd's */
+pd *dead_queue; /* dead pd's */
+pd *stopped_queue; /* stopped pd's */
+pd *suspended_queue; /* suspended pd's */
+pd *waiting_queue; /* waiting pd's */
+
/******************************* IMPORTS ******************************/
extern scheduler_interruption; /* in unixint.c */
@@ -37,27 +63,41 @@
#define thread_switch() { setTimer(0); enable_scheduler(); \
scheduler(0, 0, NULL); }
-static bool timer_active = FALSE;
-static bool scheduler_disabled = FALSE;
+static int timer_active = FALSE;
+static int scheduler_disabled = FALSE;
static int scheduler_level = 0; /* tito */
-static bool reset_timer = FALSE;
+static int reset_timer = FALSE;
static int running_processes = 1;
+static int awake_processes = 1;
static int absolute_time = 0;
+static int housekeeping_time = 0;
+static int fd_hightide = 3; /* highest fd ever to block + 1 */
+static int wake_lowtide = -1; /* time to closest sleep-wake */
+
+/* hopefully this will work with DJGPP, but I really have no idea... */
+/* Sets for blocking threads, see housekeeping */
+static fd_set fd_rd, fd_wr, fd_ex;
object Srunning;
object Ssuspended;
object Swaiting;
object Sstopped;
object Sdead;
+object Sblocking;
+object Sdelayed;
object siSthread_top_level;
+void (*override_redirect_fun)() = NULL;
+pd *override_redirect_process = NULL;
+
static object main_thread;
static
setTimer(long time)
{
- struct itimerval oldtimer;
- struct itimerval itimer;
+ static struct itimerval oldtimer;
+ static struct itimerval itimer;
+
itimer.it_value.tv_sec = 0;
itimer.it_value.tv_usec = time;
itimer.it_interval.tv_sec = 0;
@@ -66,32 +106,25 @@
}
pd *
-dequeue()
-{
- pd *tmp;
- tmp = running_head;
- if (running_head != NULL)
- running_head = running_head->pd_next;
- return tmp;
-}
-
-pd *
-make_pd()
+make_pd(pd *o)
{
pd *new_pd; lpd *npd;
/* Allocate a new descriptor for the new lwp */
- new_pd = (pd *)malloc(sizeof(pd));
+ /* if we already have one, then we have passed it in o... */
+
+ if (o) new_pd = o;
+ else new_pd = (pd *)malloc(sizeof(pd));
/* create a new stack ... */
- new_pd->pd_base = (int *)malloc(STACK_SIZE * sizeof(int));
- new_pd->pd_status = SUSPENDED;
+ if (!o) new_pd->pd_base = (int *)malloc(STACK_SIZE * sizeof(int));
/* allocate a lisp descriptor:
* using the calloc here it's possible to avoid the
* critical section in the various push operations
*/
- npd = new_pd->pd_lpd = (lpd *)calloc(sizeof(lpd), 1);
+ if (o) { npd = new_pd->pd_lpd; }
+ else npd = new_pd->pd_lpd = (lpd *)calloc(1, sizeof(lpd));
/* initialize it */
@@ -117,11 +150,13 @@
npd->lwp_frs_top = npd->lwp_frame_stack - 1;
npd->lwp_frs_limit = npd->lwp_frame_stack + FRSSIZE;
+ /* constants are fine for a reincarnatee */
npd->lwp_alloc_temporary = OBJNULL;
npd->lwp_backq_level = 0;
npd->lwp_eval1 = 0;
/* for gc */
- npd->lwp_fmt_temporary_stream = OBJNULL;
+ /* we need to rebuild temporary_stream for some reason */
+ if (!o) npd->lwp_fmt_temporary_stream = OBJNULL;
npd->lwp_fmt_temporary_string = OBJNULL;
npd->lwp_PRINTstream = Cnil;
@@ -153,7 +188,7 @@
npd->lwp_string_register = OBJNULL;
npd->lwp_gensym_prefix = OBJNULL;
npd->lwp_gentemp_prefix = OBJNULL;
- npd->lwp_token = OBJNULL;
+ if (!o) npd->lwp_token = OBJNULL;
/* lex_env copy */
npd->lwp_lex[0] = lex_env[0];
@@ -168,70 +203,30 @@
/* Now the allocation. If the gc is invoked we are able to mark
* the objects already allocated
*/
- npd->lwp_fmt_temporary_stream = make_string_output_stream(64);
- npd->lwp_fmt_temporary_string =
- npd->lwp_fmt_temporary_stream->sm.sm_object0;
-
- npd->lwp_string_register = alloc_simple_string(0);
- npd->lwp_gensym_prefix = make_simple_string("G");
- npd->lwp_gentemp_prefix = make_simple_string("T");
- npd->lwp_token = alloc_simple_string(LISP_PAGESIZE);
- npd->lwp_token->st.st_self = alloc_contblock(LISP_PAGESIZE);
+
+ /* Hmm, this gets more complex with a reincarnatee */
+ /* ideally we just want to initialize these destructively */
+ /* and hope that this is good enough. */
+ /* The main problem is that if other things have been passed these */
+ /* and don't expect them to suddenly change, but I'm not sure that this */
+ /* can be the case, since these should be local to a thread... */
+
+ if (!o) npd->lwp_fmt_temporary_stream = make_string_output_stream(64);
+ /* might need some resetting here? */
+ npd->lwp_fmt_temporary_string = npd->lwp_fmt_temporary_stream->sm.sm_object0;
+
+ if (!o) npd->lwp_string_register = alloc_simple_string(0);
+ if (!o) npd->lwp_gensym_prefix = make_simple_string("G");
+ if (!o) npd->lwp_gentemp_prefix = make_simple_string("T");
+ if (!o) npd->lwp_token = alloc_simple_string(LISP_PAGESIZE);
+ if (!o) npd->lwp_token->st.st_self = alloc_contblock(LISP_PAGESIZE);
npd->lwp_token->st.st_fillp = 0;
npd->lwp_token->st.st_hasfillp = TRUE;
npd->lwp_token->st.st_adjustable = TRUE;
-
- return new_pd;
-}
-
-update_queue()
-{
- register pd *dead_pd;
- pd *last = running_tail;
-
- do
- switch (running_head->pd_status) {
-
- case DEAD:
-
- /* remove the dead process */
- dead_pd = dequeue();
- /* free the lisp descriptor */
- free(dead_pd->pd_lpd);
- /* free the memory allocated for the stack and the descriptor */
- free(dead_pd->pd_base);
- free(dead_pd);
- break;
-
-/* case SUSPENDED: */
- case DELAYED:
-
- if (running_head->pd_slice != 0)
- if (absolute_time > running_head->pd_slice) {
-
- /* the time slice has expired */
- running_head->pd_slice = 0;
- if ((running_head->pd_thread->th.th_cont) != OBJNULL) {
- /* in this case a continuation was created before %delay */
- running_head->pd_thread->th.th_cont->cn.cn_timed_out = TRUE;
- running_head->pd_thread->th.th_cont = OBJNULL;
- }
- running_head->pd_status = RUNNING;
- return; /* now you are a running process */
- }
- ROTQUEUE();
- break;
-
- case WAITING: /* waiting processes need to be scheduled */
- case RUNNING:
- return; /* found schedulable process */
-
- default: /* currently is only STOPPED */
- ROTQUEUE();
- break;
- }
- while (running_head != last);
+ new_pd->pd_status = SUSPENDED;
+ ENQUEUE(new_pd, suspended_queue); /* needs to be on a queue */
+ return new_pd;
}
activate_thread(object thread)
@@ -275,33 +270,111 @@
sigsetmask(sigblock(0) & ~(sigmask(SIGALRM)));
#endif
+/* to get here we've been scheduled */
+/* so we aren't in someone else's bind stack */
+/* so we should get the defaults for the stdio */
+
+start_critical_section();
+
+for(;;) { /* mortal coil */
+ /* set up local stdio bindings below everything else on the bind stack */
+ /* so that they shouldn't be take out of scope ever... */
+
+ bind_var(Vstandard_input, SYM_VAL(Vstandard_input), Cnil);
+ bind_var(Vstandard_output, SYM_VAL(Vstandard_output), Cnil);
+ bind_var(Verror_output, SYM_VAL(Verror_output), Cnil);
+ bind_var(Vquery_io, SYM_VAL(Vquery_io), Cnil);
+ bind_var(Vdebug_io, SYM_VAL(Vdebug_io), Cnil);
+ bind_var(Vterminal_io, SYM_VAL(Vterminal_io), Cnil);
+ bind_var(Vtrace_output, SYM_VAL(Vtrace_output), Cnil);
+
{ int i;
- for (i = clwp->lwp_nValues; i > 0;)
+ for (i = clwp->lwp_nValues; i > 0;)
VALUES(i) = VALUES(--i);
VALUES(0) = clwp->lwp_thread->th.th_fun;
+ end_critical_section();
apply(clwp->lwp_nValues+1, siSthread_top_level, &VALUES(0));
+ start_critical_section();
}
/* Termination */
-
- terpri(Cnil);
- running_head->pd_status = DEAD;
- running_head->pd_thread->th.th_self = NULL;
+
+ {
+ pd *tmp = active;
+
+ tmp->pd_status = DEAD;
running_processes--;
+ awake_processes--;
+ DEQUEUE(tmp, running_queue);
+ ENQUEUE(tmp, dead_queue);
+ }
- update_queue();
- thread_next(); /* update_queue has freed our stack!!! */
+ end_critical_section();
+ thread_switch(); /* stack won't have been free'd yet... that's a */
+ /* job for housekeeping to think about. */
+ /* otherwise dead threads live in limbo waiting */
+ /* for reincarnation */
+ /* incase we are raised from the dead, we want to do it again */
+ start_critical_section();
+ }
}
/*
- * switch to the first thread on queue
+ * switch to the next thread on queue
*/
thread_next()
{
+ /* rotate the running-queue */
+
+ /* should devolve into an if, but *shrug* better to guarantee */
+ start_critical_section();
+
+ if (override_redirect_process == active) {
+ void (*fun)();
+
+ force_resumption(active);
+
+ if (running_processes > 1) {
+ timer_active = TRUE;
+ setTimer(REALQUANTUM);
+ } else {
+ timer_active = FALSE;
+ absolute_time = 0;
+ }
+ fun = override_redirect_fun;
+
+ override_redirect_fun = NULL;
+ override_redirect_process = NULL;
+
+ end_critical_section();
+ (*fun)();
+ start_critical_section();
+ }
+
/* unwind the bind stack */
lwp_bds_unwind(clwp->lwp_bind_stack, clwp->lwp_bds_top);
+ ROTQUEUE(running_queue);
+
+ end_critical_section();
+ /* housekeeping isn't actually critical */
+ /* and turns off the timer while its in there */
+ if (absolute_time > housekeeping_time)
+ housekeeping();
+
+ /* we need this incase a signal blew us out of the previous housekeeping */
+ /* and running_queue is void */
+
+ while(running_queue == NULL) {
+ static struct timeb tb;
+ ftime(&tb); /* not sure how portable */
+ absolute_time = tb.millitm + tb.time*1000;
+ housekeeping();
+ }
+ start_critical_section();
+
/* switch clwp */
- clwp = running_head->pd_lpd;
+ clwp = running_queue->pd_lpd;
+ active = running_queue;
/* restore Values pointer */
Values = clwp->lwp_Values;
@@ -313,19 +386,253 @@
if (running_processes > 1) {
timer_active = TRUE;
setTimer(REALQUANTUM);
- } else {
+ } else {
timer_active = FALSE;
absolute_time = 0;
}
- siglongjmp(running_head->pd_env, 1);
+
+ end_critical_section();
+ siglongjmp(active->pd_env, 1);
}
/*
* Called when time slice expires or explicitily to switch thread
+ * New version...
*/
scheduler(int sig, int code, struct sigcontext *scp)
{
int val;
+ static struct timeb tb;
+
+#if defined(SYSV) || defined(__svr4__) || defined(__linux)
+ signal(SIGALRM, scheduler);
+#endif SYSV
+
+ ftime(&tb); /* not sure how portable */
+ absolute_time = tb.millitm + tb.time*1000;
+
+ if (critical_level > 0) { /* within critical section */
+ scheduler_interrupted = TRUE;
+ scheduler_interruption = SCHEDULER_INT;
+ return;
+ }
+ if (scheduler_level > 0) { /* abilitation check */
+ scheduler_interrupted = TRUE;
+ return;
+ }
+
+ val = sigsetjmp(active->pd_env, 1);
+
+ if (val == 1) /* resume interrupted thread execution */
+ return; /* coming back from longjmp in thread_next */
+
+ if (val == 2) /* coming back from longjmp in GC */
+ gc(garbage_parameter); /* GC will return to the previous thread */
+
+ thread_next();
+}
+
+/* TODO: Add in waiting thread condition resolution */
+housekeeping()
+{
+ static struct timeval timeout;
+ static pd *p, *q;
+ static int tide;
+ static fd_set rd, wr, ex;
+ /* see if we are polling or lurking */
+
+ /* turn off that bloody timer... */
+ setTimer(0);
+
+ if ((running_processes > 1) && (awake_processes > 0)) {
+ /* poll */
+ /* set timeout to instant */
+ timeout.tv_sec = timeout.tv_usec = 0;
+ tide = 1;
+ } else {
+ /* is recovery possible? */
+
+ if ( (running_queue == NULL) &&
+ (blocking_queue == NULL) &&
+ (delayed_queue == NULL)) {
+ /* in a coma... can't awaken itself... */
+ /* there is a possibility that a signal */
+ /* will, but um, for now assume dead and buried */
+ exit(0); /* bail w/out error */
+ }
+
+ /* ok, in theory we can wake up.... so lurk */
+ /* set timeout to the shortest sleep resumption time */
+ /* if there isn't a sleep resumption time */
+ /* block forever... */
+
+ if (wake_lowtide != -1) {
+ if (absolute_time >= wake_lowtide) {
+ timeout.tv_sec = timeout.tv_usec = 0;
+ tide = 1;
+ } else {
+ timeout.tv_sec = (wake_lowtide-absolute_time)/1000;
+ timeout.tv_usec = ((wake_lowtide-absolute_time)%1000)*1000;
+ tide = 1;
+ }
+ } else tide = 0;
+ }
+
+ memcpy(&rd, &fd_rd, sizeof(fd_set));
+ memcpy(&wr, &fd_wr, sizeof(fd_set));
+ memcpy(&ex, &fd_ex, sizeof(fd_set));
+
+ /* If there is an error, the sets are undefined, just bail out... */
+ /* we'll catch it next time, it was probably a signal interrupting */
+ /* us. */
+ if (select(fd_hightide, &rd, &wr, &ex, (tide ? &timeout : NULL)) == -1) {
+ /* we broke for some reason */
+ /* a signal handler may have been invoked... */
+ /* someone may have been woken up */
+ /* so schedule another housekeeping apointment */
+ /* and bail */
+ goto out;
+ }
+
+ /* check for awakened threads */
+
+ if (p = blocking_queue)
+ do {
+ q = p->pd_next;
+ switch(p->pd_fp_mode) {
+ case PD_INPUT:
+ if (FD_ISSET(fileno(p->pd_fp), &rd)) {
+ FD_CLR(fileno(p->pd_fp), &fd_rd);
+ DEQUEUE(p, blocking_queue);
+ ENQUEUE(p, running_queue);
+ p->pd_status = RUNNING;
+ awake_processes++;
+ if (blocking_queue == NULL) goto endblk;
+ }
+ break;
+ case PD_OUTPUT:
+ if (FD_ISSET(fileno(p->pd_fp), &wr)) {
+ FD_CLR(fileno(p->pd_fp), &fd_wr);
+ DEQUEUE(p, blocking_queue);
+ ENQUEUE(p, running_queue);
+ p->pd_status = RUNNING;
+ awake_processes++;
+ if (blocking_queue == NULL) goto endblk;
+ }
+ break;
+ case PD_EXCEPTION:
+ if (FD_ISSET(fileno(p->pd_fp), &ex)) {
+ FD_CLR(fileno(p->pd_fp), &fd_ex);
+ DEQUEUE(p, blocking_queue);
+ ENQUEUE(p, running_queue);
+ p->pd_status = RUNNING;
+ awake_processes++;
+ if (blocking_queue == NULL) goto endblk;
+ }
+ break;
+ }
+ p = q;
+ } while(p != blocking_queue);
+
+ /* if sleeping, check for wakeup.... */
+
+endblk: tide = -1;
+
+ /*putchar('.'); fflush(stdout);*/
+
+ if ((wake_lowtide != -1) && ((p = delayed_queue) != NULL))
+ do {
+ q = p->pd_next;
+ if (absolute_time >= p->pd_slice) {
+ DEQUEUE(p, delayed_queue);
+ ENQUEUE(p, running_queue);
+ p->pd_status = RUNNING;
+ awake_processes++;
+ if (delayed_queue == NULL) break;
+ } else {
+ /* get new low-tide */
+ if ((tide == -1) || (p->pd_slice < tide))
+ tide = p->pd_slice;
+ }
+
+ p = q;
+ } while(p != delayed_queue);
+
+ wake_lowtide = tide;
+
+ /* requeue waiting threads for resolution checking */
+ /* not a good solution, but the only one that I can see */
+ /* at least now we will only check them on housekeeping intervals */
+ /* just get the timeout cases to set the wake_lowtide on entry */
+ /* to prevent oversleeping. */
+
+ while(p = waiting_queue) {
+ DEQUEUE(p, waiting_queue);
+ ENQUEUE(p, running_queue);
+ p->pd_status = RUNNING;
+ awake_processes++;
+ }
+
+out: housekeeping_time = absolute_time + 1000; /* one second */
+}
+
+/* this will mostly be used in conjunction with override_redirect */
+/* which should be set before entry here. */
+
+force_resumption(pd *p)
+{
+ start_critical_section();
+
+ switch(p->pd_status) {
+ case RUNNING: DEQUEUE(p, running_queue);
+ break;
+ case STOPPED: DEQUEUE(p, stopped_queue);
+ running_processes++;
+ awake_processes++;
+ break;
+ case SUSPENDED: DEQUEUE(p, stopped_queue);
+ running_processes++;
+ awake_processes++;
+ break;
+ case DEAD: /* hmmm... raising the dead might be dangerous */
+ running_processes++;
+ awake_processes++;
+ DEQUEUE(p, dead_queue);
+ break;
+ case WAITING: DEQUEUE(p, waiting_queue);
+ awake_processes++;
+ break;
+ case DELAYED: DEQUEUE(p, delayed_queue);
+ awake_processes++;
+ break;
+ /* don't worry about the magic, at worst this can */
+ running_processes++;
+ /* cause housekeeping to make a false check */
+ case BLOCKED: DEQUEUE(p, blocking_queue);
+ /* this needs some fixing */
+ switch(p->pd_fp_mode) {
+ case PD_INPUT:
+ FD_CLR(fileno(p->pd_fp), &fd_rd);
+ break;
+ case PD_OUTPUT:
+ FD_CLR(fileno(p->pd_fp), &fd_wr);
+ break;
+ case PD_EXCEPTION:
+ FD_CLR(fileno(p->pd_fp), &fd_ex);
+ break;
+ }
+ awake_processes++;
+ break;
+ }
+
+ ENQUEUE(p, running_queue);
+ end_critical_section();
+}
+
+#ifdef 0
+scheduler(int sig, int code, struct sigcontext *scp)
+{
+ int val;
#if defined(SYSV) || defined(__svr4__) || defined(__linux)
signal(SIGALRM, scheduler);
@@ -342,7 +649,7 @@
return;
}
- val = sigsetjmp(running_head->pd_env, 1);
+ val = sigsetjmp(running_dead->pd_env, 1);
if (val == 1) /* resume interrupted thread execution */
return; /* coming back from longjmp in thread_next */
@@ -353,6 +660,7 @@
ROTQUEUE();
thread_next();
}
+#endif /* old version */
/*
* Handle signal received within critical section
@@ -400,18 +708,34 @@
register pd *p;
start_critical_section();
- running_processes++;
+
+ if (rpd->pd_status == STOPPED) {
+ DEQUEUE(rpd, stopped_queue);
+ running_processes++;
+ awake_processes++;
+ } else if (rpd->pd_status == DELAYED) {
+ DEQUEUE(rpd, delayed_queue);
+ /* TODO: look for interaction with housekeeping problem for this case... */
+ awake_processes++;
+ } else if (rpd->pd_status == SUSPENDED) {
+ DEQUEUE(rpd, suspended_queue);
+ running_processes++;
+ awake_processes++;
+ }
+
+ /* else, should we be here? no. hmmm. */
+ /* where are the fucking arguments? */
rpd->pd_status = RUNNING;
- for (p = running_head; (p != rpd) && (p != NULL); p = p->pd_next) ;
- if (p == NULL)
- ENQUEUE(rpd);
+
+ ENQUEUE(rpd, running_queue);
+
end_critical_section();
if (!timer_active) {
timer_active = TRUE;
setTimer(REALQUANTUM);
- }
+ }
}
/***********
@@ -428,6 +752,11 @@
RETURN(1);
}
+/* Hmmmmm, what the hell is this supposed to do? */
+/* and why bother? */
+/* Put this in the TODO basket, as something of dubiousness */
+
+#ifdef 0
siLthread_break_quit(int narg)
{
/* reset everything in MT */
@@ -443,17 +772,21 @@
critical_level = 0;
scheduler_interrupted = 0;
- for (p = running_head; (p != NULL); p = p->pd_next)
- if (p != &main_pd)
+ /*for (p = running_dead; (p != NULL); p = p->pd_next)*/
+ p = running_queue;
+ do {
+ if (p != &main_pd) {
p->pd_status = DEAD;
- else {
+ } else {
p->pd_status = RUNNING;
p->pd_thread->th.th_cont = OBJNULL;
- }
+ }
+ p = p->pd_next;
+ } while(running_queue->pd_next != running_queue);
- if (running_head != &main_pd) {
+ if (running_queue != &main_pd) {
update_queue();
- thread_next();
+ thread_dext();
/* here one should deallocate the main-thread function */
}
else
@@ -462,6 +795,7 @@
VALUES(0) = Cnil;
RETURN(1);
}
+#endif
siLthread_break_resume(int narg)
{
@@ -478,16 +812,70 @@
Lthread_list(int narg)
{
pd *p;
- object tmp, x = CONS(running_head->pd_thread, Cnil);
+ object tmp;
+ object tmp2 = CONS(running_queue->pd_thread, Srunning);
+ object x = CONS(tmp2, Cnil);
tmp = x;
start_critical_section();
- for (p = running_head->pd_next; (p != NULL); p = p->pd_next) {
- CDR(tmp) = CONS(p->pd_thread, Cnil);
- tmp = CDR(tmp);
- }
+ p = running_queue->pd_next;
+ while(p != running_queue) {
+ tmp2 = CONS(p->pd_thread, Srunning);
+ CDR(tmp) = CONS(tmp2, Cnil);
+ tmp = CDR(tmp);
+ p = p->pd_next;
+ }
+
+ if (p = blocking_queue)
+ do {
+ tmp2 = CONS(p->pd_thread, Sblocking);
+ CDR(tmp) = CONS(tmp2, Cnil);
+ p = p->pd_next;
+ tmp = CDR(tmp);
+ } while(p != blocking_queue);
+
+ if (p = delayed_queue)
+ do {
+ tmp2 = CONS(p->pd_thread, Sdelayed);
+ CDR(tmp) = CONS(tmp2, Cnil);
+ p = p->pd_next;
+ tmp = CDR(tmp);
+ } while(p != delayed_queue);
+
+ /* TODO: Should this queue be listed? */
+ if (p = dead_queue)
+ do {
+ tmp2 = CONS(p->pd_thread, Sdead);
+ CDR(tmp) = CONS(tmp2, Cnil);
+ p = p->pd_next;
+ tmp = CDR(tmp);
+ } while(p != dead_queue);
+
+ if (p = stopped_queue)
+ do {
+ tmp2 = CONS(p->pd_thread, Sstopped);
+ CDR(tmp) = CONS(tmp2, Cnil);
+ p = p->pd_next;
+ tmp = CDR(tmp);
+ } while(p != stopped_queue);
+
+ if (p = suspended_queue)
+ do {
+ tmp2 = CONS(p->pd_thread, Ssuspended);
+ CDR(tmp) = CONS(tmp2, Cnil);
+ p = p->pd_next;
+ tmp = CDR(tmp);
+ } while(p != suspended_queue);
+
+ if (p = waiting_queue)
+ do {
+ tmp2 = CONS(p->pd_thread, Swaiting);
+ CDR(tmp) = CONS(tmp2, Cnil);
+ p = p->pd_next;
+ tmp = CDR(tmp);
+ } while(p != waiting_queue);
end_critical_section();
@@ -511,20 +899,41 @@
/* fun = SYM_FUN(fun); confusing */
}
- x = alloc_object(t_thread);
- x->th.th_fun = fun;
- x->th.th_size = sizeof(pd);
- x->th.th_self = npd = make_pd();
- x->th.th_cont = OBJNULL;
+start_critical_section();
+ /* see if there is a lost soul waiting for reincarnation */
+ if (dead_queue) {
+ /* ok, lets juice it up */
+
+ npd = dead_queue;
+ DEQUEUE(npd, dead_queue);
+
+ x = npd->pd_lpd->lwp_thread;
+ /* enqueued in make_pd */
+
+ x->th.th_fun = fun;
+ x->th.th_size = sizeof(pd);
+ x->th.th_self = make_pd(npd); /* reinitialize it */
+ x->th.th_cont = OBJNULL;
+ } else {
+ /* ok, no lost souls, better build a new one */
+ x = alloc_object(t_thread);
+ x->th.th_fun = fun;
+ x->th.th_size = sizeof(pd);
+ x->th.th_self = npd = make_pd(0);
+ x->th.th_cont = OBJNULL;
- npd->pd_thread = x;
- npd->pd_slice = 0;
+ npd->pd_thread = x;
+ npd->pd_slice = 0;
- /* Backpointer to thread */
- npd->pd_lpd->lwp_thread = x;
+ /* Backpointer to thread */
+ npd->pd_lpd->lwp_thread = x;
activate_thread(x);
+ }
+
+ /* note: this is created as a suspended thread, and in that queue */
+end_critical_section();
VALUES(0) = x;
RETURN(1);
}
@@ -542,12 +951,17 @@
start_critical_section(); /* tito */
thread->th.th_self->pd_status = STOPPED;
running_processes--;
- if (thread->th.th_self == running_head) {
- critical_level--; /* end_critical_section() */
- update_queue();
- thread_next();
- } else
- end_critical_section();
+ awake_processes--;
+ if (thread->th.th_self == active) {
+ DEQUEUE(thread->th.th_self, running_queue);
+ ENQUEUE(thread->th.th_self, stopped_queue);
+ critical_level--; /* end_critical_section() */
+ thread_switch();
+ } else {
+ DEQUEUE(thread->th.th_self, running_queue);
+ ENQUEUE(thread->th.th_self, stopped_queue);
+ end_critical_section();
+ }
VALUES(0) = Cnil;
RETURN(1);
}
@@ -568,6 +982,7 @@
start_critical_section(); /* tito */
thread->th.th_self->pd_status = RUNNING;
running_processes++;
+ awake_processes++;
if (!timer_active) {
timer_active = TRUE;
@@ -582,11 +997,8 @@
Lkill_thread(int narg, object thread)
{
+ pd *tmp;
- /* The following code is not enough.
- Consider: The scheduler can be disabled
- What about killing the current thread?
- */
check_arg(1);
if (type_of(thread) != t_thread)
@@ -597,13 +1009,17 @@
thread->th.th_self->pd_status = DEAD;
if (thread->th.th_self->pd_lpd == clwp) {
/* if a thread kills itself the scheduler is to be called */
- thread->th.th_self = NULL;
+ tmp = thread->th.th_self;
+ DEQUEUE(tmp, running_queue);
+ ENQUEUE(tmp, dead_queue);
critical_level--; /* end_critical_section() */
- update_queue();
- thread_next();
+ thread_switch();
}
else {
- thread->th.th_self = NULL;
+ /*thread->th.th_self = NULL;*/
+ tmp = thread->th.th_self;
+ DEQUEUE(tmp, running_queue);
+ ENQUEUE(tmp, dead_queue);
end_critical_section();
}
}
@@ -642,6 +1058,11 @@
case DEAD:
VALUES(0) = Sdead;
break;
+ case BLOCKED:
+ VALUES(0) = Sblocking;
+ break;
+ case DELAYED:
+ VALUES(0) = Sdelayed;
default:
FEerror("Unexpected type for thread ~A", 1, thread);
}
@@ -663,6 +1084,7 @@
object x;
check_arg(1);
+
if (type_of(thread) != t_thread)
FEwrong_type_argument(Sthread, thread);
@@ -797,91 +1219,206 @@
check_arg(0);
if (timer_active) {
- running_head->pd_status = SUSPENDED;
+ pd *tmp = active;
+ tmp->pd_status = SUSPENDED;
+ DEQUEUE(tmp, running_queue);
+ ENQUEUE(tmp, suspended_queue);
running_processes--;
+ awake_processes--;
thread_switch();
/* When resumed it will be provided with the Values to return */
- RETURN(running_head->pd_lpd->lwp_nValues);
+ RETURN(tmp->pd_lpd->lwp_nValues);
}
else
FEerror("No other active thread.", 0);
}
+void lwpblockon(pd *who, FILE *fp, int mode)
+{
+ who->pd_fp = fp;
+ who->pd_fp_mode = mode; /* in, out, execept */
+ who->pd_status = BLOCKED;
+
+ start_critical_section();
+
+ DEQUEUE(who, running_queue);
+ ENQUEUE(who, blocking_queue);
+ awake_processes--;
+
+ if (fd_hightide <= fileno(fp))
+ fd_hightide = fileno(fp)+1;
+
+ switch(mode) {
+ case PD_INPUT:
+ FD_SET(fileno(fp), &fd_rd);
+ break;
+ case PD_OUTPUT:
+ FD_SET(fileno(fp), &fd_wr);
+ break;
+ case PD_EXCEPTION:
+ FD_SET(fileno(fp), &fd_ex);
+ break;
+ }
+
+ end_critical_section();
+
+ thread_switch();
+}
+
+int inline lwpgetc(FILE *fp)
+{
+ int c;
+
+loop: errno = 0;
+ c = getc(fp);
+ if (errno) {
+ lwpblockon(active, fp, PD_INPUT);
+ clearerr(fp);
+ goto loop;
+ }
+ return(c);
+}
+
+void inline lwpputc(char c, FILE *fp)
+{
+loop: errno = 0;
+ putc(c, fp);
+ if (errno) {
+ lwpblockon(active, fp, PD_OUTPUT);
+ clearerr(fp);
+ goto loop;
+ }
+ return;
+}
+
+int inline lwpread(char *buf, int len, FILE *fp)
+{
+ int ind = 0, left = len, n;
+
+loop: errno = 0;
+ n = read(&buf[ind], left, fileno(fp));
+ if (errno) {
+ ind += n;
+ left -= n;
+ lwpblockon(active, fp, PD_INPUT);
+ clearerr(fp);
+ goto loop;
+ }
+ return(ind+n);
+}
+
+int inline lwpwrite(char *buf, int len, FILE *fp)
+{
+ int ind = 0, left = len, n;
+
+loop: errno = 0;
+ n = write(&buf[ind], left, fileno(fp));
+ if (errno) {
+ ind += n;
+ left -= n;
+ lwpblockon(active, fp, PD_OUTPUT);
+ clearerr(fp);
+ goto loop;
+ }
+ return(ind+n);
+}
+
Ldelay(int narg, object interval)
{ int z;
check_arg(1);
check_type_non_negative_integer(&interval);
z = fix(interval);
+
+ if (timer_active) {
+ pd *tmp = active; /* remember who we are */
+ lwpsleep(z*1000); /* lwpsleep is in milliseconds */
+ /* When resumed it will be provided with the Values to return */
+ RETURN(tmp->pd_lpd->lwp_nValues);
+ }
+ else
+ {
+ sleep(z);
+ }
+}
+
+/* Sleep for at least ms milliseconds */
+lwpsleep(int ms)
+{
if (timer_active) {
- running_head->pd_status = DELAYED;
- running_processes--;
+ pd *tmp = active;
+
+ start_critical_section();
+ tmp->pd_status = DELAYED;
+ DEQUEUE(tmp, running_queue);
+ ENQUEUE(tmp, delayed_queue);
+ awake_processes--;
- /* Translate seconds in number of scheduler slices */
- running_head->pd_slice = z * 10 + absolute_time;
+ tmp->pd_slice = ms + absolute_time;
+
+ if ((wake_lowtide == -1) || (wake_lowtide > tmp->pd_slice))
+ wake_lowtide = tmp->pd_slice;
+
+ end_critical_section();
thread_switch();
-
- /* When resumed it will be provided with the Values to return */
- RETURN(running_head->pd_lpd->lwp_nValues);
}
- else
- sleep(z);
+ else usleep(ms*1000); /* milli->micro */
}
+/* TODO: Find a way to move this functionality into housekeeping()
+ sigh */
+
Lthread_wait(int narg, object fun, ...)
{ int nr;
+ pd *tmp = active;
va_list args;
va_start(args, fun);
-
+
if (narg < 1) FEtoo_few_arguments(&narg);
- start_critical_section();
- running_head->pd_status = WAITING;
- running_processes--;
- end_critical_section();
-
for (;;) {
nr = apply(narg-1, fun, args);
if (VALUES(0) != Cnil)
break;
- else if (timer_active) {
- /* the time slice has not been used */
- absolute_time--;
- thread_switch();
- } else
- FEerror("The condition will never be satisfied for lack of active processes", 0);
+
+ start_critical_section();
+ tmp->pd_status = WAITING;
+
+ DEQUEUE(tmp, running_queue);
+ ENQUEUE(tmp, waiting_queue);
+
+ awake_processes--;
+
+ end_critical_section();
+ thread_switch();
}
- running_head->pd_status = RUNNING;
- running_processes++;
- end_critical_section();
+
RETURN(nr);
}
-
+/* TODO: whack this into housekeeping() */
+
Lthread_wait_with_timeout(int narg, object timeout, object fun, ...)
{
int nr;
+ pd *tmp = active;
va_list args;
va_start(args, fun);
if (narg < 2) FEtoo_few_arguments(&narg);
check_type_non_negative_integer(&timeout);
- /* We have to translate seconds in scheduler call number */
- start_critical_section();
- running_head->pd_slice = fix(timeout) * 10 + absolute_time;
+ /* We have to translate seconds into milliseconds into the future */
+ tmp->pd_slice = fix(timeout) * 1000 + absolute_time;
- running_head->pd_status = WAITING;
- running_processes--;
- end_critical_section();
for (;;) {
- if (absolute_time > running_head->pd_slice) {
+ if (absolute_time > tmp->pd_slice) {
/* the time slice has expired */
VALUES(0) = Cnil;
nr = 1;
@@ -897,13 +1434,19 @@
absolute_time--;
thread_switch();
}
- }
+
+ tmp->pd_status = WAITING;
+ DEQUEUE(tmp, running_queue);
+ ENQUEUE(tmp, waiting_queue);
+ awake_processes--;
+
+ if ((wake_lowtide == -1) || (wake_lowtide > tmp->pd_slice))
+ wake_lowtide = tmp->pd_slice;
+
+ thread_switch();
+ }
- start_critical_section();
- running_head->pd_slice = 0;
- running_head->pd_status = RUNNING;
- running_processes++;
- end_critical_section();
+ tmp->pd_slice = 0;
RETURN(nr);
}
@@ -912,11 +1455,25 @@
signal(SIGALRM, scheduler);
}
+/* called when we start after a dump */
+linit_lwp()
+{
+ FD_ZERO(&fd_rd);
+ FD_ZERO(&fd_wr);
+ FD_ZERO(&fd_ex);
+}
+
init_lwp()
{ pd *temp_pd;
temp_pd = &main_pd;
- PUSH(temp_pd);
+ /*PUSH(temp_pd);*/
+
+ FD_ZERO(&fd_rd);
+ FD_ZERO(&fd_wr);
+ FD_ZERO(&fd_ex);
+
+ ENQUEUE(temp_pd, running_queue);
main_thread = alloc_object(t_thread);
main_pd.pd_thread = main_thread;
@@ -935,10 +1492,12 @@
Swaiting = make_ordinary("WAITING");
Sstopped = make_ordinary("STOPPED");
Sdead = make_ordinary("DEAD");
+ Sblocking = make_ordinary("BLOCKING");
+ Sdelayed = make_ordinary("DELAYED");
siSthread_top_level = make_si_ordinary("THREAD-TOP-LEVEL");
make_si_function("THREAD-BREAK-IN", siLthread_break_in);
- make_si_function("THREAD-BREAK-QUIT", siLthread_break_quit);
+/* make_si_function("THREAD-BREAK-QUIT", siLthread_break_quit); */
make_si_function("THREAD-BREAK-RESUME", siLthread_break_resume);
make_function("MAKE-THREAD", Lmake_thread);
Only in zsrc/c: lwp.my
Only in zsrc/c: lwp.orig
diff --recursive --unified=3 src/c/main.c zsrc/c/main.c
--- src/c/main.c Mon Apr 15 20:54:12 1996
+++ zsrc/c/main.c Mon Jul 22 18:14:02 1996
@@ -170,6 +170,11 @@
setbuf(stdin, stdin_buf);
setbuf(stdout, stdout_buf);
+#ifdef THREADS
+ fcntl(fileno(stdin), F_SETFL, O_NONBLOCK);
+ fcntl(fileno(stdout), F_SETFL, O_NONBLOCK);
+#endif
+
ARGC = argc;
ARGV = argv;
ecl_self = argv[0];
@@ -185,6 +190,7 @@
gc_time = 0;
#ifdef THREADS
+ clwp = &main_lpd;
Values = main_lpd.lwp_Values;
#endif
frs_top = frs_org-1;
@@ -226,6 +232,7 @@
enable_interrupt();
siLcatch_bad_signals(0);
#ifdef THREADS
+ linit_lwp();
enable_lwp();
#endif THREADS
SYM_VAL(siVlisp_maxpages) = MAKE_FIXNUM(real_maxpage);
Only in zsrc/c: main.my
diff --recursive --unified=3 src/c/print.d zsrc/c/print.d
--- src/c/print.d Fri Jul 5 02:41:09 1996
+++ zsrc/c/print.d Wed Jul 17 16:38:16 1996
@@ -2258,7 +2258,10 @@
write_ch_fun = interactive_writec_stream;
else
#endif CLOS
+ {
+ printf("type_of(strm) == %d, t_stream == %d\n", type_of(strm), t_stream); fflush(stdout);
FEerror("~S is not a stream.", 1, strm);
+ }
write_ch('\n', strm);
FLUSH_STREAM(strm);
return(Cnil);
diff --recursive --unified=3 src/c/read.d zsrc/c/read.d
--- src/c/read.d Thu Jun 6 03:50:15 1996
+++ zsrc/c/read.d Wed Jul 17 02:20:29 1996
@@ -253,12 +253,30 @@
#if TK
extern bool no_input;
+#ifdef THREADS
+# define PUTC(c, fp) lwpputc(c, fp)
+#else
+# define PUTC(c, fp) putc(c, fp)
+#endif
#define GETC(c, fp) { if (fp == stdin) \
while (no_input) Tk_DoOneEvent(0); \
+#ifdef THREADS
+ c = lwpgetc(fp); \
+#else
c = getc(fp); \
+#endif /* THREADS */
no_input = !FILE_CNT(fp); }
#else
+#ifdef THREADS
+# define PUTC(c, fp) lwpputc(c, fp)
+#else
+# define PUTC(c, fp) putc(c, fp)
+#endif
+#ifdef THREADS
+#define GETC(c, fp) c = lwpgetc(fp)
+#else
#define GETC(c, fp) c = getc(fp)
+#endif /* THREADS */
#endif /* TK */
/* Beppe: faster code for inner loop from file stream */
diff --recursive --unified=3 src/c/tcp.c zsrc/c/tcp.c
--- src/c/tcp.c Wed May 31 18:36:36 1995
+++ zsrc/c/tcp.c Wed Jul 17 21:22:12 1996
@@ -12,6 +12,7 @@
*/
#include "config.h"
+#include <fcntl.h>
object
make_stream(object host, int fd, enum smmode smm)
@@ -35,6 +36,9 @@
stream = alloc_object(t_stream);
stream->sm.sm_mode = (short)smm;
stream->sm.sm_fp = fp;
+#ifdef THREADS
+ fcntl(fd, F_SETFL, O_NONBLOCK);
+#endif
fp->_IO_buf_base = NULL; /* BASEFF */;
stream->sm.sm_object0 = Sstring_char;
stream->sm.sm_object1 = host; /* not really used */
@@ -67,11 +71,11 @@
FEerror("~S is a too long file name.", 1, host);
#ifdef THREADS
- start_critical_section();
+/* start_critical_section(); */
#endif THREADS
fd = connect_to_server(host->st.st_self, fix(port));
#ifdef THREADS
- end_critical_section();
+/* end_critical_section(); */
#endif THREADS
if (fd == 0) {
@@ -94,13 +98,7 @@
if (!FIXNUMP(port))
FEwrong_type_argument(TSpositive_number, port);
-#ifdef THREADS
- start_critical_section();
-#endif THREADS
fd = create_server_port(fix(port));
-#ifdef THREADS
- end_critical_section();
-#endif THREADS
if (fd == 0)
VALUES(0) = Cnil;
@@ -116,4 +114,190 @@
{
make_si_function("OPEN-CLIENT-STREAM", Lopen_client_stream);
make_si_function("OPEN-SERVER-STREAM", Lopen_server_stream);
+}
+
+/*
+
+ Ok, maybe this shouldn't be here, but it really doesn't belong in
+ crs does it? Also moving it here makes life much easier.
+*/
+
+/* socket.c -- socket interface */
+/* Maybe this shouldn't be here, but what the hell. */
+/*
+ Copyright (c) 1990, Giuseppe Attardi.
+
+ ECoLisp is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Library Public License as published
+ by the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ See file '../Copyright' for full details.
+*/
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <errno.h>
+
+#include <netinet/in.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <string.h>
+
+#include <sys/ioctl.h>
+
+extern int errno;
+
+/***********************************************************************
+ * Client side
+ **********************************************************************/
+
+/*
+ * Attempts to connect to server, given host and port. Returns file
+ * descriptor (network socket) or 0 if connection fails.
+ */
+int connect_to_server(char *host, int port)
+{
+ struct sockaddr_in inaddr; /* INET socket address. */
+ struct sockaddr *addr; /* address to connect to */
+ struct hostent *host_ptr;
+ int addrlen; /* length of address */
+ extern char *getenv();
+ extern struct hostent *gethostbyname();
+ int fd; /* Network socket */
+
+ /* Get the statistics on the specified host. */
+ if ((inaddr.sin_addr.s_addr = inet_addr(host)) == -1) {
+ if ((host_ptr = gethostbyname(host)) == NULL) {
+ /* No such host! */
+ errno = EINVAL;
+ return(0);
+ }
+ /* Check the address type for an internet host. */
+ if (host_ptr->h_addrtype != AF_INET) {
+ /* Not an Internet host! */
+ errno = EPROTOTYPE;
+ return(0);
+ }
+ /* Set up the socket data. */
+ inaddr.sin_family = host_ptr->h_addrtype;
+ memcpy((char *)&inaddr.sin_addr, (char *)host_ptr->h_addr,
+ sizeof(inaddr.sin_addr));
+ }
+ else
+ inaddr.sin_family = AF_INET;
+
+ addr = (struct sockaddr *) &inaddr;
+ addrlen = sizeof (struct sockaddr_in);
+ inaddr.sin_port = port;
+ inaddr.sin_port = htons(inaddr.sin_port);
+ /*
+ * Open the network connection.
+ */
+ if ((fd = socket((int) addr->sa_family, SOCK_STREAM, 0)) < 0)
+ return(0); /* errno set by system call. */
+
+#ifdef TCP_NODELAY
+ /* make sure to turn off TCP coalescence */
+ { int mi;
+ setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &mi, sizeof (int));
+ }
+#endif
+
+#ifdef THREADS
+start_critical_section();
+#endif
+ if (connect(fd, addr, addrlen) == -1) {
+ (void) close (fd);
+#ifdef THREADS
+end_critical_section();
+#endif
+ return(0); /* errno set by system call. */
+ }
+ /*
+ * Return the id if the connection succeeded.
+ */
+ return(fd);
+}
+
+
+/***********************************************************************
+ * Server side
+ **********************************************************************/
+/*
+ * Creates a server port. Returns file
+ * descriptor (network socket) or 0 if connection fails.
+ */
+
+int create_server_port(int port)
+{
+ struct sockaddr_in inaddr; /* INET socket address. */
+ struct sockaddr *addr; /* address to connect to */
+ int addrlen; /* length of address */
+ int request, conn; /* Network socket */
+
+ /*
+ * Open the network connection.
+ */
+ if ((request = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ return(0); /* errno set by system call. */
+ }
+
+#ifdef SO_REUSEADDR
+ /* Necesary to restart the server without a reboot */
+ {
+ int one = 1;
+ setsockopt(request, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
+ }
+#endif /* SO_REUSEADDR */
+#ifdef TCP_NODELAY
+ /* make sure to turn off TCP coalescence */
+ { int mi;
+ setsockopt(request, IPPROTO_TCP, TCP_NODELAY, &mi, sizeof (int));
+ }
+#endif
+
+ /* Set up the socket data. */
+ memset((char *)&inaddr, 0, sizeof(inaddr));
+ inaddr.sin_family = AF_INET;
+ inaddr.sin_port = htons(port);
+ inaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (bind(request, (struct sockaddr *)&inaddr, sizeof (inaddr)))
+ FEerror("Binding TCP socket", 0);
+ if (listen(request, 1))
+ FEerror("TCP listening", 0);
+
+#ifdef THREADS
+ /* Don't make this file-descriptor non-blocking */
+ /* just block on it before we attempt to accept from it */
+ /* Think _hard_ about moving this out of here, into somewhere sane */
+ /* and creating an 'accepting' stream type, which is bound to a port */
+ /* on reading returns streams */
+ {
+ FILE *fp; /* need to use FILE *'s rather than fd... *sigh* */
+ if ((fp = fdopen(request, "r")) == (FILE *)0) {
+ printf("fdopen didn't work on accept fd!\n"); fflush(stdout);
+ }
+ fcntl(request, F_SETFL, O_NONBLOCK);
+ clearerr(fp);
+
+loop: errno = 0;
+#endif
+ if ((conn = accept(request, (struct sockaddr *)NULL, (int *)NULL)) < 0)
+#ifndef THREADS
+ FEerror("Accepting requests", 0);
+#else /* THREADS */
+ if (errno) {
+ lwpblockon(active, fp, PD_INPUT);
+ clearerr(fp);
+ goto loop;
+ } else {
+ fclose(fp);
+ FEerror("Accepting requests", 0);
+ }
+ fclose(fp);
+ }
+#endif /* THREADS */
+ return(conn);
}
diff --recursive --unified=3 src/c/unixint.c zsrc/c/unixint.c
--- src/c/unixint.c Sun Sep 24 01:05:26 1995
+++ zsrc/c/unixint.c Sun Jul 21 13:26:47 1996
@@ -38,12 +38,15 @@
void
sigint()
{
+ /* always reinit on entry, since there is a wee race condition that */
+ /* might bite unless you have BSD flavour signals... *sigh* */
+
+ signal(SIGINT, sigint);
if (!interrupt_enable || interrupt_flag) {
if (!interrupt_enable) {
fprintf(stdout, "\n;;;Interrupt delayed.\n"); fflush(stdout);
interrupt_flag = TRUE;
}
- signal(SIGINT, sigint);
return;
}
if (symbol_value(SVinterrupt_enable) == Cnil) {
@@ -70,9 +73,11 @@
void
sigint()
{
-#ifdef SYSV
+/*#ifdef SYSV*/
+ /* shouldn't hurt to reset it on entry... */
signal(SIGINT, sigint);
-#endif
+/*#endif*/
+
if (critical_level > 0) {
scheduler_interrupted = TRUE;
scheduler_interruption = ERROR_INT;
@@ -101,15 +106,17 @@
signal_catcher(int sig, int code, int scp)
{
char str[64];
+/* if not bsd... */
+ signal(sig, signal_catcher);
if (!interrupt_enable) {
sprintf(str, "signal %d caught (during GC)", sig);
error(str);
}
- else if (sig == SIGSEGV)
+ else if (sig == SIGSEGV) {
FEerror("Segmentation violation.~%\
Wrong type argument to a compiled function.", 0);
- else {
+ } else {
printf("System error. Trying to recover ...\n");
fflush(stdout);
FEerror("Signal ~D caught.~%\
diff --recursive --unified=3 src/c/unixtime.c zsrc/c/unixtime.c
--- src/c/unixtime.c Wed Jul 5 04:12:07 1995
+++ zsrc/c/unixtime.c Tue Jul 16 19:53:05 1996
@@ -74,10 +74,18 @@
Lround(1, z);
z = VALUES(0);
if (FIXNUMP(z))
+#ifdef THREADS
+ lwpsleep(fix(z)*1000);
+#else
sleep(fix(z));
+#endif
else
for(;;)
+#ifdef THREADS
+ lwpsleep(1000000);
+#else
sleep(1000);
+#endif
VALUES(0) = Cnil;
RETURN(1);
}
diff --recursive --unified=3 src/crs/Makefile.in zsrc/crs/Makefile.in
--- src/crs/Makefile.in Wed May 31 02:27:00 1995
+++ zsrc/crs/Makefile.in Wed Jul 17 12:04:30 1996
@@ -26,7 +26,7 @@
# Files
-OBJS = unexec.o dld.o @SETJMPO@ socket.o
+OBJS = unexec.o dld.o @SETJMPO@
HFILES = ../h/config.h $(srcdir)/objff.h
SYSDIR = ..
@@ -39,9 +39,6 @@
dld.o: $(srcdir)/@DLD@.c $(HFILES)
$(CC) -c $(CFLAGS) $(srcdir)/@DLD@.c -o $@
-
-socket.o: $(srcdir)/socket.c
- $(CC) -c $(CFLAGS) $(srcdir)/socket.c -o $@
unexec.o: $(srcdir)/@UNEXEC@.c $(HFILES)
$(CC) -c $(CFLAGS) $(srcdir)/@UNEXEC@.c -o $@
Only in src/crs: socket.c
diff --recursive --unified=3 src/h/external.h zsrc/h/external.h
--- src/h/external.h Tue Mar 12 20:17:12 1996
+++ zsrc/h/external.h Wed Jul 17 19:25:23 1996
@@ -279,9 +279,15 @@
#ifdef THREADS
extern lpd main_lpd;
extern lpd *clwp;
-extern pd *running_head;
-extern pd *running_tail;
extern pd main_pd;
+extern pd *active; /* active pd */
+extern pd *running_queue; /* running pd's */
+extern pd *blocking_queue; /* blocking pd's */
+extern pd *delayed_queue; /* delaying pd's */
+extern pd *dead_queue; /* dead pd's */
+extern pd *stopped_queue; /* stopped pd's */
+extern pd *suspended_queue; /* suspended pd's */
+extern pd *waiting_queue; /* waiting pd's */
#endif THREADS
/* macros.c */
diff --recursive --unified=3 src/h/lwp.h zsrc/h/lwp.h
--- src/h/lwp.h Tue Feb 6 04:00:30 1996
+++ zsrc/h/lwp.h Sat Jul 20 16:44:15 1996
@@ -146,8 +146,8 @@
object lwp_gentemp_prefix;
object lwp_token; /* They have to be initialized with
* alloc_simple_string and */
-} lpd;
+} lpd;
#define RUNNING 0
#define SUSPENDED 1
@@ -155,6 +155,7 @@
#define DEAD 3
#define WAITING 4
#define DELAYED 5
+#define BLOCKED 6
typedef struct pd {
object pd_thread; /* point back to its thread */
@@ -166,14 +167,46 @@
sigjmp_buf pd_env; /* Stack Environment */
#endif VAX
int pd_slice; /* time out */
+ int pd_fp_mode; /* in, out, execpt */
FILE *pd_fp; /* File pointer waiting input on */
lpd *pd_lpd; /* lisp process descriptor */
struct pd *pd_next;
} pd;
+#define PD_INPUT 0
+#define PD_OUTPUT 1
+#define PD_EXCEPTION 2
+
+#define ENQUEUE(lpd, queue) \
+ { if (queue == NULL) { \
+ lpd->pd_next = lpd; \
+ queue = lpd; \
+ } else { \
+ lpd->pd_next = queue->pd_next; \
+ queue->pd_next = lpd; \
+ } }
+
+#define DEQUEUE(lpd, queue) \
+ { pd *TMP; \
+ TMP = queue; \
+ do { \
+ if (TMP->pd_next == lpd) { \
+ TMP->pd_next = lpd->pd_next; \
+ lpd->pd_next = lpd; \
+ if (lpd == queue) \
+ queue = TMP; \
+ break; \
+ } \
+ TMP = TMP->pd_next; \
+ } while(TMP != queue); \
+ if (lpd == queue) queue = NULL; \
+ }
+#define ROTQUEUE(queue) \
+ if (queue != NULL) queue = queue->pd_next
+/*
#define PUSH(lpd) { if ( running_head == NULL) \
{ running_head = lpd; \
running_tail = lpd; \
@@ -194,7 +227,7 @@
running_head = running_head->pd_next; \
running_tail = running_tail->pd_next; \
running_tail->pd_next = NULL; }
-
+*/
/*
#define PUSH(lpd) ( running_head == NULL \
Only in zsrc/h: lwp.orig
diff --recursive --unified=3 src/h/machines.h zsrc/h/machines.h
--- src/h/machines.h Thu Jul 4 21:14:03 1996
+++ zsrc/h/machines.h Mon Jul 15 22:01:57 1996
@@ -209,7 +209,7 @@
#define IEEEFLOAT
#define DOWN_STACK
#define BSD
-# if __GNUC__ > 2 || __GNUC_MINOR__ > 6
+# if 0 /*__GNUC__ > 2 || __GNUC_MINOR__ > 6*/
# define ELF
# define UNEXEC unexelf
#define DATA_START ELF_TEXT_BASE
Only in zsrc: newthread.tgz
Only in zsrc: socket.c