Schedule.c 73.4 KB
Newer Older
1
/* ---------------------------------------------------------------------------
2
 * $Id: Schedule.c,v 1.51 2000/03/13 10:53:56 simonmar Exp $
3
 *
4
 * (c) The GHC Team, 1998-2000
5 6 7
 *
 * Scheduler
 *
8 9 10 11 12 13 14 15
 * The main scheduling code in GranSim is quite different from that in std
 * (concurrent) Haskell: while concurrent Haskell just iterates over the
 * threads in the runnable queue, GranSim is event driven, i.e. it iterates
 * over the events in the global event queue.  -- HWL
 * --------------------------------------------------------------------------*/

//@node Main scheduling code, , ,
//@section Main scheduling code
16

17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
/* Version with scheduler monitor support for SMPs.

   This design provides a high-level API to create and schedule threads etc.
   as documented in the SMP design document.

   It uses a monitor design controlled by a single mutex to exercise control
   over accesses to shared data structures, and builds on the Posix threads
   library.

   The majority of state is shared.  In order to keep essential per-task state,
   there is a Capability structure, which contains all the information
   needed to run a thread: its STG registers, a pointer to its TSO, a
   nursery etc.  During STG execution, a pointer to the capability is
   kept in a register (BaseReg).

   In a non-SMP build, there is one global capability, namely MainRegTable.

   SDM & KH, 10/99
*/

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
//@menu
//* Includes::			
//* Variables and Data structures::  
//* Prototypes::		
//* Main scheduling loop::	
//* Suspend and Resume::	
//* Run queue code::		
//* Garbage Collextion Routines::  
//* Blocking Queue Routines::	
//* Exception Handling Routines::  
//* Debugging Routines::	
//* Index::			
//@end menu

//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
//@subsection Includes

54 55 56 57 58 59 60 61 62 63 64 65 66
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Storage.h"
#include "StgRun.h"
#include "StgStartup.h"
#include "GC.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Storage.h"
#include "Evaluator.h"
67
#include "Exception.h"
68 69 70 71 72
#include "Printer.h"
#include "Main.h"
#include "Signals.h"
#include "Profiling.h"
#include "Sanity.h"
73
#include "Stats.h"
74
#include "Sparks.h"
75
#include "Prelude.h"
76 77 78 79 80 81 82 83 84
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
85 86

#include <stdarg.h>
87

88 89 90
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures

91 92 93 94 95 96 97 98 99 100 101 102 103 104
/* Main threads:
 *
 * These are the threads which clients have requested that we run.  
 *
 * In an SMP build, we might have several concurrent clients all
 * waiting for results, and each one will wait on a condition variable
 * until the result is available.
 *
 * In non-SMP, clients are strictly nested: the first client calls
 * into the RTS, which might call out again to C with a _ccall_GC, and
 * eventually re-enter the RTS.
 *
 * Main threads information is kept in a linked list:
 */
105
//@cindex StgMainThread
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
/* One record per client computation submitted to the RTS; kept on the
 * main_threads list (sched_mutex must be held to touch that list).
 */
typedef struct StgMainThread_ {
  StgTSO *         tso;    /* the Haskell thread running this computation */
  SchedulerStatus  stat;   /* outcome (Success/Killed/Interrupted/Deadlock),
                              filled in by the scheduler on completion */
  StgClosure **    ret;    /* where to store the result closure; may be NULL
                              if the client doesn't want the result */
#ifdef SMP
  pthread_cond_t wakeup;   /* broadcast when stat/ret have been filled in,
                              so the waiting client task can proceed */
#endif
  struct StgMainThread_ *link;  /* next entry in the main_threads list */
} StgMainThread;

/* Main thread queue.
 * Locks required: sched_mutex.
 */
static StgMainThread *main_threads;

/* Thread queues.
 * Locks required: sched_mutex.
 */
124 125 126 127

#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
128
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146

/* 
   In GranSim we have a runable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];

#else /* !GRAN */

//@cindex run_queue_hd
//@cindex run_queue_tl
//@cindex blocked_queue_hd
//@cindex blocked_queue_tl
147 148 149
StgTSO *run_queue_hd, *run_queue_tl;
StgTSO *blocked_queue_hd, *blocked_queue_tl;

150 151 152 153 154
/* Threads suspended in _ccall_GC.
 * Locks required: sched_mutex.
 */
static StgTSO *suspended_ccalling_threads;

155 156
static void GetRoots(void);
static StgTSO *threadStackOverflow(StgTSO *tso);
157
#endif
158

159 160 161 162 163
/* KH: The following two flags are shared memory locations.  There is no need
       to lock them, since they are only unset at the end of a scheduler
       operation.
*/

164
/* flag set by signal handler to precipitate a context switch */
165
//@cindex context_switch
166
nat context_switch;
167

168
/* if this flag is set as well, give up execution */
169
//@cindex interrupted
170
rtsBool interrupted;
171

172 173 174
/* Next thread ID to allocate.
 * Locks required: sched_mutex
 */
175
//@cindex next_thread_id
176 177 178 179 180 181 182
StgThreadID next_thread_id = 1;

/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */
183
 
