/* ---------------------------------------------------------------------------
 * $Id: Schedule.c,v 1.146 2002/06/26 08:18:42 stolz Exp $
 *
 * (c) The GHC Team, 1998-2000
 *
 * Scheduler
 *
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name     CPP flag  What's it for
 * --------------------------------------
 * mp   GUM      PAR          Parallel execution on a distributed memory machine
 * s    SMP      SMP          Parallel execution on a shared memory machine
 * mg   GranSim  GRAN         Simulation of parallel execution
 * md   GUM/GdH  DIST         Distributed execution (based on GUM)
 *
 * --------------------------------------------------------------------------*/

//@node Main scheduling code, , ,
//@section Main scheduling code

/* 
 * Version with scheduler monitor support for SMPs (WAY=s):

   This design provides a high-level API to create and schedule threads etc.
   as documented in the SMP design document.

   It uses a monitor design controlled by a single mutex to exercise control
   over accesses to shared data structures, and builds on the Posix threads
   library.

   The majority of state is shared.  In order to keep essential per-task state,
   there is a Capability structure, which contains all the information
   needed to run a thread: its STG registers, a pointer to its TSO, a
   nursery etc.  During STG execution, a pointer to the capability is
   kept in a register (BaseReg).

   In a non-SMP build, there is one global capability, namely MainRegTable.

   SDM & KH, 10/99

 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which are running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event driven, i.e. it iterates
   over the events in the global event queue.  -- HWL
*/

//@menu
//* Includes::			
//* Variables and Data structures::  
//* Main scheduling loop::	
//* Suspend and Resume::	
//* Run queue code::		
//* Garbage Collection Routines::  
//* Blocking Queue Routines::	
//* Exception Handling Routines::  
//* Debugging Routines::	
//* Index::			
//@end menu

//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
//@subsection Includes
#include "PosixSource.h"
79 80 81 82 83 84 85 86 87 88 89
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Storage.h"
#include "StgRun.h"
#include "StgStartup.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Storage.h"
90
#include "Interpreter.h"
91
#include "Exception.h"
92 93 94 95
#include "Printer.h"
#include "Main.h"
#include "Signals.h"
#include "Sanity.h"
96
#include "Stats.h"
andy's avatar
andy committed
97
#include "Itimer.h"
98
#include "Prelude.h"
99
#include "ThreadLabels.h"
100 101 102 103
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
104 105 106 107 108 109 110 111 112
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
113
#include "Sparks.h"
sof's avatar
sof committed
114 115
#include "Capability.h"
#include "OSThreads.h"
sof's avatar
sof committed
116
#include  "Task.h"
117

118 119 120 121 122 123 124
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

125
#include <stdarg.h>
126

127 128 129
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures

130 131 132
/* Main thread queue.
 * Locks required: sched_mutex.
 */
133
StgMainThread *main_threads;
134 135 136 137

/* Thread queues.
 * Locks required: sched_mutex.
 */
138 139 140
#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
141
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
142 143

/* 
sof's avatar
sof committed
144
   In GranSim we have a runnable and a blocked queue for each processor.
145 146 147 148 149 150 151 152
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
153 154 155 156
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */
157 158 159

#else /* !GRAN */

160 161
StgTSO *run_queue_hd, *run_queue_tl;
StgTSO *blocked_queue_hd, *blocked_queue_tl;
162
StgTSO *sleeping_queue;		/* perhaps replace with a hash table? */
163

164 165
#endif

166 167 168 169 170
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */
StgTSO *all_threads;

sof's avatar
sof committed
171 172 173
/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
174 175 176
 */
static StgTSO *suspended_ccalling_threads;

177 178
static StgTSO *threadStackOverflow(StgTSO *tso);

179 180 181 182 183
/* KH: The following two flags are shared memory locations.  There is no need
       to lock them, since they are only unset at the end of a scheduler
       operation.
*/

184
/* flag set by signal handler to precipitate a context switch */
185
//@cindex context_switch
186
nat context_switch;
187

188
/* if this flag is set as well, give up execution */
189
//@cindex interrupted
190
rtsBool interrupted;
191

192
/* Next thread ID to allocate.
sof's avatar
sof committed
193
 * Locks required: thread_id_mutex
194
 */
