--- src/c/error.c Mon Jun 24 04:19:09 1996 +++ zsrc/c/error.c Mon Jul 22 18:13:48 1996 @@ -103,10 +103,44 @@ object siSterminal_interrupt; +/* This gets _hard_ in threaded systems... */ +/* remembering that we may be in any thread when we get this call... */ +/* we may also _not_ be in a thread. Fortunately we can tell which */ +/* thread we are in by examining 'active'. */ +/* First determine where we are, if we are scheduled, or descheduled */ +/* if descheduled, then we need to be rescheduled... */ +/* then we can */ + +#ifdef THREADS +static bool ti_corr = 0; +extern void *override_redirect_fun; +extern pd *override_redirect_process; +extern pd main_pd; + +terminal_interrupt2() +{ + funcall(2, siSterminal_interrupt, ti_corr? Ct : Cnil); +} + +terminal_interrupt(bool correctable) +{ + ti_corr = correctable; + + start_critical_section(); + + override_redirect_process = &main_pd; + override_redirect_fun = terminal_interrupt2; + + force_resumption(&main_pd); + end_critical_section(); +} + +#else terminal_interrupt(bool correctable) { funcall(2, siSterminal_interrupt, correctable? 
Ct : Cnil); } +#endif /* THREADS */ object ihs_function_name(object x) diff --recursive --unified=3 src/c/file.d zsrc/c/file.d --- src/c/file.d Tue Mar 12 20:38:01 1996 +++ zsrc/c/file.d Wed Jul 17 22:16:17 1996 @@ -22,6 +22,11 @@ #include "config.h" +#ifdef THREADS +# include +#endif + + #if defined(BSD) && !defined(MSDOS) #include #endif @@ -347,6 +352,9 @@ x->sm.sm_object0 = Sstring_char; x->sm.sm_object1 = fn; x->sm.sm_int0 = x->sm.sm_int1 = 0; +#ifdef THREADS + fcntl(fileno(fp), F_SETFL, O_NONBLOCK); +#endif setbuf(fp, alloc_contblock(BUFSIZ)); return(x); } @@ -509,6 +517,11 @@ #ifdef TK bool no_input = FALSE; +#ifdef THREADS +# define PUTC(c, fp) lwpputc(c, fp) +#else +# define PUTC(c, fp) putc(c, fp) +#endif StdinEnableEvents() { @@ -521,11 +534,24 @@ } # define GETC(c, fp) { if (fp == stdin) \ while (no_input) Tk_DoOneEvent(0); \ +#ifdef THREADS + c = lwpgetc(fp); \ +#else c = getc(fp); \ +#endif /* THREADS */ no_input = !FILE_CNT(fp); } # define UNGETC(c, fp) { if (fp == stdin) no_input = FALSE; ungetc(c, fp); } #else +#ifdef THREADS +# define PUTC(c, fp) lwpputc(c, fp) +#else +# define PUTC(c, fp) putc(c, fp) +#endif +#ifdef THREADS +# define GETC(c, fp) c = lwpgetc(fp) +#else # define GETC(c, fp) c = getc(fp) +#endif /* THREADS */ # define UNGETC(c, fp) ungetc(c, fp) #endif @@ -544,8 +570,11 @@ if (fp == NULL) closed_stream(strm); GETC(c, fp); -/* c &= 0377; */ - if (feof(fp)) +/* c &= 0377; */ +/* if (feof(fp)) */ + /*c &= 0x7f; + printf("<%d:%c>", c, c); fflush(stdout);*/ + if (c == EOF) end_of_stream(strm); /* strm->sm.sm_int0++; useless in smm_io, Beppe */ return(c); @@ -612,6 +641,7 @@ if (fp == NULL) closed_stream(strm); UNGETC(c, fp); + /* c &= 0x7f; /* hmm? 
*/ /* --strm->sm.sm_int0; useless in smm_io, Beppe */ break; @@ -678,7 +708,7 @@ strm->sm.sm_int1++; if (fp == NULL) closed_stream(strm); - putc(c, fp); + PUTC(c, fp); break; case smm_synonym: @@ -921,7 +951,8 @@ if (fp == NULL) closed_stream(strm); GETC(c, fp); - if (feof(fp)) +/* if (feof(fp)) */ + if (c == EOF) return(TRUE); else { UNGETC(c, fp); diff --recursive --unified=3 src/c/gbc.c zsrc/c/gbc.c --- src/c/gbc.c Wed Jul 3 02:15:49 1996 +++ zsrc/c/gbc.c Mon Jul 22 18:04:37 1996 @@ -530,7 +530,7 @@ break; #endif CLOS default: - if (debug) + if (1 || debug) printf("\ttype = %d\n", type_of(x)); error("mark botch"); } @@ -588,10 +588,14 @@ #ifdef THREADS { - pd *pdp; + pd *pdp, *queue; lpd *old_clwp = clwp; - for (pdp = running_head; pdp != (pd *)NULL; pdp = pdp->pd_next) { + queue = running_queue; + do { + pdp = queue; + do { + /*for (pdp = running_head; pdp != (pd *)NULL; pdp = pdp->pd_next) <*/ clwp = pdp->pd_lpd; #endif THREADS @@ -620,7 +624,7 @@ mark_object(clwp->lwp_gensym_prefix); mark_object(clwp->lwp_gentemp_prefix); mark_object(clwp->lwp_token); - + /* (current-thread) can return it at any time */ mark_object(clwp->lwp_thread); @@ -654,7 +658,48 @@ mark_stack_conservative(cs_org, where); } #ifdef THREADS - } + pdp = pdp->pd_next; + } while(pdp != queue); + + + /* Now I have to wonder why I didn't use an array of queues... 
:] */ + + if (queue == running_queue) { + if (blocking_queue) queue = blocking_queue; + else if (delayed_queue) queue = delayed_queue; + else if (dead_queue) queue = dead_queue; + else if (stopped_queue) queue = stopped_queue; + else if (suspended_queue) queue = suspended_queue; + else if (waiting_queue) queue = waiting_queue; + else queue = NULL; + } else if (queue == blocking_queue) { + if (delayed_queue) queue = delayed_queue; + else if (dead_queue) queue = dead_queue; + else if (stopped_queue) queue = stopped_queue; + else if (suspended_queue) queue = suspended_queue; + else if (waiting_queue) queue = waiting_queue; + else queue = NULL; + } else if (queue == delayed_queue) { + if (dead_queue) queue = dead_queue; + else if (stopped_queue) queue = stopped_queue; + else if (suspended_queue) queue = suspended_queue; + else if (waiting_queue) queue = waiting_queue; + else queue = NULL; + } else if (queue == dead_queue) { + if (stopped_queue) queue = stopped_queue; + else if (suspended_queue) queue = suspended_queue; + else if (waiting_queue) queue = waiting_queue; + else queue = NULL; + } else if (queue == stopped_queue) { + if (suspended_queue) queue = suspended_queue; + else if (waiting_queue) queue = waiting_queue; + else queue = NULL; + } else if (queue == suspended_queue) { + if (waiting_queue) queue = waiting_queue; + else queue = NULL; + } else if (queue == waiting_queue) + queue = NULL; + } while(queue != NULL); clwp = old_clwp; } #endif THREADS @@ -853,9 +898,9 @@ if (val == 0) { /* informations used by the garbage collector need to be updated */ # ifdef __linux - running_head->pd_env[0].__jmpbuf[0].__sp = old_env[0].__jmpbuf[0].__sp; + running_queue->pd_env[0].__jmpbuf[0].__sp = old_env[0].__jmpbuf[0].__sp; # else - running_head->pd_env[JB_SP] = old_env[JB_SP]; + running_queue->pd_env[JB_SP] = old_env[JB_SP]; # endif old_clwp = clwp; Values = main_lpd.lwp_Values; Only in zsrc/c: gbc.my diff --recursive --unified=3 src/c/load.d zsrc/c/load.d --- 
src/c/load.d Tue Mar 12 20:40:01 1996 +++ zsrc/c/load.d Wed Jul 17 17:55:13 1996 @@ -31,7 +31,9 @@ extern object Kwild; extern object Vdefault_pathname_defaults; extern object Vpackage; +#ifndef THREADS extern object Vstandard_output; +#endif extern object readc(); /******************************* ------- ******************************/ diff --recursive --unified=3 src/c/lwp.d zsrc/c/lwp.d --- src/c/lwp.d Thu Jun 27 17:43:28 1996 +++ zsrc/c/lwp.d Mon Jul 22 18:14:34 1996 @@ -3,6 +3,7 @@ */ /* Copyright (c) 1990, Giuseppe Attardi. + Copyright (c) 1996, Brian Spilsbury. ECoLisp is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public @@ -12,18 +13,43 @@ See file '../Copyright' for full details. */ +/* + Rewritten to use a multiple queue scheme to reduce time, and + facilitate simulated blocking io, and sleeping without + busy-looping. + Changed timing to be millisecond standard. + Made (sleep) equivelent to (%delay). + added lwpgetc(), lwpputc(), lwpread(), lwpwrite() to provide + for transparent blocking character and sequence io. lwpread, and + lwpwrite aren't used yet. + Made all streams non-blocking. + + Brian Spilsbury, 1996. 
+*/ #include "config.h" +#include + /******************************* EXPORTS ******************************/ lpd main_lpd; lpd *clwp = &main_lpd; int critical_level = 0; -pd *running_head; /* front of running pd's */ -pd *running_tail; /* back of running pd's */ + pd main_pd; +pd *active = &main_pd; /* the pd that is attached to clwp */ + + /* circular queues, no tails */ +pd *running_queue; /* running pd's */ +pd *blocking_queue; /* blocking pd's */ +pd *delayed_queue; /* delaying pd's */ +pd *dead_queue; /* dead pd's */ +pd *stopped_queue; /* stopped pd's */ +pd *suspended_queue; /* suspended pd's */ +pd *waiting_queue; /* waiting pd's */ + /******************************* IMPORTS ******************************/ extern scheduler_interruption; /* in unixint.c */ @@ -37,27 +63,41 @@ #define thread_switch() { setTimer(0); enable_scheduler(); \ scheduler(0, 0, NULL); } -static bool timer_active = FALSE; -static bool scheduler_disabled = FALSE; +static int timer_active = FALSE; +static int scheduler_disabled = FALSE; static int scheduler_level = 0; /* tito */ -static bool reset_timer = FALSE; +static int reset_timer = FALSE; static int running_processes = 1; +static int awake_processes = 1; static int absolute_time = 0; +static int housekeeping_time = 0; +static int fd_hightide = 3; /* highest fd ever to block + 1 */ +static int wake_lowtide = -1; /* time to closest sleep-wake */ + +/* hopefully this will work with DJGPP, but I really have no idea... 
*/ +/* Sets for blocking threads, see housekeeping */ +static fd_set fd_rd, fd_wr, fd_ex; object Srunning; object Ssuspended; object Swaiting; object Sstopped; object Sdead; +object Sblocking; +object Sdelayed; object siSthread_top_level; +void (*override_redirect_fun)() = NULL; +pd *override_redirect_process = NULL; + static object main_thread; static setTimer(long time) { - struct itimerval oldtimer; - struct itimerval itimer; + static struct itimerval oldtimer; + static struct itimerval itimer; + itimer.it_value.tv_sec = 0; itimer.it_value.tv_usec = time; itimer.it_interval.tv_sec = 0; @@ -66,32 +106,25 @@ } pd * -dequeue() -{ - pd *tmp; - tmp = running_head; - if (running_head != NULL) - running_head = running_head->pd_next; - return tmp; -} - -pd * -make_pd() +make_pd(pd *o) { pd *new_pd; lpd *npd; /* Allocate a new descriptor for the new lwp */ - new_pd = (pd *)malloc(sizeof(pd)); + /* if we already have one, then we have passed it in o... */ + + if (o) new_pd = o; + else new_pd = (pd *)malloc(sizeof(pd)); /* create a new stack ... 
*/ - new_pd->pd_base = (int *)malloc(STACK_SIZE * sizeof(int)); - new_pd->pd_status = SUSPENDED; + if (!o) new_pd->pd_base = (int *)malloc(STACK_SIZE * sizeof(int)); /* allocate a lisp descriptor: * using the calloc here it's possible to avoid the * critical section in the various push operations */ - npd = new_pd->pd_lpd = (lpd *)calloc(sizeof(lpd), 1); + if (o) { npd = new_pd->pd_lpd; } + else npd = new_pd->pd_lpd = (lpd *)calloc(1, sizeof(lpd)); /* initialize it */ @@ -117,11 +150,13 @@ npd->lwp_frs_top = npd->lwp_frame_stack - 1; npd->lwp_frs_limit = npd->lwp_frame_stack + FRSSIZE; + /* constants are fine for a reincarnatee */ npd->lwp_alloc_temporary = OBJNULL; npd->lwp_backq_level = 0; npd->lwp_eval1 = 0; /* for gc */ - npd->lwp_fmt_temporary_stream = OBJNULL; + /* we need to rebuild temporary_stream for some reason */ + if (!o) npd->lwp_fmt_temporary_stream = OBJNULL; npd->lwp_fmt_temporary_string = OBJNULL; npd->lwp_PRINTstream = Cnil; @@ -153,7 +188,7 @@ npd->lwp_string_register = OBJNULL; npd->lwp_gensym_prefix = OBJNULL; npd->lwp_gentemp_prefix = OBJNULL; - npd->lwp_token = OBJNULL; + if (!o) npd->lwp_token = OBJNULL; /* lex_env copy */ npd->lwp_lex[0] = lex_env[0]; @@ -168,70 +203,30 @@ /* Now the allocation. If the gc is invoked we are able to mark * the objects already allocated */ - npd->lwp_fmt_temporary_stream = make_string_output_stream(64); - npd->lwp_fmt_temporary_string = - npd->lwp_fmt_temporary_stream->sm.sm_object0; - - npd->lwp_string_register = alloc_simple_string(0); - npd->lwp_gensym_prefix = make_simple_string("G"); - npd->lwp_gentemp_prefix = make_simple_string("T"); - npd->lwp_token = alloc_simple_string(LISP_PAGESIZE); - npd->lwp_token->st.st_self = alloc_contblock(LISP_PAGESIZE); + + /* Hmm, this gets more complex with a reincarnatee */ + /* ideally we just want to initialize these destructively */ + /* and hope that this is good enough. 
*/ + /* The main problem is that if other things have been passed these */ + /* and don't expect them to suddenly change, but I'm not sure that this */ + /* can be the case, since these should be local to a thread... */ + + if (!o) npd->lwp_fmt_temporary_stream = make_string_output_stream(64); + /* might need some resetting here? */ + npd->lwp_fmt_temporary_string = npd->lwp_fmt_temporary_stream->sm.sm_object0; + + if (!o) npd->lwp_string_register = alloc_simple_string(0); + if (!o) npd->lwp_gensym_prefix = make_simple_string("G"); + if (!o) npd->lwp_gentemp_prefix = make_simple_string("T"); + if (!o) npd->lwp_token = alloc_simple_string(LISP_PAGESIZE); + if (!o) npd->lwp_token->st.st_self = alloc_contblock(LISP_PAGESIZE); npd->lwp_token->st.st_fillp = 0; npd->lwp_token->st.st_hasfillp = TRUE; npd->lwp_token->st.st_adjustable = TRUE; - - return new_pd; -} - -update_queue() -{ - register pd *dead_pd; - pd *last = running_tail; - - do - switch (running_head->pd_status) { - - case DEAD: - - /* remove the dead process */ - dead_pd = dequeue(); - /* free the lisp descriptor */ - free(dead_pd->pd_lpd); - /* free the memory allocated for the stack and the descriptor */ - free(dead_pd->pd_base); - free(dead_pd); - break; - -/* case SUSPENDED: */ - case DELAYED: - - if (running_head->pd_slice != 0) - if (absolute_time > running_head->pd_slice) { - - /* the time slice has expired */ - running_head->pd_slice = 0; - if ((running_head->pd_thread->th.th_cont) != OBJNULL) { - /* in this case a continuation was created before %delay */ - running_head->pd_thread->th.th_cont->cn.cn_timed_out = TRUE; - running_head->pd_thread->th.th_cont = OBJNULL; - } - running_head->pd_status = RUNNING; - return; /* now you are a running process */ - } - ROTQUEUE(); - break; - - case WAITING: /* waiting processes need to be scheduled */ - case RUNNING: - return; /* found schedulable process */ - - default: /* currently is only STOPPED */ - ROTQUEUE(); - break; - } - while (running_head != last); + 
new_pd->pd_status = SUSPENDED; + ENQUEUE(new_pd, suspended_queue); /* needs to be on a queue */ + return new_pd; } activate_thread(object thread) @@ -275,33 +270,111 @@ sigsetmask(sigblock(0) & ~(sigmask(SIGALRM))); #endif +/* to get here we've been scheduled */ +/* so we aren't in someone else's bind stack */ +/* so we should get the defaults for the stdio */ + +start_critical_section(); + +for(;;) { /* mortal coil */ + /* set up local stdio bindings below everything else on the bind stack */ + /* so that they shouldn't be take out of scope ever... */ + + bind_var(Vstandard_input, SYM_VAL(Vstandard_input), Cnil); + bind_var(Vstandard_output, SYM_VAL(Vstandard_output), Cnil); + bind_var(Verror_output, SYM_VAL(Verror_output), Cnil); + bind_var(Vquery_io, SYM_VAL(Vquery_io), Cnil); + bind_var(Vdebug_io, SYM_VAL(Vdebug_io), Cnil); + bind_var(Vterminal_io, SYM_VAL(Vterminal_io), Cnil); + bind_var(Vtrace_output, SYM_VAL(Vtrace_output), Cnil); + { int i; - for (i = clwp->lwp_nValues; i > 0;) + for (i = clwp->lwp_nValues; i > 0;) VALUES(i) = VALUES(--i); VALUES(0) = clwp->lwp_thread->th.th_fun; + end_critical_section(); apply(clwp->lwp_nValues+1, siSthread_top_level, &VALUES(0)); + start_critical_section(); } /* Termination */ - - terpri(Cnil); - running_head->pd_status = DEAD; - running_head->pd_thread->th.th_self = NULL; + + { + pd *tmp = active; + + tmp->pd_status = DEAD; running_processes--; + awake_processes--; + DEQUEUE(tmp, running_queue); + ENQUEUE(tmp, dead_queue); + } - update_queue(); - thread_next(); /* update_queue has freed our stack!!! */ + end_critical_section(); + thread_switch(); /* stack won't have been free'd yet... that's a */ + /* job for housekeeping to think about. 
*/ + /* otherwise dead threads live in limbo waiting */ + /* for reincarnation */ + /* incase we are raised from the dead, we want to do it again */ + start_critical_section(); + } } /* - * switch to the first thread on queue + * switch to the next thread on queue */ thread_next() { + /* rotate the running-queue */ + + /* should devolve into an if, but *shrug* better to guarantee */ + start_critical_section(); + + if (override_redirect_process == active) { + void (*fun)(); + + force_resumption(active); + + if (running_processes > 1) { + timer_active = TRUE; + setTimer(REALQUANTUM); + } else { + timer_active = FALSE; + absolute_time = 0; + } + fun = override_redirect_fun; + + override_redirect_fun = NULL; + override_redirect_process = NULL; + + end_critical_section(); + (*fun)(); + start_critical_section(); + } + /* unwind the bind stack */ lwp_bds_unwind(clwp->lwp_bind_stack, clwp->lwp_bds_top); + ROTQUEUE(running_queue); + + end_critical_section(); + /* housekeeping isn't actually critical */ + /* and turns off the timer while its in there */ + if (absolute_time > housekeeping_time) + housekeeping(); + + /* we need this incase a signal blew us out of the previous housekeeping */ + /* and running_queue is void */ + + while(running_queue == NULL) { + static struct timeb tb; + ftime(&tb); /* not sure how portable */ + absolute_time = tb.millitm + tb.time*1000; + housekeeping(); + } + start_critical_section(); + /* switch clwp */ - clwp = running_head->pd_lpd; + clwp = running_queue->pd_lpd; + active = running_queue; /* restore Values pointer */ Values = clwp->lwp_Values; @@ -313,19 +386,253 @@ if (running_processes > 1) { timer_active = TRUE; setTimer(REALQUANTUM); - } else { + } else { timer_active = FALSE; absolute_time = 0; } - siglongjmp(running_head->pd_env, 1); + + end_critical_section(); + siglongjmp(active->pd_env, 1); } /* * Called when time slice expires or explicitily to switch thread + * New version... 
*/ scheduler(int sig, int code, struct sigcontext *scp) { int val; + static struct timeb tb; + +#if defined(SYSV) || defined(__svr4__) || defined(__linux) + signal(SIGALRM, scheduler); +#endif SYSV + + ftime(&tb); /* not sure how portable */ + absolute_time = tb.millitm + tb.time*1000; + + if (critical_level > 0) { /* within critical section */ + scheduler_interrupted = TRUE; + scheduler_interruption = SCHEDULER_INT; + return; + } + if (scheduler_level > 0) { /* abilitation check */ + scheduler_interrupted = TRUE; + return; + } + + val = sigsetjmp(active->pd_env, 1); + + if (val == 1) /* resume interrupted thread execution */ + return; /* coming back from longjmp in thread_next */ + + if (val == 2) /* coming back from longjmp in GC */ + gc(garbage_parameter); /* GC will return to the previous thread */ + + thread_next(); +} + +/* TODO: Add in waiting thread condition resolution */ +housekeeping() +{ + static struct timeval timeout; + static pd *p, *q; + static int tide; + static fd_set rd, wr, ex; + /* see if we are polling or lurking */ + + /* turn off that bloody timer... */ + setTimer(0); + + if ((running_processes > 1) && (awake_processes > 0)) { + /* poll */ + /* set timeout to instant */ + timeout.tv_sec = timeout.tv_usec = 0; + tide = 1; + } else { + /* is recovery possible? */ + + if ( (running_queue == NULL) && + (blocking_queue == NULL) && + (delayed_queue == NULL)) { + /* in a coma... can't awaken itself... */ + /* there is a possibility that a signal */ + /* will, but um, for now assume dead and buried */ + exit(0); /* bail w/out error */ + } + + /* ok, in theory we can wake up.... so lurk */ + /* set timeout to the shortest sleep resumption time */ + /* if there isn't a sleep resumption time */ + /* block forever... 
*/ + + if (wake_lowtide != -1) { + if (absolute_time >= wake_lowtide) { + timeout.tv_sec = timeout.tv_usec = 0; + tide = 1; + } else { + timeout.tv_sec = (wake_lowtide-absolute_time)/1000; + timeout.tv_usec = ((wake_lowtide-absolute_time)%1000)*1000; + tide = 1; + } + } else tide = 0; + } + + memcpy(&rd, &fd_rd, sizeof(fd_set)); + memcpy(&wr, &fd_wr, sizeof(fd_set)); + memcpy(&ex, &fd_ex, sizeof(fd_set)); + + /* If there is an error, the sets are undefined, just bail out... */ + /* we'll catch it next time, it was probably a signal interrupting */ + /* us. */ + if (select(fd_hightide, &rd, &wr, &ex, (tide ? &timeout : NULL)) == -1) { + /* we broke for some reason */ + /* a signal handler may have been invoked... */ + /* someone may have been woken up */ + /* so schedule another housekeeping apointment */ + /* and bail */ + goto out; + } + + /* check for awakened threads */ + + if (p = blocking_queue) + do { + q = p->pd_next; + switch(p->pd_fp_mode) { + case PD_INPUT: + if (FD_ISSET(fileno(p->pd_fp), &rd)) { + FD_CLR(fileno(p->pd_fp), &fd_rd); + DEQUEUE(p, blocking_queue); + ENQUEUE(p, running_queue); + p->pd_status = RUNNING; + awake_processes++; + if (blocking_queue == NULL) goto endblk; + } + break; + case PD_OUTPUT: + if (FD_ISSET(fileno(p->pd_fp), &wr)) { + FD_CLR(fileno(p->pd_fp), &fd_wr); + DEQUEUE(p, blocking_queue); + ENQUEUE(p, running_queue); + p->pd_status = RUNNING; + awake_processes++; + if (blocking_queue == NULL) goto endblk; + } + break; + case PD_EXCEPTION: + if (FD_ISSET(fileno(p->pd_fp), &ex)) { + FD_CLR(fileno(p->pd_fp), &fd_ex); + DEQUEUE(p, blocking_queue); + ENQUEUE(p, running_queue); + p->pd_status = RUNNING; + awake_processes++; + if (blocking_queue == NULL) goto endblk; + } + break; + } + p = q; + } while(p != blocking_queue); + + /* if sleeping, check for wakeup.... 
*/ + +endblk: tide = -1; + + /*putchar('.'); fflush(stdout);*/ + + if ((wake_lowtide != -1) && ((p = delayed_queue) != NULL)) + do { + q = p->pd_next; + if (absolute_time >= p->pd_slice) { + DEQUEUE(p, delayed_queue); + ENQUEUE(p, running_queue); + p->pd_status = RUNNING; + awake_processes++; + if (delayed_queue == NULL) break; + } else { + /* get new low-tide */ + if ((tide == -1) || (p->pd_slice < tide)) + tide = p->pd_slice; + } + + p = q; + } while(p != delayed_queue); + + wake_lowtide = tide; + + /* requeue waiting threads for resolution checking */ + /* not a good solution, but the only one that I can see */ + /* at least now we will only check them on housekeeping intervals */ + /* just get the timeout cases to set the wake_lowtide on entry */ + /* to prevent oversleeping. */ + + while(p = waiting_queue) { + DEQUEUE(p, waiting_queue); + ENQUEUE(p, running_queue); + p->pd_status = RUNNING; + awake_processes++; + } + +out: housekeeping_time = absolute_time + 1000; /* one second */ +} + +/* this will mostly be used in conjunction with override_redirect */ +/* which should be set before entry here. */ + +force_resumption(pd *p) +{ + start_critical_section(); + + switch(p->pd_status) { + case RUNNING: DEQUEUE(p, running_queue); + break; + case STOPPED: DEQUEUE(p, stopped_queue); + running_processes++; + awake_processes++; + break; + case SUSPENDED: DEQUEUE(p, suspended_queue); /* SUSPENDED pds are kept on suspended_queue, not stopped_queue */ + running_processes++; + awake_processes++; + break; + case DEAD: /* hmmm... 
raising the dead might be dangerous */ + running_processes++; + awake_processes++; + DEQUEUE(p, dead_queue); + break; + case WAITING: DEQUEUE(p, waiting_queue); + awake_processes++; + break; + case DELAYED: DEQUEUE(p, delayed_queue); + awake_processes++; + break; + /* don't worry about the magic, at worst this can */ + running_processes++; + /* cause housekeeping to make a false check */ + case BLOCKED: DEQUEUE(p, blocking_queue); + /* this needs some fixing */ + switch(p->pd_fp_mode) { + case PD_INPUT: + FD_CLR(fileno(p->pd_fp), &fd_rd); + break; + case PD_OUTPUT: + FD_CLR(fileno(p->pd_fp), &fd_wr); + break; + case PD_EXCEPTION: + FD_CLR(fileno(p->pd_fp), &fd_ex); + break; + } + awake_processes++; + break; + } + + ENQUEUE(p, running_queue); + end_critical_section(); +} + +#if 0 /* #ifdef needs a macro name, not a constant */ +scheduler(int sig, int code, struct sigcontext *scp) +{ + int val; #if defined(SYSV) || defined(__svr4__) || defined(__linux) signal(SIGALRM, scheduler); @@ -342,7 +649,7 @@ return; } - val = sigsetjmp(running_head->pd_env, 1); + val = sigsetjmp(running_dead->pd_env, 1); if (val == 1) /* resume interrupted thread execution */ return; /* coming back from longjmp in thread_next */ @@ -353,6 +660,7 @@ ROTQUEUE(); thread_next(); } +#endif /* old version */ /* * Handle signal received within critical section @@ -400,18 +708,34 @@ register pd *p; start_critical_section(); - running_processes++; + + if (rpd->pd_status == STOPPED) { + DEQUEUE(rpd, stopped_queue); + running_processes++; + awake_processes++; + } else if (rpd->pd_status == DELAYED) { + DEQUEUE(rpd, delayed_queue); + /* TODO: look for interaction with housekeeping problem for this case... */ + awake_processes++; + } else if (rpd->pd_status == SUSPENDED) { + DEQUEUE(rpd, suspended_queue); + running_processes++; + awake_processes++; + } + + /* else, should we be here? no. hmmm. */ + /* where are the fucking arguments? 
*/ rpd->pd_status = RUNNING; - for (p = running_head; (p != rpd) && (p != NULL); p = p->pd_next) ; - if (p == NULL) - ENQUEUE(rpd); + + ENQUEUE(rpd, running_queue); + end_critical_section(); if (!timer_active) { timer_active = TRUE; setTimer(REALQUANTUM); - } + } } /*********** @@ -428,6 +752,11 @@ RETURN(1); } +/* Hmmmmm, what the hell is this supposed to do? */ +/* and why bother? */ +/* Put this in the TODO basket, as something of dubiousness */ + +#if 0 /* #ifdef needs a macro name, not a constant */ siLthread_break_quit(int narg) { /* reset everything in MT */ @@ -443,17 +772,21 @@ critical_level = 0; scheduler_interrupted = 0; - for (p = running_head; (p != NULL); p = p->pd_next) - if (p != &main_pd) + /*for (p = running_dead; (p != NULL); p = p->pd_next)*/ + p = running_queue; + do { + if (p != &main_pd) { p->pd_status = DEAD; - else { + } else { p->pd_status = RUNNING; p->pd_thread->th.th_cont = OBJNULL; - } + } + p = p->pd_next; + } while(running_queue->pd_next != running_queue); - if (running_head != &main_pd) { + if (running_queue != &main_pd) { update_queue(); - thread_next(); + thread_dext(); /* here one should deallocate the main-thread function */ } else @@ -462,6 +795,7 @@ VALUES(0) = Cnil; RETURN(1); } +#endif siLthread_break_resume(int narg) { @@ -478,16 +812,70 @@ Lthread_list(int narg) { pd *p; - object tmp, x = CONS(running_head->pd_thread, Cnil); + object tmp; + object tmp2 = CONS(running_queue->pd_thread, Srunning); + object x = CONS(tmp2, Cnil); tmp = x; start_critical_section(); - for (p = running_head->pd_next; (p != NULL); p = p->pd_next) { - CDR(tmp) = CONS(p->pd_thread, Cnil); - tmp = CDR(tmp); - } + p = running_queue->pd_next; + while(p != running_queue) { + tmp2 = CONS(p->pd_thread, Srunning); + CDR(tmp) = CONS(tmp2, Cnil); + tmp = CDR(tmp); + p = p->pd_next; + } + + if (p = blocking_queue) + do { + tmp2 = CONS(p->pd_thread, Sblocking); + CDR(tmp) = CONS(tmp2, Cnil); + p = p->pd_next; + tmp = CDR(tmp); + } while(p != blocking_queue); + + if (p = delayed_queue) + do { + tmp2 = 
CONS(p->pd_thread, Sdelayed); + CDR(tmp) = CONS(tmp2, Cnil); + p = p->pd_next; + tmp = CDR(tmp); + } while(p != delayed_queue); + + /* TODO: Should this queue be listed? */ + if (p = dead_queue) + do { + tmp2 = CONS(p->pd_thread, Sdead); + CDR(tmp) = CONS(tmp2, Cnil); + p = p->pd_next; + tmp = CDR(tmp); + } while(p != dead_queue); + + if (p = stopped_queue) + do { + tmp2 = CONS(p->pd_thread, Sstopped); + CDR(tmp) = CONS(tmp2, Cnil); + p = p->pd_next; + tmp = CDR(tmp); + } while(p != stopped_queue); + + if (p = suspended_queue) + do { + tmp2 = CONS(p->pd_thread, Ssuspended); + CDR(tmp) = CONS(tmp2, Cnil); + p = p->pd_next; + tmp = CDR(tmp); + } while(p != suspended_queue); + + if (p = waiting_queue) + do { + tmp2 = CONS(p->pd_thread, Swaiting); + CDR(tmp) = CONS(tmp2, Cnil); + p = p->pd_next; + tmp = CDR(tmp); + } while(p != waiting_queue); end_critical_section(); @@ -511,20 +899,41 @@ /* fun = SYM_FUN(fun); confusing */ } - x = alloc_object(t_thread); - x->th.th_fun = fun; - x->th.th_size = sizeof(pd); - x->th.th_self = npd = make_pd(); - x->th.th_cont = OBJNULL; +start_critical_section(); + /* see if there is a lost soul waiting for reincarnation */ + if (dead_queue) { + /* ok, lets juice it up */ + + npd = dead_queue; + DEQUEUE(npd, dead_queue); + + x = npd->pd_lpd->lwp_thread; + /* enqueued in make_pd */ + + x->th.th_fun = fun; + x->th.th_size = sizeof(pd); + x->th.th_self = make_pd(npd); /* reinitialize it */ + x->th.th_cont = OBJNULL; + } else { + /* ok, no lost souls, better build a new one */ + x = alloc_object(t_thread); + x->th.th_fun = fun; + x->th.th_size = sizeof(pd); + x->th.th_self = npd = make_pd(0); + x->th.th_cont = OBJNULL; - npd->pd_thread = x; - npd->pd_slice = 0; + npd->pd_thread = x; + npd->pd_slice = 0; - /* Backpointer to thread */ - npd->pd_lpd->lwp_thread = x; + /* Backpointer to thread */ + npd->pd_lpd->lwp_thread = x; activate_thread(x); + } + + /* note: this is created as a suspended thread, and in that queue */ +end_critical_section(); 
VALUES(0) = x; RETURN(1); } @@ -542,12 +951,17 @@ start_critical_section(); /* tito */ thread->th.th_self->pd_status = STOPPED; running_processes--; - if (thread->th.th_self == running_head) { - critical_level--; /* end_critical_section() */ - update_queue(); - thread_next(); - } else - end_critical_section(); + awake_processes--; + if (thread->th.th_self == active) { + DEQUEUE(thread->th.th_self, running_queue); + ENQUEUE(thread->th.th_self, stopped_queue); + critical_level--; /* end_critical_section() */ + thread_switch(); + } else { + DEQUEUE(thread->th.th_self, running_queue); + ENQUEUE(thread->th.th_self, stopped_queue); + end_critical_section(); + } VALUES(0) = Cnil; RETURN(1); } @@ -568,6 +982,7 @@ start_critical_section(); /* tito */ thread->th.th_self->pd_status = RUNNING; running_processes++; + awake_processes++; if (!timer_active) { timer_active = TRUE; @@ -582,11 +997,8 @@ Lkill_thread(int narg, object thread) { + pd *tmp; - /* The following code is not enough. - Consider: The scheduler can be disabled - What about killing the current thread? 
- */ check_arg(1); if (type_of(thread) != t_thread) @@ -597,13 +1009,17 @@ thread->th.th_self->pd_status = DEAD; if (thread->th.th_self->pd_lpd == clwp) { /* if a thread kills itself the scheduler is to be called */ - thread->th.th_self = NULL; + tmp = thread->th.th_self; + DEQUEUE(tmp, running_queue); + ENQUEUE(tmp, dead_queue); critical_level--; /* end_critical_section() */ - update_queue(); - thread_next(); + thread_switch(); } else { - thread->th.th_self = NULL; + /*thread->th.th_self = NULL;*/ + tmp = thread->th.th_self; + DEQUEUE(tmp, running_queue); + ENQUEUE(tmp, dead_queue); end_critical_section(); } } @@ -642,6 +1058,11 @@ case DEAD: VALUES(0) = Sdead; break; + case BLOCKED: + VALUES(0) = Sblocking; + break; + case DELAYED: + VALUES(0) = Sdelayed; break; /* do not fall through into the FEerror default */ default: FEerror("Unexpected type for thread ~A", 1, thread); } @@ -663,6 +1084,7 @@ object x; check_arg(1); + if (type_of(thread) != t_thread) FEwrong_type_argument(Sthread, thread); @@ -797,91 +1219,206 @@ check_arg(0); if (timer_active) { - running_head->pd_status = SUSPENDED; + pd *tmp = active; + tmp->pd_status = SUSPENDED; + DEQUEUE(tmp, running_queue); + ENQUEUE(tmp, suspended_queue); running_processes--; + awake_processes--; thread_switch(); /* When resumed it will be provided with the Values to return */ - RETURN(running_head->pd_lpd->lwp_nValues); + RETURN(tmp->pd_lpd->lwp_nValues); } else FEerror("No other active thread.", 0); } +void lwpblockon(pd *who, FILE *fp, int mode) +{ + who->pd_fp = fp; + who->pd_fp_mode = mode; /* in, out, execept */ + who->pd_status = BLOCKED; + + start_critical_section(); + + DEQUEUE(who, running_queue); + ENQUEUE(who, blocking_queue); + awake_processes--; + + if (fd_hightide <= fileno(fp)) + fd_hightide = fileno(fp)+1; + + switch(mode) { + case PD_INPUT: + FD_SET(fileno(fp), &fd_rd); + break; + case PD_OUTPUT: + FD_SET(fileno(fp), &fd_wr); + break; + case PD_EXCEPTION: + FD_SET(fileno(fp), &fd_ex); + break; + } + + end_critical_section(); + + thread_switch(); +} + +int 
inline lwpgetc(FILE *fp) +{ + int c; + +loop: errno = 0; + c = getc(fp); + if (errno) { + lwpblockon(active, fp, PD_INPUT); + clearerr(fp); + goto loop; + } + return(c); +} + +void inline lwpputc(char c, FILE *fp) +{ +loop: errno = 0; + putc(c, fp); + if (errno) { + lwpblockon(active, fp, PD_OUTPUT); + clearerr(fp); + goto loop; + } + return; +} + +int inline lwpread(char *buf, int len, FILE *fp) +{ + int ind = 0, left = len, n; + +loop: errno = 0; + n = read(fileno(fp), &buf[ind], left); /* read(2) takes fd, buf, count */ + if (errno) { + if (n > 0) ind += n; /* n is -1 when the call failed; never move the index backwards */ + if (n > 0) left -= n; + lwpblockon(active, fp, PD_INPUT); + clearerr(fp); + goto loop; + } + return(ind+n); +} + +int inline lwpwrite(char *buf, int len, FILE *fp) +{ + int ind = 0, left = len, n; + +loop: errno = 0; + n = write(fileno(fp), &buf[ind], left); /* write(2) takes fd, buf, count */ + if (errno) { + if (n > 0) ind += n; /* n is -1 when the call failed; never move the index backwards */ + if (n > 0) left -= n; + lwpblockon(active, fp, PD_OUTPUT); + clearerr(fp); + goto loop; + } + return(ind+n); +} + Ldelay(int narg, object interval) { int z; check_arg(1); check_type_non_negative_integer(&interval); z = fix(interval); + + if (timer_active) { + pd *tmp = active; /* remember who we are */ + lwpsleep(z*1000); /* lwpsleep is in milliseconds */ + /* When resumed it will be provided with the Values to return */ + RETURN(tmp->pd_lpd->lwp_nValues); + } + else + { + sleep(z); + } +} + +/* Sleep for at least ms milliseconds */ +lwpsleep(int ms) +{ if (timer_active) { - running_head->pd_status = DELAYED; - running_processes--; + pd *tmp = active; + + start_critical_section(); + tmp->pd_status = DELAYED; + DEQUEUE(tmp, running_queue); + ENQUEUE(tmp, delayed_queue); + awake_processes--; - /* Translate seconds in number of scheduler slices */ - running_head->pd_slice = z * 10 + absolute_time; + tmp->pd_slice = ms + absolute_time; + + if ((wake_lowtide == -1) || (wake_lowtide > tmp->pd_slice)) + wake_lowtide = tmp->pd_slice; + + end_critical_section(); thread_switch(); - - /* When resumed it will be provided with the Values to return */ - RETURN(running_head->pd_lpd->lwp_nValues); } - else 
- sleep(z); + else usleep(ms*1000); /* milli->micro */ } +/* TODO: Find a way to move this functionality into housekeeping() + sigh */ + Lthread_wait(int narg, object fun, ...) { int nr; + pd *tmp = active; va_list args; va_start(args, fun); - + if (narg < 1) FEtoo_few_arguments(&narg); - start_critical_section(); - running_head->pd_status = WAITING; - running_processes--; - end_critical_section(); - for (;;) { nr = apply(narg-1, fun, args); if (VALUES(0) != Cnil) break; - else if (timer_active) { - /* the time slice has not been used */ - absolute_time--; - thread_switch(); - } else - FEerror("The condition will never be satisfied for lack of active processes", 0); + + start_critical_section(); + tmp->pd_status = WAITING; + + DEQUEUE(tmp, running_queue); + ENQUEUE(tmp, waiting_queue); + + awake_processes--; + + end_critical_section(); + thread_switch(); } - running_head->pd_status = RUNNING; - running_processes++; - end_critical_section(); + RETURN(nr); } - +/* TODO: whack this into housekeeping() */ + Lthread_wait_with_timeout(int narg, object timeout, object fun, ...) 
{ int nr; + pd *tmp = active; va_list args; va_start(args, fun); if (narg < 2) FEtoo_few_arguments(&narg); check_type_non_negative_integer(&timeout); - /* We have to translate seconds in scheduler call number */ - start_critical_section(); - running_head->pd_slice = fix(timeout) * 10 + absolute_time; + /* We have to translate seconds into milliseconds into the future */ + tmp->pd_slice = fix(timeout) * 1000 + absolute_time; - running_head->pd_status = WAITING; - running_processes--; - end_critical_section(); for (;;) { - if (absolute_time > running_head->pd_slice) { + if (absolute_time > tmp->pd_slice) { /* the time slice has expired */ VALUES(0) = Cnil; nr = 1; @@ -897,13 +1434,19 @@ absolute_time--; thread_switch(); } - } + + tmp->pd_status = WAITING; + DEQUEUE(tmp, running_queue); + ENQUEUE(tmp, waiting_queue); + awake_processes--; + + if ((wake_lowtide == -1) || (wake_lowtide > tmp->pd_slice)) + wake_lowtide = tmp->pd_slice; + + thread_switch(); + } - start_critical_section(); - running_head->pd_slice = 0; - running_head->pd_status = RUNNING; - running_processes++; - end_critical_section(); + tmp->pd_slice = 0; RETURN(nr); } @@ -912,11 +1455,25 @@ signal(SIGALRM, scheduler); } +/* called when we start after a dump */ +linit_lwp() +{ + FD_ZERO(&fd_rd); + FD_ZERO(&fd_wr); + FD_ZERO(&fd_ex); +} + init_lwp() { pd *temp_pd; temp_pd = &main_pd; - PUSH(temp_pd); + /*PUSH(temp_pd);*/ + + FD_ZERO(&fd_rd); + FD_ZERO(&fd_wr); + FD_ZERO(&fd_ex); + + ENQUEUE(temp_pd, running_queue); main_thread = alloc_object(t_thread); main_pd.pd_thread = main_thread; @@ -935,10 +1492,12 @@ Swaiting = make_ordinary("WAITING"); Sstopped = make_ordinary("STOPPED"); Sdead = make_ordinary("DEAD"); + Sblocking = make_ordinary("BLOCKING"); + Sdelayed = make_ordinary("DELAYED"); siSthread_top_level = make_si_ordinary("THREAD-TOP-LEVEL"); make_si_function("THREAD-BREAK-IN", siLthread_break_in); - make_si_function("THREAD-BREAK-QUIT", siLthread_break_quit); +/* make_si_function("THREAD-BREAK-QUIT", 
siLthread_break_quit); */ make_si_function("THREAD-BREAK-RESUME", siLthread_break_resume); make_function("MAKE-THREAD", Lmake_thread); Only in zsrc/c: lwp.my Only in zsrc/c: lwp.orig diff --recursive --unified=3 src/c/main.c zsrc/c/main.c --- src/c/main.c Mon Apr 15 20:54:12 1996 +++ zsrc/c/main.c Mon Jul 22 18:14:02 1996 @@ -170,6 +170,11 @@ setbuf(stdin, stdin_buf); setbuf(stdout, stdout_buf); +#ifdef THREADS + fcntl(fileno(stdin), F_SETFL, O_NONBLOCK); + fcntl(fileno(stdout), F_SETFL, O_NONBLOCK); +#endif + ARGC = argc; ARGV = argv; ecl_self = argv[0]; @@ -185,6 +190,7 @@ gc_time = 0; #ifdef THREADS + clwp = &main_lpd; Values = main_lpd.lwp_Values; #endif frs_top = frs_org-1; @@ -226,6 +232,7 @@ enable_interrupt(); siLcatch_bad_signals(0); #ifdef THREADS + linit_lwp(); enable_lwp(); #endif THREADS SYM_VAL(siVlisp_maxpages) = MAKE_FIXNUM(real_maxpage); Only in zsrc/c: main.my diff --recursive --unified=3 src/c/print.d zsrc/c/print.d --- src/c/print.d Fri Jul 5 02:41:09 1996 +++ zsrc/c/print.d Wed Jul 17 16:38:16 1996 @@ -2258,7 +2258,10 @@ write_ch_fun = interactive_writec_stream; else #endif CLOS + { + printf("type_of(strm) == %d, t_stream == %d\n", type_of(strm), t_stream); fflush(stdout); FEerror("~S is not a stream.", 1, strm); + } write_ch('\n', strm); FLUSH_STREAM(strm); return(Cnil); diff --recursive --unified=3 src/c/read.d zsrc/c/read.d --- src/c/read.d Thu Jun 6 03:50:15 1996 +++ zsrc/c/read.d Wed Jul 17 02:20:29 1996 @@ -253,12 +253,30 @@ #if TK extern bool no_input; +#ifdef THREADS +# define PUTC(c, fp) lwpputc(c, fp) +#else +# define PUTC(c, fp) putc(c, fp) +#endif #define GETC(c, fp) { if (fp == stdin) \ while (no_input) Tk_DoOneEvent(0); \ +#ifdef THREADS + c = lwpgetc(fp); \ +#else c = getc(fp); \ +#endif /* THREADS */ no_input = !FILE_CNT(fp); } #else +#ifdef THREADS +# define PUTC(c, fp) lwpputc(c, fp) +#else +# define PUTC(c, fp) putc(c, fp) +#endif +#ifdef THREADS +#define GETC(c, fp) c = lwpgetc(fp) +#else #define GETC(c, fp) c = getc(fp) 
+#endif /* THREADS */ #endif /* TK */ /* Beppe: faster code for inner loop from file stream */ diff --recursive --unified=3 src/c/tcp.c zsrc/c/tcp.c --- src/c/tcp.c Wed May 31 18:36:36 1995 +++ zsrc/c/tcp.c Wed Jul 17 21:22:12 1996 @@ -12,6 +12,7 @@ */ #include "config.h" +#include object make_stream(object host, int fd, enum smmode smm) @@ -35,6 +36,9 @@ stream = alloc_object(t_stream); stream->sm.sm_mode = (short)smm; stream->sm.sm_fp = fp; +#ifdef THREADS + fcntl(fd, F_SETFL, O_NONBLOCK); +#endif fp->_IO_buf_base = NULL; /* BASEFF */; stream->sm.sm_object0 = Sstring_char; stream->sm.sm_object1 = host; /* not really used */ @@ -67,11 +71,11 @@ FEerror("~S is a too long file name.", 1, host); #ifdef THREADS - start_critical_section(); +/* start_critical_section(); */ #endif THREADS fd = connect_to_server(host->st.st_self, fix(port)); #ifdef THREADS - end_critical_section(); +/* end_critical_section(); */ #endif THREADS if (fd == 0) { @@ -94,13 +98,7 @@ if (!FIXNUMP(port)) FEwrong_type_argument(TSpositive_number, port); -#ifdef THREADS - start_critical_section(); -#endif THREADS fd = create_server_port(fix(port)); -#ifdef THREADS - end_critical_section(); -#endif THREADS if (fd == 0) VALUES(0) = Cnil; @@ -116,4 +114,190 @@ { make_si_function("OPEN-CLIENT-STREAM", Lopen_client_stream); make_si_function("OPEN-SERVER-STREAM", Lopen_server_stream); +} + +/* + + Ok, maybe this shouldn't be here, but it really doesn't belong in + crs does it? Also moving it here makes life much easier. +*/ + +/* socket.c -- socket interface */ +/* Maybe this shouldn't be here, but what the hell. */ +/* + Copyright (c) 1990, Giuseppe Attardi. + + ECoLisp is free software; you can redistribute it and/or modify it + under the terms of the GNU General Library Public License as published + by the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + See file '../Copyright' for full details. 
+*/ + +#include +#include +#include + +#include +#include +#include +#include + +#include + +extern int errno; + +/*********************************************************************** + * Client side + **********************************************************************/ + +/* + * Attempts to connect to server, given host and port. Returns file + * descriptor (network socket) or 0 if connection fails. + */ +int connect_to_server(char *host, int port) +{ + struct sockaddr_in inaddr; /* INET socket address. */ + struct sockaddr *addr; /* address to connect to */ + struct hostent *host_ptr; + int addrlen; /* length of address */ + extern char *getenv(); + extern struct hostent *gethostbyname(); + int fd; /* Network socket */ + + /* Get the statistics on the specified host. */ + if ((inaddr.sin_addr.s_addr = inet_addr(host)) == -1) { + if ((host_ptr = gethostbyname(host)) == NULL) { + /* No such host! */ + errno = EINVAL; + return(0); + } + /* Check the address type for an internet host. */ + if (host_ptr->h_addrtype != AF_INET) { + /* Not an Internet host! */ + errno = EPROTOTYPE; + return(0); + } + /* Set up the socket data. */ + inaddr.sin_family = host_ptr->h_addrtype; + memcpy((char *)&inaddr.sin_addr, (char *)host_ptr->h_addr, + sizeof(inaddr.sin_addr)); + } + else + inaddr.sin_family = AF_INET; + + addr = (struct sockaddr *) &inaddr; + addrlen = sizeof (struct sockaddr_in); + inaddr.sin_port = port; + inaddr.sin_port = htons(inaddr.sin_port); + /* + * Open the network connection. + */ + if ((fd = socket((int) addr->sa_family, SOCK_STREAM, 0)) < 0) + return(0); /* errno set by system call. */ + +#ifdef TCP_NODELAY + /* make sure to turn off TCP coalescence */ + { int mi; + setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &mi, sizeof (int)); + } +#endif + +#ifdef THREADS +start_critical_section(); +#endif + if (connect(fd, addr, addrlen) == -1) { + (void) close (fd); +#ifdef THREADS +end_critical_section(); +#endif + return(0); /* errno set by system call. 
*/ + } + /* + * Return the id if the connection succeeded. + */ + return(fd); +} + + +/*********************************************************************** + * Server side + **********************************************************************/ +/* + * Creates a server port. Returns file + * descriptor (network socket) or 0 if connection fails. + */ + +int create_server_port(int port) +{ + struct sockaddr_in inaddr; /* INET socket address. */ + struct sockaddr *addr; /* address to connect to */ + int addrlen; /* length of address */ + int request, conn; /* Network socket */ + + /* + * Open the network connection. + */ + if ((request = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + return(0); /* errno set by system call. */ + } + +#ifdef SO_REUSEADDR + /* Necesary to restart the server without a reboot */ + { + int one = 1; + setsockopt(request, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)); + } +#endif /* SO_REUSEADDR */ +#ifdef TCP_NODELAY + /* make sure to turn off TCP coalescence */ + { int mi; + setsockopt(request, IPPROTO_TCP, TCP_NODELAY, &mi, sizeof (int)); + } +#endif + + /* Set up the socket data. */ + memset((char *)&inaddr, 0, sizeof(inaddr)); + inaddr.sin_family = AF_INET; + inaddr.sin_port = htons(port); + inaddr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(request, (struct sockaddr *)&inaddr, sizeof (inaddr))) + FEerror("Binding TCP socket", 0); + if (listen(request, 1)) + FEerror("TCP listening", 0); + +#ifdef THREADS + /* Don't make this file-descriptor non-blocking */ + /* just block on it before we attempt to accept from it */ + /* Think _hard_ about moving this out of here, into somewhere sane */ + /* and creating an 'accepting' stream type, which is bound to a port */ + /* on reading returns streams */ + { + FILE *fp; /* need to use FILE *'s rather than fd... 
*sigh* */ + if ((fp = fdopen(request, "r")) == (FILE *)0) { + printf("fdopen didn't work on accept fd!\n"); fflush(stdout); + } + fcntl(request, F_SETFL, O_NONBLOCK); + clearerr(fp); + +loop: errno = 0; +#endif + if ((conn = accept(request, (struct sockaddr *)NULL, (int *)NULL)) < 0) +#ifndef THREADS + FEerror("Accepting requests", 0); +#else /* THREADS */ + if (errno) { + lwpblockon(active, fp, PD_INPUT); + clearerr(fp); + goto loop; + } else { + fclose(fp); + FEerror("Accepting requests", 0); + } + fclose(fp); + } +#endif /* THREADS */ + return(conn); } diff --recursive --unified=3 src/c/unixint.c zsrc/c/unixint.c --- src/c/unixint.c Sun Sep 24 01:05:26 1995 +++ zsrc/c/unixint.c Sun Jul 21 13:26:47 1996 @@ -38,12 +38,15 @@ void sigint() { + /* always reinit on entry, since there is a wee race condition that */ + /* might bite unless you have BSD flavour signals... *sigh* */ + + signal(SIGINT, sigint); if (!interrupt_enable || interrupt_flag) { if (!interrupt_enable) { fprintf(stdout, "\n;;;Interrupt delayed.\n"); fflush(stdout); interrupt_flag = TRUE; } - signal(SIGINT, sigint); return; } if (symbol_value(SVinterrupt_enable) == Cnil) { @@ -70,9 +73,11 @@ void sigint() { -#ifdef SYSV +/*#ifdef SYSV*/ + /* shouldn't hurt to reset it on entry... */ signal(SIGINT, sigint); -#endif +/*#endif*/ + if (critical_level > 0) { scheduler_interrupted = TRUE; scheduler_interruption = ERROR_INT; @@ -101,15 +106,17 @@ signal_catcher(int sig, int code, int scp) { char str[64]; +/* if not bsd... */ + signal(sig, signal_catcher); if (!interrupt_enable) { sprintf(str, "signal %d caught (during GC)", sig); error(str); } - else if (sig == SIGSEGV) + else if (sig == SIGSEGV) { FEerror("Segmentation violation.~%\ Wrong type argument to a compiled function.", 0); - else { + } else { printf("System error. 
Trying to recover ...\n"); fflush(stdout); FEerror("Signal ~D caught.~%\ diff --recursive --unified=3 src/c/unixtime.c zsrc/c/unixtime.c --- src/c/unixtime.c Wed Jul 5 04:12:07 1995 +++ zsrc/c/unixtime.c Tue Jul 16 19:53:05 1996 @@ -74,10 +74,18 @@ Lround(1, z); z = VALUES(0); if (FIXNUMP(z)) +#ifdef THREADS + lwpsleep(fix(z)*1000); +#else sleep(fix(z)); +#endif else for(;;) +#ifdef THREADS + lwpsleep(1000000); +#else sleep(1000); +#endif VALUES(0) = Cnil; RETURN(1); } diff --recursive --unified=3 src/crs/Makefile.in zsrc/crs/Makefile.in --- src/crs/Makefile.in Wed May 31 02:27:00 1995 +++ zsrc/crs/Makefile.in Wed Jul 17 12:04:30 1996 @@ -26,7 +26,7 @@ # Files -OBJS = unexec.o dld.o @SETJMPO@ socket.o +OBJS = unexec.o dld.o @SETJMPO@ HFILES = ../h/config.h $(srcdir)/objff.h SYSDIR = .. @@ -39,9 +39,6 @@ dld.o: $(srcdir)/@DLD@.c $(HFILES) $(CC) -c $(CFLAGS) $(srcdir)/@DLD@.c -o $@ - -socket.o: $(srcdir)/socket.c - $(CC) -c $(CFLAGS) $(srcdir)/socket.c -o $@ unexec.o: $(srcdir)/@UNEXEC@.c $(HFILES) $(CC) -c $(CFLAGS) $(srcdir)/@UNEXEC@.c -o $@ Only in src/crs: socket.c diff --recursive --unified=3 src/h/external.h zsrc/h/external.h --- src/h/external.h Tue Mar 12 20:17:12 1996 +++ zsrc/h/external.h Wed Jul 17 19:25:23 1996 @@ -279,9 +279,15 @@ #ifdef THREADS extern lpd main_lpd; extern lpd *clwp; -extern pd *running_head; -extern pd *running_tail; extern pd main_pd; +extern pd *active; /* active pd */ +extern pd *running_queue; /* running pd's */ +extern pd *blocking_queue; /* blocking pd's */ +extern pd *delayed_queue; /* delaying pd's */ +extern pd *dead_queue; /* dead pd's */ +extern pd *stopped_queue; /* stopped pd's */ +extern pd *suspended_queue; /* suspended pd's */ +extern pd *waiting_queue; /* waiting pd's */ #endif THREADS /* macros.c */ diff --recursive --unified=3 src/h/lwp.h zsrc/h/lwp.h --- src/h/lwp.h Tue Feb 6 04:00:30 1996 +++ zsrc/h/lwp.h Sat Jul 20 16:44:15 1996 @@ -146,8 +146,8 @@ object lwp_gentemp_prefix; object lwp_token; /* They have to be 
initialized with * alloc_simple_string and */ -} lpd; +} lpd; #define RUNNING 0 #define SUSPENDED 1 @@ -155,6 +155,7 @@ #define DEAD 3 #define WAITING 4 #define DELAYED 5 +#define BLOCKED 6 typedef struct pd { object pd_thread; /* point back to its thread */ @@ -166,14 +167,46 @@ sigjmp_buf pd_env; /* Stack Environment */ #endif VAX int pd_slice; /* time out */ + int pd_fp_mode; /* in, out, execpt */ FILE *pd_fp; /* File pointer waiting input on */ lpd *pd_lpd; /* lisp process descriptor */ struct pd *pd_next; } pd; +#define PD_INPUT 0 +#define PD_OUTPUT 1 +#define PD_EXCEPTION 2 + +#define ENQUEUE(lpd, queue) \ + { if (queue == NULL) { \ + lpd->pd_next = lpd; \ + queue = lpd; \ + } else { \ + lpd->pd_next = queue->pd_next; \ + queue->pd_next = lpd; \ + } } + +#define DEQUEUE(lpd, queue) \ + { pd *TMP; \ + TMP = queue; \ + do { \ + if (TMP->pd_next == lpd) { \ + TMP->pd_next = lpd->pd_next; \ + lpd->pd_next = lpd; \ + if (lpd == queue) \ + queue = TMP; \ + break; \ + } \ + TMP = TMP->pd_next; \ + } while(TMP != queue); \ + if (lpd == queue) queue = NULL; \ + } +#define ROTQUEUE(queue) \ + if (queue != NULL) queue = queue->pd_next +/* #define PUSH(lpd) { if ( running_head == NULL) \ { running_head = lpd; \ running_tail = lpd; \ @@ -194,7 +227,7 @@ running_head = running_head->pd_next; \ running_tail = running_tail->pd_next; \ running_tail->pd_next = NULL; } - +*/ /* #define PUSH(lpd) ( running_head == NULL \ Only in zsrc/h: lwp.orig diff --recursive --unified=3 src/h/machines.h zsrc/h/machines.h --- src/h/machines.h Thu Jul 4 21:14:03 1996 +++ zsrc/h/machines.h Mon Jul 15 22:01:57 1996 @@ -209,7 +209,7 @@ #define IEEEFLOAT #define DOWN_STACK #define BSD -# if __GNUC__ > 2 || __GNUC_MINOR__ > 6 +# if 0 /*__GNUC__ > 2 || __GNUC_MINOR__ > 6*/ # define ELF # define UNEXEC unexelf #define DATA_START ELF_TEXT_BASE Only in zsrc: newthread.tgz Only in zsrc: socket.c