184 185 186 187 188 189 190 191 192 193 194 195
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the realworld token for an IO thread)
 *  + 1                       (the closure to enter)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.  
 */

#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)

196 197 198 199
/* Free capability list.
 * Locks required: sched_mutex.
 */
#ifdef SMP
200 201 202 203 204 205 206 207 208 209 210
//@cindex free_capabilities
//@cindex n_free_capabilities
Capability *free_capabilities; /* Available capabilities for running threads */
nat n_free_capabilities;       /* total number of available capabilities */
#else
//@cindex MainRegTable
Capability MainRegTable;       /* for non-SMP, we have one global capability */
#endif

#if defined(GRAN)
StgTSO      *CurrentTSOs[MAX_PROC];
211
#else
212
StgTSO      *CurrentTSO;
213 214 215 216 217 218 219
#endif

rtsBool ready_to_gc;

/* All our current task ids, saved in case we need to kill them later.
 */
#ifdef SMP
220
//@cindex task_ids
221 222 223 224 225 226
task_info *task_ids;
#endif

void            addToBlockedQueue ( StgTSO *tso );

static void     schedule          ( void );
227
       void     interruptStgRts   ( void );
228 229 230 231 232
static StgTSO * createThread_     ( nat size, rtsBool have_lock );

#ifdef DEBUG
static void sched_belch(char *s, ...);
#endif
233 234

#ifdef SMP
235 236 237 238
//@cindex sched_mutex
//@cindex term_mutex
//@cindex thread_ready_cond
//@cindex gc_pending_cond
239 240 241 242 243 244 245 246
pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;

nat await_death;
#endif

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
#if defined(PAR)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
#endif

/*
 * The thread state for the main thread.
// ToDo: check whether not needed any more
StgTSO   *MainTSO;
 */


//@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
//@subsection Prototypes

//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop

/* ---------------------------------------------------------------------------
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when
    
      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

284 285
   ------------------------------------------------------------------------ */
//@cindex schedule
286 287 288 289 290 291
static void
schedule( void )
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
292 293 294 295 296 297 298
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
#endif
299
  rtsBool was_interrupted = rtsFalse;
300 301 302
  
  ACQUIRE_LOCK(&sched_mutex);

303 304 305 306 307 308 309 310 311
#if defined(GRAN)
# error ToDo: implement GranSim scheduler
#elif defined(PAR)
  while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */

    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
#else
312
  while (1) {
313
#endif
314

315 316 317 318 319
    /* If we're interrupted (the user pressed ^C, or some other
     * termination condition occurred), kill all the currently running
     * threads.
     */
    if (interrupted) {
320
      IF_DEBUG(scheduler, sched_belch("interrupted"));
321 322 323 324 325 326 327 328
      for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
	deleteThread(t);
      }
      for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
	deleteThread(t);
      }
      run_queue_hd = run_queue_tl = END_TSO_QUEUE;
      blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
329 330
      interrupted = rtsFalse;
      was_interrupted = rtsTrue;
331 332 333 334 335 336 337 338 339 340 341 342
    }

    /* Go through the list of main threads and wake up any
     * clients whose computations have finished.  ToDo: this
     * should be done more efficiently without a linear scan
     * of the main threads list, somehow...
     */