195
//@cindex next_thread_id
196 197 198 199 200 201 202
StgThreadID next_thread_id = 1;

/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */
203
 
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the realworld token for an IO thread)
 *  + 1                       (the closure to enter)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */

#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)

sof's avatar
sof committed
216

217
#if defined(GRAN)
218
StgTSO *CurrentTSO;
219 220
#endif

221 222 223 224 225 226
/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

227
rtsBool ready_to_gc;
sof's avatar
sof committed
228 229 230 231 232 233 234

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
static rtsBool shutting_down_scheduler = rtsFalse;
235 236 237 238

void            addToBlockedQueue ( StgTSO *tso );

static void     schedule          ( void );
239
       void     interruptStgRts   ( void );
240

241 242
static void     detectBlackHoles  ( void );

243 244 245
#ifdef DEBUG
static void sched_belch(char *s, ...);
#endif
246

sof's avatar
sof committed
247 248 249 250
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 *       with these synchronisation objects.
 */
sof's avatar
sof committed
251 252
Mutex     sched_mutex       = INIT_MUTEX_VAR;
Mutex     term_mutex        = INIT_MUTEX_VAR;
sof's avatar
sof committed
253

sof's avatar
sof committed
254 255 256 257 258 259 260
/*
 * A heavyweight solution to the problem of protecting
 * the thread_id from concurrent update.
 */
Mutex     thread_id_mutex   = INIT_MUTEX_VAR;


sof's avatar
sof committed
261 262
# if defined(SMP)
static Condition gc_pending_cond = INIT_COND_VAR;
263
nat await_death;
sof's avatar
sof committed
264
# endif
265

sof's avatar
sof committed
266
#endif /* RTS_SUPPORTS_THREADS */
sof's avatar
sof committed
267

268 269 270
#if defined(PAR)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
271
rtsBool emitSchedule = rtsTrue;
272 273
#endif

274 275 276 277
#if DEBUG
/* Human-readable names for a TSO's what_next field, used only in
 * scheduler debugging output.
 * NOTE(review): the order must stay in sync with the what_next
 * enumeration declared elsewhere (TSO header) -- confirm on change.
 */
char *whatNext_strs[] = {
  "ThreadEnterGHC",
  "ThreadRunGHC",
  "ThreadEnterInterp",
  "ThreadKilled",
  "ThreadComplete"
};

/* Human-readable names for StgThreadReturnCode values, for debug output.
 * NOTE(review): order must match the StgThreadReturnCode enumeration.
 */
char *threadReturnCode_strs[] = {
  "HeapOverflow",			/* might also be StackOverflow */
  "StackOverflow",
  "ThreadYielding",
  "ThreadBlocked",
  "ThreadFinished"
};
#endif

#if defined(PAR)
/* Spark-to-thread conversion helpers for the GUM parallel RTS;
 * definitions live in the parallel RTS sources (see Sparks.h/ParallelRts).
 */
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

/*
 * The thread state for the main thread.
// ToDo: check whether not needed any more
StgTSO   *MainTSO;
 */
sof's avatar
sof committed
303 304 305 306 307 308 309 310 311 312 313 314
#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
static void taskStart(void);
/* Entry point handed to newly created tasks: just enter the main
 * scheduler loop.  Presumably registered via the Task/OSThreads
 * machinery when a worker is spawned -- confirm against Task.c.
 */
static void
taskStart(void)
{
  schedule();
}
#endif




315 316 317 318
//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop

/* ---------------------------------------------------------------------------
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when
    
      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

352 353
   ------------------------------------------------------------------------ */
//@cindex schedule
354 355 356 357 358 359
static void
schedule( void )
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
360 361 362
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
363
  StgSparkPool *pool;
364 365 366
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
367 368 369 370
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
371
#endif
372
  rtsBool was_interrupted = rtsFalse;
373 374
  
  ACQUIRE_LOCK(&sched_mutex);
sof's avatar
sof committed
375 376
 
#if defined(RTS_SUPPORTS_THREADS)
sof's avatar
sof committed
377
  waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
sof's avatar
sof committed
378 379 380
#else
  /* simply initialise it in the non-threaded case */
  grabCapability(&cap);
sof's avatar
sof committed
381
#endif
382