#ifdef SMP
    { 
      StgMainThread *m, **prev;
      prev = &main_threads;
      for (m = main_threads; m != NULL; m = m->link) {
343 344
	switch (m->tso->whatNext) {
	case ThreadComplete:
345 346 347 348 349 350
	  if (m->ret) {
	    *(m->ret) = (StgClosure *)m->tso->sp[0];
	  }
	  *prev = m->link;
	  m->stat = Success;
	  pthread_cond_broadcast(&m->wakeup);
351 352
	  break;
	case ThreadKilled:
353
	  *prev = m->link;
354
	  if (was_interrupted) {
355 356 357 358
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
359
	  pthread_cond_broadcast(&m->wakeup);
360 361 362
	  break;
	default:
	  break;
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
	}
      }
    }
#else
    /* If our main thread has finished or been killed, return.
     */
    {
      StgMainThread *m = main_threads;
      if (m->tso->whatNext == ThreadComplete
	  || m->tso->whatNext == ThreadKilled) {
	main_threads = main_threads->link;
	if (m->tso->whatNext == ThreadComplete) {
	  /* we finished successfully, fill in the return value */
	  if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
	  m->stat = Success;
	  return;
	} else {
380
	  if (was_interrupted) {
381 382 383 384
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
385 386 387 388 389 390
	  return;
	}
      }
    }
#endif

391 392 393 394
    /* Top up the run queue from our spark pool.  We try to make the
     * number of threads in the run queue equal to the number of
     * free capabilities.
     */
395
#if defined(SMP)
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
    {
      nat n = n_free_capabilities;
      StgTSO *tso = run_queue_hd;

      /* Count the run queue */
      while (n > 0 && tso != END_TSO_QUEUE) {
	tso = tso->link;
	n--;
      }

      for (; n > 0; n--) {
	StgClosure *spark;
	spark = findSpark();
	if (spark == NULL) {
	  break; /* no more sparks in the pool */
	} else {
412 413 414
	  /* I'd prefer this to be done in activateSpark -- HWL */
	  /* tricky - it needs to hold the scheduler lock and
	   * not try to re-acquire it -- SDM */
415 416 417 418
	  StgTSO *tso;
	  tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
	  pushClosure(tso,spark);
	  PUSH_ON_RUN_QUEUE(tso);
419
#ifdef PAR
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
	  advisory_thread_count++;
#endif
	  
	  IF_DEBUG(scheduler,
		   sched_belch("turning spark of closure %p into a thread",
			       (StgClosure *)spark));
	}
      }
      /* We need to wake up the other tasks if we just created some
       * work for them.
       */
      if (n_free_capabilities - n > 1) {
	  pthread_cond_signal(&thread_ready_cond);
      }
    }
435
#endif /* SMP */
436

437 438 439 440 441
    /* Check whether any waiting threads need to be woken up.  If the
     * run queue is empty, and there are no other tasks running, we
     * can wait indefinitely for something to happen.
     * ToDo: what if another client comes along & requests another
     * main thread?
442 443
     */
    if (blocked_queue_hd != END_TSO_QUEUE) {
444 445 446
      awaitEvent(
	   (run_queue_hd == END_TSO_QUEUE)
#ifdef SMP
447
	&& (n_free_capabilities == RtsFlags.ParFlags.nNodes)
448 449
#endif
	);
450 451 452 453 454 455 456 457 458
    }
    
    /* check for signals each time around the scheduler */
#ifndef __MINGW32__
    if (signals_pending()) {
      start_signal_handlers();
    }
#endif

459 460 461 462 463 464 465 466
    /* Detect deadlock: when we have no threads to run, there are
     * no threads waiting on I/O or sleeping, and all the other
     * tasks are waiting for work, we must have a deadlock.  Inform
     * all the main threads.
     */
#ifdef SMP
    if (blocked_queue_hd == END_TSO_QUEUE
	&& run_queue_hd == END_TSO_QUEUE
467
	&& (n_free_capabilities == RtsFlags.ParFlags.nNodes)
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
	) {
      StgMainThread *m;
      for (m = main_threads; m != NULL; m = m->link) {
	  m->ret = NULL;
	  m->stat = Deadlock;
	  pthread_cond_broadcast(&m->wakeup);
      }
      main_threads = NULL;
    }
#else /* ! SMP */
    if (blocked_queue_hd == END_TSO_QUEUE
	&& run_queue_hd == END_TSO_QUEUE) {
      StgMainThread *m = main_threads;
      m->ret = NULL;
      m->stat = Deadlock;
      main_threads = m->link;
      return;
    }
#endif

488 489 490 491 492
#ifdef SMP
    /* If there's a GC pending, don't do anything until it has
     * completed.
     */
    if (ready_to_gc) {
493
      IF_DEBUG(scheduler,sched_belch("waiting for GC"));
494 495 496 497 498 499 500
      pthread_cond_wait(&gc_pending_cond, &sched_mutex);
    }
    
    /* block until we've got a thread on the run queue and a free
     * capability.
     */
    while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
501
      IF_DEBUG(scheduler, sched_belch("waiting for work"));
502
      pthread_cond_wait(&thread_ready_cond, &sched_mutex);
503
      IF_DEBUG(scheduler, sched_belch("work now available"));
504 505
    }
#endif
506 507 508 509

#if defined(GRAN)
# error ToDo: implement GranSim scheduler
#elif defined(PAR)
510
    /* ToDo: phps merge with spark activation above */
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
    /* check whether we have local work and send requests if we have none */
    if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
      /* :-[  no local threads => look out for local sparks */
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
	  (pending_sparks_hd[REQUIRED_POOL] < pending_sparks_tl[REQUIRED_POOL] ||
	   pending_sparks_hd[ADVISORY_POOL] < pending_sparks_tl[ADVISORY_POOL])) {
	/* 
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 * Old comment:
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread... 
	 */
	
	spark = findSpark();                /* get a spark */
	if (spark != (rtsSpark) NULL) {
	  tso = activateSpark(spark);       /* turn the spark into a thread */
	  IF_PAR_DEBUG(verbose,
		       belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
			     mytid, tso, tso->id, advisory_thread_count));

533
	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
534 535
	    belch("^^ failed to activate spark");
	    goto next_thread;
536
	  }               /* otherwise fall through & pick-up new tso */
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607
	} else {
	  IF_PAR_DEBUG(verbose,
		       belch("^^ no local sparks (spark pool contains only NFs: %d)", 
			     spark_queue_len(ADVISORY_POOL)));
	  goto next_thread;
	}
      } else  
      /* =8-[  no local sparks => look for work on other PEs */
      {
	/*
	 * We really have absolutely no work.  Send out a fish
	 * (there may be some out there already), and wait for
	 * something to arrive.  We clearly can't run any threads
	 * until a SCHEDULE or RESUME arrives, and so that's what
	 * we're hoping to see.  (Of course, we still have to
	 * respond to other types of messages.)
	 */
	if (//!fishing &&  
	    outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
	  // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
	  /* fishing set in sendFish, processFish;
	     avoid flooding system with fishes via delay */
	  pe = choosePE();
	  sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
		   NEW_FISH_HUNGER);
	}
	
	processMessages();
	goto next_thread;
	// ReSchedule(0);
      }
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
      processMessages();
    }

    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);
    /* Take a thread from the run queue, if we have work */
    t = take_off_run_queue(END_TSO_QUEUE);

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
    */

    CurrentTSO = t;

    IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; lim=%x)", 
			      spark_queue_len(ADVISORY_POOL), CURRENT_PROC,
			      pending_sparks_hd[ADVISORY_POOL], 
			      pending_sparks_tl[ADVISORY_POOL], 
			      pending_sparks_lim[ADVISORY_POOL]));

    IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
			      run_queue_len(), CURRENT_PROC,
			      run_queue_hd, run_queue_tl));

    if (t != LastTSO) {
      /* 
	 we are running a different TSO, so write a schedule event to log file
	 NB: If we use fair scheduling we also have to write  a deschedule 
	     event for LastTSO; with unfair scheduling we know that the
	     previous tso has blocked whenever we switch to another tso, so
	     we don't need it in GUM for now
      */
      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      
    }

#else /* !GRAN && !PAR */
608 609 610 611
  
    /* grab a thread from the run queue
     */
    t = POP_RUN_QUEUE();
612
    IF_DEBUG(sanity,checkTSO(t));
613 614

#endif
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
    
    /* grab a capability
     */
#ifdef SMP
    cap = free_capabilities;
    free_capabilities = cap->link;
    n_free_capabilities--;
#else
    cap = &MainRegTable;
#endif
    
    cap->rCurrentTSO = t;
    
    /* set the context_switch flag
     */
630
    if (run_queue_hd == END_TSO_QUEUE)
631 632 633
      context_switch = 0;
    else
      context_switch = 1;
634

635 636
    RELEASE_LOCK(&sched_mutex);
    
637
    IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
638

639
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
    /* Run the current thread 
     */
    switch (cap->rCurrentTSO->whatNext) {
    case ThreadKilled:
    case ThreadComplete:
      /* Thread already finished, return to scheduler. */
      ret = ThreadFinished;
      break;
    case ThreadEnterGHC:
      ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
      break;
    case ThreadRunGHC:
      ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
      break;
    case ThreadEnterHugs:
#ifdef INTERPRETER
656 657
      {
         StgClosure* c;
658
	 IF_DEBUG(scheduler,sched_belch("entering Hugs"));
659 660 661 662
	 c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
	 cap->rCurrentTSO->sp += 1;
	 ret = enter(cap,c);
         break;
663 664 665 666 667 668 669
      }
#else
      barf("Panic: entered a BCO but no bytecode interpreter in this build");
#endif
    default:
      barf("schedule: invalid whatNext field");
    }
670
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
671 672 673 674 675 676 677 678 679
    
    /* Costs for the scheduler are assigned to CCS_SYSTEM */
#ifdef PROFILING
    CCCS = CCS_SYSTEM;
#endif
    
    ACQUIRE_LOCK(&sched_mutex);