383
#if defined(GRAN)
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	    ContinueThread, 
	    CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  IF_DEBUG(gran,
	   fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
	   G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }      

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO = event->tso;

406
#elif defined(PAR)
407

408 409
  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message         */ 
410
#else
411

412
  while (1) {
413

414
#endif
415

416 417
    IF_DEBUG(scheduler, printAllThreads());

sof's avatar
sof committed
418 419 420 421
#if defined(RTS_SUPPORTS_THREADS)
    /* Check to see whether there are any worker threads
       waiting to deposit external call results. If so,
       yield our capability */
sof's avatar
sof committed
422
    yieldToReturningWorker(&sched_mutex, &cap);
sof's avatar
sof committed
423 424
#endif

425 426 427 428 429
    /* If we're interrupted (the user pressed ^C, or some other
     * termination condition occurred), kill all the currently running
     * threads.
     */
    if (interrupted) {
430
      IF_DEBUG(scheduler, sched_belch("interrupted"));
431
      deleteAllThreads();
432 433
      interrupted = rtsFalse;
      was_interrupted = rtsTrue;
434 435 436 437 438 439 440
    }

    /* Go through the list of main threads and wake up any
     * clients whose computations have finished.  ToDo: this
     * should be done more efficiently without a linear scan
     * of the main threads list, somehow...
     */
sof's avatar
sof committed
441
#if defined(RTS_SUPPORTS_THREADS)
442 443 444 445
    { 
      StgMainThread *m, **prev;
      prev = &main_threads;
      for (m = main_threads; m != NULL; m = m->link) {
446
	switch (m->tso->what_next) {
447
	case ThreadComplete:
448 449 450 451 452
	  if (m->ret) {
	    *(m->ret) = (StgClosure *)m->tso->sp[0];
	  }
	  *prev = m->link;
	  m->stat = Success;
sof's avatar
sof committed
453
	  broadcastCondition(&m->wakeup);
454
#ifdef DEBUG
455
	  removeThreadLabel(m->tso);
456
#endif
457 458
	  break;
	case ThreadKilled:
459
	  if (m->ret) *(m->ret) = NULL;
460
	  *prev = m->link;
461
	  if (was_interrupted) {
462 463 464 465
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
sof's avatar
sof committed
466
	  broadcastCondition(&m->wakeup);
467
#ifdef DEBUG
468
	  removeThreadLabel(m->tso);
469
#endif
470 471 472
	  break;
	default:
	  break;
473 474 475
	}
      }
    }
476

sof's avatar
sof committed
477
#else /* not threaded */
478

479 480 481 482
# if defined(PAR)
    /* in GUM do this only on the Main PE */
    if (IAmMainThread)
# endif
483 484 485 486
    /* If our main thread has finished or been killed, return.
     */
    {
      StgMainThread *m = main_threads;
487 488
      if (m->tso->what_next == ThreadComplete
	  || m->tso->what_next == ThreadKilled) {
489
#ifdef DEBUG
490
	removeThreadLabel((StgWord)m->tso);
491
#endif
492
	main_threads = main_threads->link;
493
	if (m->tso->what_next == ThreadComplete) {
494 495 496 497 498
	  /* we finished successfully, fill in the return value */
	  if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
	  m->stat = Success;
	  return;
	} else {
499
	  if (m->ret) { *(m->ret) = NULL; };
500
	  if (was_interrupted) {
501 502 503 504
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
505 506 507 508 509 510
	  return;
	}
      }
    }
#endif

511 512 513
    /* Top up the run queue from our spark pool.  We try to make the
     * number of threads in the run queue equal to the number of
     * free capabilities.
sof's avatar
sof committed
514 515 516
     *
     * Disable spark support in SMP for now, non-essential & requires
     * a little bit of work to make it compile cleanly. -- sof 1/02.
517
     */
sof's avatar
sof committed
518
#if 0 /* defined(SMP) */
519
    {
sof's avatar
sof committed
520
      nat n = getFreeCapabilities();
521 522 523 524 525 526 527 528 529 530
      StgTSO *tso = run_queue_hd;

      /* Count the run queue */
      while (n > 0 && tso != END_TSO_QUEUE) {
	tso = tso->link;
	n--;
      }

      for (; n > 0; n--) {
	StgClosure *spark;
531
	spark = findSpark(rtsFalse);
532 533 534
	if (spark == NULL) {
	  break; /* no more sparks in the pool */
	} else {
535 536 537
	  /* I'd prefer this to be done in activateSpark -- HWL */
	  /* tricky - it needs to hold the scheduler lock and
	   * not try to re-acquire it -- SDM */
538
	  createSparkThread(spark);	  
539
	  IF_DEBUG(scheduler,
540
		   sched_belch("==^^ turning spark of closure %p into a thread",
541 542 543 544 545 546
			       (StgClosure *)spark));
	}
      }
      /* We need to wake up the other tasks if we just created some
       * work for them.
       */
sof's avatar
sof committed
547
      if (getFreeCapabilities() - n > 1) {
sof's avatar
sof committed
548
   	  signalCondition( &thread_ready_cond );
549 550
      }
    }
551
#endif // SMP
552

553 554 555
    /* check for signals each time around the scheduler */
#ifndef mingw32_TARGET_OS
    if (signals_pending()) {
sof's avatar
sof committed
556
      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
557
      startSignalHandlers();
sof's avatar
sof committed
558
      ACQUIRE_LOCK(&sched_mutex);
559 560 561
    }
#endif

562 563 564 565 566
    /* Check whether any waiting threads need to be woken up.  If the
     * run queue is empty, and there are no other tasks running, we
     * can wait indefinitely for something to happen.
     * ToDo: what if another client comes along & requests another
     * main thread?
567
     */
sof's avatar
sof committed
568 569
    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
      awaitEvent( EMPTY_RUN_QUEUE()
sof's avatar
sof committed
570 571
#if defined(SMP)
	&& allFreeCapabilities()
572 573
#endif
	);
574
    }
575 576 577
    /* we can be interrupted while waiting for I/O... */
    if (interrupted) continue;

578 579 580 581 582 583 584 585 586 587
    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
588
     */
589
#ifndef PAR
590
    if (   EMPTY_THREAD_QUEUES()
sof's avatar
sof committed
591
#if defined(RTS_SUPPORTS_THREADS)
sof's avatar
sof committed
592
	&& EMPTY_QUEUE(suspended_ccalling_threads)
sof's avatar
sof committed
593 594 595
#endif
#ifdef SMP
	&& allFreeCapabilities()
596 597
#endif
	)
598
    {
599
	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
sof's avatar
sof committed
600 601 602 603
#if defined(THREADED_RTS)
	/* and SMP mode ..? */
	releaseCapability(cap);
#endif
604 605 606 607
	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are about to be send BlockedOnDeadMVar.  Any threads
	// thus released will be immediately runnable.
608
	GarbageCollect(GetRoots,rtsTrue);
609 610 611 612 613 614 615 616 617 618 619 620 621 622

	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }

	IF_DEBUG(scheduler, 
		 sched_belch("still deadlocked, checking for black holes..."));
	detectBlackHoles();

	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }

#ifndef mingw32_TARGET_OS
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather then bombing out with a
	 * deadlock.
	 */
sof's avatar
sof committed
623 624 625 626 627 628 629 630 631 632 633
#if defined(RTS_SUPPORTS_THREADS)
	if ( 0 ) { /* hmm..what to do? Simply stop waiting for
		      a signal with no runnable threads (or I/O
		      suspended ones) leads nowhere quick.
		      For now, simply shut down when we reach this
		      condition.
		      
		      ToDo: define precisely under what conditions
		      the Scheduler should shut down in an MT setting.
		   */
#else
634
	if ( anyUserHandlers() ) {
sof's avatar
sof committed
635
#endif
636 637 638 639 640 641 642 643 644
	    IF_DEBUG(scheduler, 
		     sched_belch("still deadlocked, waiting for signals..."));

	    awaitUserSignals();

	    // we might be interrupted...
	    if (interrupted) { continue; }

	    if (signals_pending()) {
sof's avatar
sof committed
645
		RELEASE_LOCK(&sched_mutex);
646
		startSignalHandlers();
sof's avatar
sof committed
647
		ACQUIRE_LOCK(&sched_mutex);
648 649 650 651 652 653 654 655 656 657 658 659 660
	    }
	    ASSERT(!EMPTY_RUN_QUEUE());
	    goto not_deadlocked;
	}
#endif

	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception (or in the SMP build, send *all* main
	 * threads the deadlock exception, since none of them can make
	 * progress).
	 */
	{
	    StgMainThread *m;
sof's avatar
sof committed
661
#if defined(RTS_SUPPORTS_THREADS)
662
	    for (m = main_threads; m != NULL; m = m->link) {
663 664
		switch (m->tso->why_blocked) {
		case BlockedOnBlackHole:
sof's avatar
sof committed
665
		    raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
666 667 668
		    break;
		case BlockedOnException:
		case BlockedOnMVar:
sof's avatar
sof committed
669
		    raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
670 671 672 673
		    break;
		default:
		    barf("deadlock: main thread blocked in a strange way");
		}
674
	    }
675 676 677 678
#else
	    m = main_threads;
	    switch (m->tso->why_blocked) {
	    case BlockedOnBlackHole:
sof's avatar
sof committed
679
		raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
680 681 682
		break;
	    case BlockedOnException:
	    case BlockedOnMVar:
sof's avatar
sof committed
683
		raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
684 685 686
		break;
	    default:
		barf("deadlock: main thread blocked in a strange way");
sof's avatar
sof committed
687
	    }
sof's avatar
sof committed
688
#endif
689
	}
690 691 692 693 694

#if defined(RTS_SUPPORTS_THREADS)
	/* ToDo: revisit conditions (and mechanism) for shutting
	   down a multi-threaded world  */
	IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
sof's avatar
sof committed
695 696 697
	RELEASE_LOCK(&sched_mutex);
	shutdownHaskell();
	return;
698
#endif
699
    }
700 701
  not_deadlocked:

702 703
#elif defined(PAR)
    /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
704 705
#endif

sof's avatar
sof committed
706
#if defined(SMP)
707 708 709 710
    /* If there's a GC pending, don't do anything until it has
     * completed.
     */
    if (ready_to_gc) {
711
      IF_DEBUG(scheduler,sched_belch("waiting for GC"));
sof's avatar
sof committed
712
      waitCondition( &gc_pending_cond, &sched_mutex );
713
    }
sof's avatar
sof committed
714 715
#endif    

sof's avatar
sof committed
716
#if defined(RTS_SUPPORTS_THREADS)
717 718
    /* block until we've got a thread on the run queue and a free
     * capability.
sof's avatar
sof committed
719
     *
720
     */
sof's avatar
sof committed
721 722 723
    if ( EMPTY_RUN_QUEUE() ) {
      /* Give up our capability */
      releaseCapability(cap);
sof's avatar
sof committed
724 725 726 727 728 729 730 731

      /* If we're in the process of shutting down (& running the
       * a batch of finalisers), don't wait around.
       */
      if ( shutting_down_scheduler ) {
	RELEASE_LOCK(&sched_mutex);
	return;
      }
sof's avatar
sof committed
732 733 734
      IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
      waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
      IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
735 736
    }
#endif
737 738

#if defined(GRAN)
739 740 741 742 743 744 745 746 747 748 749 750
    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

751
    IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, belch("GRAN: after main switch"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
	     belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    t = POP_RUN_QUEUE(); // take_off_run_queue(t);

    IF_DEBUG(gran, 
	     fprintf(stderr, "GRAN: About to run current thread, which is\n");
865
	     G_TSO(t,5));
866 867 868 869 870 871 872 873

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

#elif defined(PAR)
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }

    /* ToDo: phps merge with spark activation above */
    /* check whether we have local work and send requests if we have none */
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
      /* :-[  no local threads => look out for local sparks */
      /* the spark pool for the current PE */
      pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
	  pool->hd < pool->tl) {
	/* 
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 * Old comment:
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread... 
	 */

	spark = findSpark(rtsFalse);                /* get a spark */
	if (spark != (rtsSpark) NULL) {
	  tso = activateSpark(spark);       /* turn the spark into a thread */
	  IF_PAR_DEBUG(schedule,
		       belch("==== schedule: Created TSO %d (%p); %d threads active",
			     tso->id, tso, advisory_thread_count));

	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
	    belch("==^^ failed to activate spark");
	    goto next_thread;
	  }               /* otherwise fall through & pick-up new tso */
	} else {
	  IF_PAR_DEBUG(verbose,
		       belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
			     spark_queue_len(pool)));
	  goto next_thread;
	}
      }

      /* If we still have no work we need to send a FISH to get a spark
	 from another PE 
      */
      if (EMPTY_RUN_QUEUE()) {
      /* =8-[  no local sparks => look for work on other PEs */
	/*
	 * We really have absolutely no work.  Send out a fish
	 * (there may be some out there already), and wait for
	 * something to arrive.  We clearly can't run any threads
	 * until a SCHEDULE or RESUME arrives, and so that's what
	 * we're hoping to see.  (Of course, we still have to
	 * respond to other types of messages.)
	 */
	TIME now = msTime() /*CURRENT_TIME*/;
	IF_PAR_DEBUG(verbose, 
		     belch("--  now=%ld", now));
	IF_PAR_DEBUG(verbose,
		     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
			 (last_fish_arrived_at!=0 &&
			  last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
		       belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
			     last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
			     last_fish_arrived_at,
			     RtsFlags.ParFlags.fishDelay, now);
		     });
	
	if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
	    (last_fish_arrived_at==0 ||
	     (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
	  /* outstandingFishes is set in sendFish, processFish;
	     avoid flooding system with fishes via delay */
	  pe = choosePE();
	  sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
		   NEW_FISH_HUNGER);

	  // Global statistics: count no. of fishes
	  if (RtsFlags.ParFlags.ParStats.Global &&
	      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	    globalParStats.tot_fish_mess++;
	  }
	}
      
	receivedFinish = processMessages();
	goto next_thread;
      }
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
      receivedFinish = processMessages();
    }

    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
    IF_DEBUG(sanity,checkTSO(t));

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    CurrentTSO = t;
    */
    /* the spark pool for the current PE */
    pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable

    IF_DEBUG(scheduler, 
	     belch("--=^ %d threads, %d sparks on [%#x]", 
		   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

# if 1
    if (0 && RtsFlags.ParFlags.ParStats.Full && 
	t && LastTSO && t->id != LastTSO->id && 
	LastTSO->why_blocked == NotBlocked && 
	LastTSO->what_next != ThreadComplete) {
      // if previously scheduled TSO not blocked we have to record the context switch
      DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
			   GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
    }

    if (RtsFlags.ParFlags.ParStats.Full && 
	(emitSchedule /* forced emit */ ||
        (t && LastTSO && t->id != LastTSO->id))) {
      /* 
	 we are running a different TSO, so write a schedule event to log file
	 NB: If we use fair scheduling we also have to write  a deschedule 
	     event for LastTSO; with unfair scheduling we know that the
	     previous tso has blocked whenever we switch to another tso, so
	     we don't need it in GUM for now
      */
      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }
     
# endif
#else /* !GRAN && !PAR */
  
    /* grab a thread from the run queue */
    ASSERT(run_queue_hd != END_TSO_QUEUE);
    t = POP_RUN_QUEUE();
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif
    
    cap->r.rCurrentTSO = t;
    
    /* context switches are now initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (
#ifdef PROFILING
	RtsFlags.ProfFlags.profileInterval == 0 ||
#endif
	(RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	 && (run_queue_hd != END_TSO_QUEUE
	     || blocked_queue_hd != END_TSO_QUEUE
	     || sleeping_queue != END_TSO_QUEUE)))
	context_switch = 1;
    else
	context_switch = 0;

    RELEASE_LOCK(&sched_mutex);

    IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
			      t->id, t, whatNext_strs[t->what_next]));

#ifdef PROFILING
    startHeapProfTimer();
#endif

    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    /* Run the current thread 
     */
    switch (cap->r.rCurrentTSO->what_next) {
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
    case ThreadEnterGHC:
	ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
	break;
    case ThreadRunGHC:
	ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	break;
    case ThreadEnterInterp:
	ret = interpretBCO(cap);
	break;
    default:
      barf("schedule: invalid what_next field");
    }
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    
    /* Costs for the scheduler are assigned to CCS_SYSTEM */
#ifdef PROFILING
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif
    
    ACQUIRE_LOCK(&sched_mutex);

#ifdef SMP
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
    t = cap->r.rCurrentTSO;
    
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a 
       SCHEDULE event t