#ifdef SMP
680
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
681
#else
682
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712
#endif
    t = cap->rCurrentTSO;
    
    switch (ret) {
    case HeapOverflow:
      /* make all the running tasks block on a condition variable,
       * maybe set context_switch and wait till they all pile in,
       * then have them wait on a GC condition variable.
       */
      IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
      threadPaused(t);
      
      ready_to_gc = rtsTrue;
      context_switch = 1;		/* stop other threads ASAP */
      PUSH_ON_RUN_QUEUE(t);
      break;
      
    case StackOverflow:
      /* just adjust the stack for this thread, then pop it back
       * on the run queue.
       */
      IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
      threadPaused(t);
      { 
	StgMainThread *m;
	/* enlarge the stack */
	StgTSO *new_t = threadStackOverflow(t);
	
	/* This TSO has moved, so update any pointers to it from the
	 * main thread stack.  It better not be on any other queues...
713
	 * (it shouldn't be).
714 715 716 717 718 719
	 */
	for (m = main_threads; m != NULL; m = m->link) {
	  if (m->tso == t) {
	    m->tso = new_t;
	  }
	}
720
	threadPaused(new_t);
721 722 723 724 725
	PUSH_ON_RUN_QUEUE(new_t);
      }
      break;

    case ThreadYielding:
726 727 728 729 730 731 732 733
#if defined(GRAN)
      IF_DEBUG(gran, 
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_yields++;
#elif defined(PAR)
      IF_DEBUG(par, 
	       DumpGranEvent(GR_DESCHEDULE, t));
#endif
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
      /* put the thread back on the run queue.  Then, if we're ready to
       * GC, check whether this is the last task to stop.  If so, wake
       * up the GC thread.  getThread will block during a GC until the
       * GC is finished.
       */
      IF_DEBUG(scheduler,
	       if (t->whatNext == ThreadEnterHugs) {
		 /* ToDo: or maybe a timer expired when we were in Hugs?
		  * or maybe someone hit ctrl-C
		  */
		 belch("thread %ld stopped to switch to Hugs", t->id);
	       } else {
		 belch("thread %ld stopped, yielding", t->id);
	       }
	       );
      threadPaused(t);
      APPEND_TO_RUN_QUEUE(t);
      break;
      
    case ThreadBlocked:
754 755 756 757 758 759 760
#if defined(GRAN)
# error ToDo: implement GranSim scheduler
#elif defined(PAR)
      IF_DEBUG(par, 
	       DumpGranEvent(GR_DESCHEDULE, t)); 
#else
#endif
761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
      /* don't need to do anything.  Either the thread is blocked on
       * I/O, in which case we'll have called addToBlockedQueue
       * previously, or it's blocked on an MVar or Blackhole, in which
       * case it'll be on the relevant queue already.
       */
      IF_DEBUG(scheduler,
	       fprintf(stderr, "thread %d stopped, ", t->id);
	       printThreadBlockage(t);
	       fprintf(stderr, "\n"));
      threadPaused(t);
      break;
      
    case ThreadFinished:
      /* Need to check whether this was a main thread, and if so, signal
       * the task that started it with the return value.  If we have no
       * more main threads, we probably need to stop all the tasks until
       * we get a new one.
       */
      IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
      t->whatNext = ThreadComplete;
781 782 783 784 785 786 787
#if defined(GRAN)
      // ToDo: endThread(t, CurrentProc); // clean-up the thread
#elif defined(PAR)
      advisory_thread_count--;
      if (RtsFlags.ParFlags.ParStats.Full) 
	DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
#endif
788 789 790 791 792 793 794 795 796 797 798 799 800
      break;
      
    default:
      barf("doneThread: invalid thread return code");
    }
    
#ifdef SMP
    cap->link = free_capabilities;
    free_capabilities = cap;
    n_free_capabilities++;
#endif

#ifdef SMP
801
    if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
802
#else
803
    if (ready_to_gc) 
804
#endif
805
      {
806 807 808 809 810 811
      /* everybody back, start the GC.
       * Could do it in this thread, or signal a condition var
       * to do it in another thread.  Either way, we need to
       * broadcast on gc_pending_cond afterward.
       */
#ifdef SMP
812
      IF_DEBUG(scheduler,sched_belch("doing GC"));
813 814 815 816 817 818 819
#endif
      GarbageCollect(GetRoots);
      ready_to_gc = rtsFalse;
#ifdef SMP
      pthread_cond_broadcast(&gc_pending_cond);
#endif
    }
820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836
#if defined(GRAN)
  next_thread:
    IF_GRAN_DEBUG(unused,
		  print_eventq(EventHd));

    event = get_next_event();

#elif defined(PAR)
  next_thread:
    /* ToDo: wait for next message to arrive rather than busy wait */

#else /* GRAN */
  /* not any more
  next_thread:
    t = take_off_run_queue(END_TSO_QUEUE);
  */
#endif /* GRAN */
837 838 839
  } /* end of while(1) */
}

840 841 842 843
/* A hack for Hugs concurrency support.  Needs sanitisation (?) */
void deleteAllThreads ( void )
{
  StgTSO* t;
844
  IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
845 846 847 848 849 850 851 852 853 854
  for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
    deleteThread(t);
  }
  for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
    deleteThread(t);
  }
  run_queue_hd = run_queue_tl = END_TSO_QUEUE;
  blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
}

855
/* startThread and  insertThread are now in GranSim.c -- HWL */
856

857 858 859 860
//@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
//@subsection Suspend and Resume

/* ---------------------------------------------------------------------------
861 862 863 864 865 866 867 868 869 870 871 872
 * Suspending & resuming Haskell threads.
 * 
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the susepended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
873
 * ------------------------------------------------------------------------- */
874 875 876 877 878 879 880 881 882
   
/* Suspend the Haskell thread on this capability for the duration of a
 * "safe" C call (_ccall_GC).  The TSO is pushed onto the
 * suspended_ccalling_threads list and (on SMP) the capability is
 * returned to the free pool so another task can run Haskell threads.
 * Returns a token (the thread ID) to pass to resumeThread() later.
 * (Scraped blame-number residue removed from this body.)
 */
StgInt
suspendThread( Capability *cap )
{
  nat tok;

  ACQUIRE_LOCK(&sched_mutex);

  IF_DEBUG(scheduler,
	   sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));

  threadPaused(cap->rCurrentTSO);
  cap->rCurrentTSO->link = suspended_ccalling_threads;
  suspended_ccalling_threads = cap->rCurrentTSO;

  /* Use the thread ID as the token; it should be unique */
  tok = cap->rCurrentTSO->id;

#ifdef SMP
  /* hand the capability back to the free pool */
  cap->link = free_capabilities;
  free_capabilities = cap;
  n_free_capabilities++;
#endif

  RELEASE_LOCK(&sched_mutex);
  return tok; 
}

/* Resume a Haskell thread previously suspended by suspendThread().
 * `tok` is the token suspendThread returned (the thread ID).  The TSO
 * is unlinked from suspended_ccalling_threads; on SMP we then block
 * until a free capability is available.  Returns the capability with
 * rCurrentTSO set to the resumed thread.  barf()s if the token does
 * not match any suspended thread.
 * (Scraped blame-number residue removed from this body.)
 */
Capability *
resumeThread( StgInt tok )
{
  StgTSO *tso, **prev;
  Capability *cap;

  ACQUIRE_LOCK(&sched_mutex);

  /* find and unlink the TSO with this token */
  prev = &suspended_ccalling_threads;
  for (tso = suspended_ccalling_threads; 
       tso != END_TSO_QUEUE; 
       prev = &tso->link, tso = tso->link) {
    if (tso->id == (StgThreadID)tok) {
      *prev = tso->link;
      break;
    }
  }
  if (tso == END_TSO_QUEUE) {
    barf("resumeThread: thread not found");
  }

#ifdef SMP
  /* wait until a capability is free */
  while (free_capabilities == NULL) {
    IF_DEBUG(scheduler, sched_belch("waiting to resume"));
    pthread_cond_wait(&thread_ready_cond, &sched_mutex);
    IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
  }
  cap = free_capabilities;
  free_capabilities = cap->link;
  n_free_capabilities--;
#else  
  cap = &MainRegTable;
#endif

  cap->rCurrentTSO = tso;

  RELEASE_LOCK(&sched_mutex);
  return cap;
}

942 943

/* ---------------------------------------------------------------------------
944
 * Static functions
945
 * ------------------------------------------------------------------------ */
946 947
static void unblockThread(StgTSO *tso);

948
/* ---------------------------------------------------------------------------
949 950 951 952
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
953
 * ------------------------------------------------------------------------ */
954 955 956 957 958 959 960 961 962 963 964

/* Three-way comparison of two threads by their IDs: returns -1, 0, or 1
 * as tso1's id is less than, equal to, or greater than tso2's.  Used
 * from STG land to implement Eq/Ord for ThreadId.
 */
int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
{ 
  const StgThreadID a = tso1->id;
  const StgThreadID b = tso2->id;

  return (a < b) ? -1 : ((a > b) ? 1 : 0);
}

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   createThread() is the public entry point; createThread_() does the
   real work and takes have_lock so callers already holding sched_mutex
   can avoid re-acquiring it when allocating the thread id.
   ------------------------------------------------------------------------ */
//@cindex createThread
#if defined(GRAN)
/* currently pri (priority) is only used in a GRAN setup -- HWL */
StgTSO *
createThread(nat stack_size, StgInt pri)
{
  return createThread_(stack_size, rtsFalse, pri);
}

static StgTSO *
createThread_(nat size, rtsBool have_lock, StgInt pri)
{
#else
StgTSO *
createThread(nat stack_size)
{
  return createThread_(stack_size, rtsFalse);
}

static StgTSO *
createThread_(nat size, rtsBool have_lock)
{
#endif
    StgTSO *tso;
    nat stack_size;

    /* First check whether we should create a thread at all */
#if defined(PAR)
  /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
    threadsIgnored++;
    belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
	  RtsFlags.ParFlags.maxThreads, advisory_thread_count);
    return END_TSO_QUEUE;
  }
  threadsCreated++;
#endif

#if defined(GRAN)
  ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
#endif

  // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW

  /* catch ridiculously small stack sizes */
  if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
    size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
  }

  /* 'size' is in words and includes the TSO header as well as the stack */
  tso = (StgTSO *)allocate(size);
  TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
  
  stack_size = size - TSO_STRUCT_SIZEW;

  // Hmm, this CCS_MAIN is not protected by a PROFILING cpp var;
  SET_HDR(tso, &TSO_info, CCS_MAIN);
#if defined(GRAN)
  SET_GRAN_HDR(tso, ThisPE);
#endif
  tso->whatNext     = ThreadEnterGHC;

  /* tso->id needs to be unique.  For now we use a heavyweight mutex to
  	 protect the increment operation on next_thread_id.
  	 In future, we could use an atomic increment instead.
  */
  
  if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
  tso->id = next_thread_id++; 
  if (!have_lock) { RELEASE_LOCK(&sched_mutex); }

  tso->why_blocked  = NotBlocked;
  tso->blocked_exceptions = NULL;

  tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
  tso->stack_size   = stack_size;
  tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
                              - TSO_STRUCT_SIZEW;
  tso->sp           = (P_)&(tso->stack) + stack_size;

#ifdef PROFILING
  tso->prof.CCCS = CCS_MAIN;
#endif

  /* put a stop frame on the stack */
  tso->sp -= sizeofW(StgStopFrame);
  SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
  tso->su = (StgUpdateFrame*)tso->sp;

  IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
			   tso->id, tso, tso->stack_size));

  // ToDo: check this
#if defined(GRAN)
  tso->link = END_TSO_QUEUE;
  /* uses more flexible routine in GranSim */
  insertThread(tso, CurrentProc);
#else
  /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
     from its creation
  */
#endif

#if defined(GRAN)
  /* GranSim per-thread statistics and bookkeeping */
  tso->gran.pri = pri;
  tso->gran.magic = TSO_MAGIC; // debugging only
  tso->gran.sparkname   = 0;
  tso->gran.startedat   = CURRENT_TIME; 
  tso->gran.exported    = 0;
  tso->gran.basicblocks = 0;
  tso->gran.allocs      = 0;
  tso->gran.exectime    = 0;
  tso->gran.fetchtime   = 0;
  tso->gran.fetchcount  = 0;
  tso->gran.blocktime   = 0;
  tso->gran.blockcount  = 0;
  tso->gran.blockedat   = 0;
  tso->gran.globalsparks = 0;
  tso->gran.localsparks  = 0;
  if (RtsFlags.GranFlags.Light)
    tso->gran.clock  = Now; /* local clock */
  else
    tso->gran.clock  = 0;

  IF_DEBUG(gran,printTSO(tso));
#elif defined(PAR)
  /* GUM (parallel) per-thread statistics */
  tso->par.sparkname   = 0;
  tso->par.startedat   = CURRENT_TIME; 
  tso->par.exported    = 0;
  tso->par.basicblocks = 0;
  tso->par.allocs      = 0;
  tso->par.exectime    = 0;
  tso->par.fetchtime   = 0;
  tso->par.fetchcount  = 0;
  tso->par.blocktime   = 0;
  tso->par.blockcount  = 0;
  tso->par.blockedat   = 0;
  tso->par.globalsparks = 0;
  tso->par.localsparks  = 0;
#endif

#if defined(GRAN)
  globalGranStats.tot_threads_created++;
  globalGranStats.threads_created_on_PE[CurrentProc]++;
  globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
  globalGranStats.tot_sq_probes++;
#endif 

  IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
				 tso->id, tso->stack_size));
  return tso;
}

1128
/* ---------------------------------------------------------------------------
1129 1130 1131 1132 1133 1134 1135
 * scheduleThread()
 *
 * scheduleThread puts a thread on the head of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
1136
 * ------------------------------------------------------------------------ */
1137 1138 1139 1140 1141 1142 1143 1144 1145 1146

void
scheduleThread(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);

  /* Put the new thread on the head of the runnable queue.  The caller
   * better push an appropriate closure on this thread's stack
   * beforehand.  In the SMP case, the thread may start running as
   * soon as we release the scheduler lock below.
1147
   */
1148 1149
  PUSH_ON_RUN_QUEUE(tso);
  THREAD_RUNNABLE();
1150 1151

  IF_DEBUG(scheduler,printTSO(tso));
1152
  RELEASE_LOCK(&sched_mutex);
1153 1154
}

/* ---------------------------------------------------------------------------
 * startTasks()
 *
 * Start up Posix threads to run each of the scheduler tasks.
 * I believe the task ids are not needed in the system as defined.
 *  KH @ 25/10/99
 * ------------------------------------------------------------------------ */

#ifdef SMP
/* Entry point for each worker pthread: just run the scheduler loop. */
static void *
taskStart( void *arg STG_UNUSED )
{
  schedule();
  return NULL;
}
#endif

/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 *
 * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
 * ------------------------------------------------------------------------ */

#ifdef SMP
/* SIGTERM handler for worker tasks: record stats, signal the
 * terminator (see exitScheduler) by decrementing await_death, and exit
 * this pthread.
 */
static void
term_handler(int sig STG_UNUSED)
{
  stat_workerStop();
  ACQUIRE_LOCK(&term_mutex);
  await_death--;
  RELEASE_LOCK(&term_mutex);
  pthread_exit(NULL);
}
#endif

1194 1195 1196
//@cindex initScheduler
void 
initScheduler(void)
1197
{
1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208
#if defined(GRAN)
  nat i;

  for (i=0; i<=MAX_PROC; i++) {
    run_queue_hds[i]      = END_TSO_QUEUE;
    run_queue_tls[i]      = END_TSO_QUEUE;
    blocked_queue_hds[i]  = END_TSO_QUEUE;
    blocked_queue_tls[i]  = END_TSO_QUEUE;
    ccalling_threadss[i]  = END_TSO_QUEUE;
  }
#else
1209 1210 1211 1212
  run_queue_hd      = END_TSO_QUEUE;
  run_queue_tl      = END_TSO_QUEUE;
  blocked_queue_hd  = END_TSO_QUEUE;
  blocked_queue_tl  = END_TSO_QUEUE;
1213
#endif 
1214 1215 1216 1217

  suspended_ccalling_threads  = END_TSO_QUEUE;

  main_threads = NULL;
1218 1219 1220 1221 1222

  context_switch = 0;
  interrupted    = 0;

  enteredCAFs = END_CAF_LIST;
1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244

  /* Install the SIGHUP handler */
#ifdef SMP
  {
    struct sigaction action,oact;

    action.sa_handler = term_handler;
    sigemptyset(&action.sa_mask);
    action.sa_flags = 0;
    if (sigaction(SIGTERM, &action, &oact) != 0) {
      barf("can't install TERM handler");
    }
  }
#endif

#ifdef SMP
  /* Allocate N Capabilities */
  {
    nat i;
    Capability *cap, *prev;
    cap  = NULL;
    prev = NULL;
1245
    for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1246 1247 1248 1249 1250
      cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
      cap->link = prev;
      prev = cap;
    }
    free_capabilities = cap;
1251
    n_free_capabilities = RtsFlags.ParFlags.nNodes;
1252
  }
1253
  IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1254 1255
			     n_free_capabilities););
#endif
1256

1257
#if defined(SMP) || defined(PAR)
1258
  initSparkPools();
1259
#endif
1260 1261
}

#ifdef SMP
/* Create one worker pthread per requested node (-N); each runs
 * taskStart -> schedule().  Thread ids and per-task timing stats are
 * recorded in task_ids[].
 */
void
startTasks( void )
{
  nat i;
  int r;
  pthread_t tid;
  
  /* make some space for saving all the thread ids */
  task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
			    "initScheduler:task_ids");
  
  /* and create all the threads */
  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
    r = pthread_create(&tid,NULL,taskStart,NULL);
    if (r != 0) {
      barf("startTasks: Can't create new Posix thread");
    }
    task_ids[i].id = tid;
    task_ids[i].mut_time = 0.0;
    task_ids[i].mut_etime = 0.0;
    task_ids[i].gc_time = 0.0;
    task_ids[i].gc_etime = 0.0;
    task_ids[i].elapsedtimestart = elapsedtime();
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
  }
}
#endif
/* Shut the scheduler down.  In the SMP case, signal each worker task
 * with SIGTERM (handled by term_handler) and spin until they have all
 * acknowledged by decrementing await_death.
 */
void
exitScheduler( void )
{
#ifdef SMP
  nat i;

  /* Don't want to use pthread_cancel, since we'd have to install
   * these silly exception handlers (pthread_cleanup_{push,pop}) around
   * all our locks.
   */
#if 0
  /* Cancel all our tasks */
  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
    pthread_cancel(task_ids[i].id);
  }
  
  /* Wait for all the tasks to terminate */
  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
			       task_ids[i].id));
    pthread_join(task_ids[i].id, NULL);
  }
#endif

  /* Send 'em all a SIGTERM.  That should shut 'em up.
   */
  await_death = RtsFlags.ParFlags.nNodes;
  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
    pthread_kill(task_ids[i].id,SIGTERM);
  }
  while (await_death > 0) {
    sched_yield();
  }
#endif
}

/* -----------------------------------------------------------------------------
   Managing the per-task allocation areas.
   
   Each capability comes with an allocation area.  These are
   fixed-length block lists into which allocation can be done.

   ToDo: no support for two-space collection at the moment???
   -------------------------------------------------------------------------- */

/* -----------------------------------------------------------------------------
 * waitThread is the external interface for running a new computation
 * and waiting for the result.
 *
 * In the non-SMP case, we create a new main thread, push it on the 
 * main-thread stack, and invoke the scheduler to run it.  The
 * scheduler will return when the top main thread on the stack has
 * completed or died, and fill in the necessary fields of the
 * main_thread structure.
 *
 * In the SMP case, we create a main thread as before, but we then
 * create a new condition variable and sleep on it.  When our new
 * main thread has completed, we'll be woken up and the status/result
 * will be in the main_thread struct.
 *
 * Returns the scheduler status; on success *ret holds the result
 * closure.  Must be called with sched_mutex NOT held.
 * -------------------------------------------------------------------------- */

SchedulerStatus
waitThread(StgTSO *tso, /*out*/StgClosure **ret)
{
  StgMainThread *m;
  SchedulerStatus stat;

  ACQUIRE_LOCK(&sched_mutex);
  
  m = stgMallocBytes(sizeof(StgMainThread), "waitThread");

  m->tso = tso;
  m->ret = ret;
  m->stat = NoStatus;
#ifdef SMP
  pthread_cond_init(&m->wakeup, NULL);
#endif

  /* push this thread onto the list of main threads the scheduler watches */
  m->link = main_threads;
  main_threads = m;

  IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
			      m->tso->id));

#ifdef SMP
  /* a worker task will run the thread; sleep until it has finished.
   * pthread_cond_wait releases sched_mutex while we sleep.
   */
  do {
    pthread_cond_wait(&m->wakeup, &sched_mutex);
  } while (m->stat == NoStatus);
#else
  schedule();
  ASSERT(m->stat != NoStatus);
#endif

  stat = m->stat;

#ifdef SMP
  pthread_cond_destroy(&m->wakeup);
#endif

  IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
			      m->tso->id));
  free(m);

  RELEASE_LOCK(&sched_mutex);

  return stat;
}

//@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
//@subsection Run queue code 

#if 0
/* 
   NB: In GranSim we have many run queues; run_queue_hd is actually a macro
       unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
       implicit global variable that has to be correct when calling these
       fcts -- HWL 
*/

/* Put the new thread on the head of the runnable queue.
 * The caller of createThread better push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 */
static /* inline */ void
add_to_run_queue(tso)
StgTSO* tso; 
1417
{
1418 1419 1420 1421 1422
  ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
  tso->link = run_queue_hd;
  run_queue_hd = tso;
  if (run_queue_tl == END_TSO_QUEUE) {
    run_queue_tl = tso;