/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2004
 *
 * Scheduler
 *
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name     CPP flag  What's it for
 * --------------------------------------
 * mp   GUM      PAR          Parallel execution on a distrib. memory machine
 * s    SMP      SMP          Parallel execution on a shared memory machine
 * mg   GranSim  GRAN         Simulation of parallel execution
 * md   GUM/GdH  DIST         Distributed execution (based on GUM)
 *
 * --------------------------------------------------------------------------*/

/* 
 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which are running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event driven, i.e. it iterates
   over the events in the global event queue.  -- HWL
*/

39
#include "PosixSource.h"
40 41 42 43
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
44
#include "BlockAlloc.h"
45 46 47
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
sof's avatar
sof committed
48
#define COMPILING_SCHEDULER
49 50 51
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Storage.h"
52
#include "Interpreter.h"
53
#include "Exception.h"
54 55 56
#include "Printer.h"
#include "Signals.h"
#include "Sanity.h"
57
#include "Stats.h"
58
#include "STM.h"
sof's avatar
sof committed
59
#include "Timer.h"
60
#include "Prelude.h"
61
#include "ThreadLabels.h"
62 63
#include "LdvProfile.h"
#include "Updates.h"
64 65 66 67
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
68 69 70 71 72 73 74 75 76
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
77
#include "Sparks.h"
sof's avatar
sof committed
78 79
#include "Capability.h"
#include "OSThreads.h"
sof's avatar
sof committed
80
#include  "Task.h"
81

82 83 84 85 86 87 88
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

89 90
#include <string.h>
#include <stdlib.h>
91
#include <stdarg.h>
92

93 94 95 96
#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

97 98 99 100 101 102 103 104 105 106 107 108
#ifdef THREADED_RTS
#define USED_IN_THREADED_RTS
#else
#define USED_IN_THREADED_RTS STG_UNUSED
#endif

#ifdef RTS_SUPPORTS_THREADS
#define USED_WHEN_RTS_SUPPORTS_THREADS
#else
#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
#endif

109 110 111
/* Main thread queue.
 * Locks required: sched_mutex.
 */
112
StgMainThread *main_threads = NULL;
113 114 115 116

/* Thread queues.
 * Locks required: sched_mutex.
 */
117 118 119
#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121 122

/* 
sof's avatar
sof committed
123
   In GranSim we have a runnable and a blocked queue for each processor.
124 125 126 127 128 129 130 131
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
132 133 134 135
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */
136 137 138

#else /* !GRAN */

139 140 141 142 143
StgTSO *run_queue_hd = NULL;
StgTSO *run_queue_tl = NULL;
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144

145 146
#endif

147 148 149
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */
150
StgTSO *all_threads = NULL;
151

sof's avatar
sof committed
152 153 154
/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
155 156 157
 */
static StgTSO *suspended_ccalling_threads;

158 159
static StgTSO *threadStackOverflow(StgTSO *tso);

160 161 162 163 164
/* KH: The following two flags are shared memory locations.  There is no need
       to lock them, since they are only unset at the end of a scheduler
       operation.
*/

165
/* flag set by signal handler to precipitate a context switch */
166
int context_switch = 0;
167

168
/* if this flag is set as well, give up execution */
169
rtsBool interrupted = rtsFalse;
170

171 172 173 174 175
/* If this flag is set, we are running Haskell code.  Used to detect
 * uses of 'foreign import unsafe' that should be 'safe'.
 */
rtsBool in_haskell = rtsFalse;

176
/* Next thread ID to allocate.
sof's avatar
sof committed
177
 * Locks required: thread_id_mutex
178
 */
179
static StgThreadID next_thread_id = 1;
180 181 182 183 184 185

/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */
186
 
187 188 189 190
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
191 192
 *  + 1			      (stg_ap_v_ret)
 *  + 1			      (spare slot req'd by stg_ap_v_ret)
193 194 195 196 197
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.  
 */

198
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
199

sof's avatar
sof committed
200

201
#if defined(GRAN)
202
StgTSO *CurrentTSO;
203 204
#endif

205 206 207 208 209 210
/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

211
static rtsBool ready_to_gc;
sof's avatar
sof committed
212 213 214 215 216 217 218

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
static rtsBool shutting_down_scheduler = rtsFalse;
219 220 221

void            addToBlockedQueue ( StgTSO *tso );

222
static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
223
       void     interruptStgRts   ( void );
224

225
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
226
static void     detectBlackHoles  ( void );
227
#endif
228

229 230
static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);

sof's avatar
sof committed
231 232 233 234
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 *       with these synchronisation objects.
 */
sof's avatar
sof committed
235 236
Mutex     sched_mutex       = INIT_MUTEX_VAR;
Mutex     term_mutex        = INIT_MUTEX_VAR;
sof's avatar
sof committed
237 238

#endif /* RTS_SUPPORTS_THREADS */
sof's avatar
sof committed
239

240 241 242
#if defined(PAR)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
243
rtsBool emitSchedule = rtsTrue;
244 245
#endif

246
#if DEBUG
/* Human-readable names for the TSO what_next states, used only by
 * debug/trace output (e.g. the "running thread" scheduler belch).
 * Order must match the what_next enumeration in TSO.h; index 0 is a
 * placeholder for an unrecognised value.
 */
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

sof's avatar
sof committed
257
#if defined(PAR)
258 259 260 261
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

262 263 264
/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */
265

266 267 268
#if defined(RTS_SUPPORTS_THREADS)
static rtsBool startingWorkerThread = rtsFalse;

sof's avatar
sof committed
269 270 271 272
static void taskStart(void);
static void
taskStart(void)
{
273
  ACQUIRE_LOCK(&sched_mutex);
274
  startingWorkerThread = rtsFalse;
275
  schedule(NULL,NULL);
276
  RELEASE_LOCK(&sched_mutex);
sof's avatar
sof committed
277 278
}

279
void
280
startSchedulerTaskIfNecessary(void)
281
{
282 283 284 285 286 287 288 289 290
  if(run_queue_hd != END_TSO_QUEUE
    || blocked_queue_hd != END_TSO_QUEUE
    || sleeping_queue != END_TSO_QUEUE)
  {
    if(!startingWorkerThread)
    { // we don't want to start another worker thread
      // just because the last one hasn't yet reached the
      // "waiting for capability" state
      startingWorkerThread = rtsTrue;
291 292 293 294
      if(!startTask(taskStart))
      {
        startingWorkerThread = rtsFalse;
      }
295 296
    }
  }
297 298
}
#endif
sof's avatar
sof committed
299

300
/* ---------------------------------------------------------------------------
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when
    
      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

334
   ------------------------------------------------------------------------ */
335
static void
336 337
schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          Capability *initialCapability )
338 339
{
  StgTSO *t;
340
  Capability *cap;
341
  StgThreadReturnCode ret;
342 343 344
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
345
  StgSparkPool *pool;
346 347 348
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
349 350 351 352
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
353
#endif
354
  rtsBool was_interrupted = rtsFalse;
355
  nat prev_what_next;
356
  
357
  // Pre-condition: sched_mutex is held.
358 359 360
  // We might have a capability, passed in as initialCapability.
  cap = initialCapability;

sof's avatar
sof committed
361
#if defined(RTS_SUPPORTS_THREADS)
362 363 364 365 366
  //
  // in the threaded case, the capability is either passed in via the
  // initialCapability parameter, or initialized inside the scheduler
  // loop 
  //
367
  IF_DEBUG(scheduler,
368 369 370
	   sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
		       mainThread, initialCapability);
      );
sof's avatar
sof committed
371
#else
372
  // simply initialise it in the non-threaded case
sof's avatar
sof committed
373
  grabCapability(&cap);
sof's avatar
sof committed
374
#endif
375

376
#if defined(GRAN)
377 378 379 380 381 382 383
  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	    ContinueThread, 
	    CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  IF_DEBUG(gran,
384
	   debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
385 386 387 388 389 390 391 392 393 394 395 396 397 398
	   G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }      

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO = event->tso;

399
#elif defined(PAR)
400

401 402
  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message         */ 
403 404

#else // everything except GRAN and PAR
405

406
  while (1) {
407

408
#endif
409

410
     IF_DEBUG(scheduler, printAllThreads());
411

sof's avatar
sof committed
412
#if defined(RTS_SUPPORTS_THREADS)
413 414 415 416 417
      // Yield the capability to higher-priority tasks if necessary.
      //
      if (cap != NULL) {
	  yieldCapability(&cap);
      }
418

419 420 421 422 423 424 425 426
      // If we do not currently hold a capability, we wait for one
      //
      if (cap == NULL) {
	  waitForCapability(&sched_mutex, &cap,
			    mainThread ? &mainThread->bound_thread_cond : NULL);
      }

      // We now have a capability...
sof's avatar
sof committed
427 428
#endif

429 430 431 432 433 434 435 436 437
    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(1);
    }

438 439 440 441 442
    //
    // If we're interrupted (the user pressed ^C, or some other
    // termination condition occurred), kill all the currently running
    // threads.
    //
443
    if (interrupted) {
444 445 446
	IF_DEBUG(scheduler, sched_belch("interrupted"));
	interrupted = rtsFalse;
	was_interrupted = rtsTrue;
447
#if defined(RTS_SUPPORTS_THREADS)
448 449
	// In the threaded RTS, deadlock detection doesn't work,
	// so just exit right away.
450
	errorBelch("interrupted");
451 452 453
	releaseCapability(cap);
	RELEASE_LOCK(&sched_mutex);
	shutdownHaskellAndExit(EXIT_SUCCESS);
454
#else
455
	deleteAllThreads();
456
#endif
457 458
    }

sof's avatar
sof committed
459
#if defined(RTS_USER_SIGNALS)
460
    // check for signals each time around the scheduler
461
    if (signals_pending()) {
sof's avatar
sof committed
462
      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
463
      startSignalHandlers();
sof's avatar
sof committed
464
      ACQUIRE_LOCK(&sched_mutex);
465 466 467
    }
#endif

468 469 470 471 472
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
473 474
    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
    {
475
#if defined(RTS_SUPPORTS_THREADS)
476 477
	// We shouldn't be here...
	barf("schedule: awaitEvent() in threaded RTS");
478
#endif
479
	awaitEvent( EMPTY_RUN_QUEUE() );
480
    }
481
    // we can be interrupted while waiting for I/O...
482 483
    if (interrupted) continue;

484 485 486 487 488 489 490 491 492 493
    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
494
     */
495
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
496
    if (   EMPTY_THREAD_QUEUES() )
497
    {
498
	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
499

500 501
	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
502 503 504
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
505
	GarbageCollect(GetRoots,rtsTrue);
506 507
	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }

sof's avatar
sof committed
508
#if defined(RTS_USER_SIGNALS)
509 510 511 512 513 514 515 516 517 518 519 520 521 522
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather then bombing out with a
	 * deadlock.
	 */
	if ( anyUserHandlers() ) {
	    IF_DEBUG(scheduler, 
		     sched_belch("still deadlocked, waiting for signals..."));

	    awaitUserSignals();

	    // we might be interrupted...
	    if (interrupted) { continue; }

	    if (signals_pending()) {
sof's avatar
sof committed
523
		RELEASE_LOCK(&sched_mutex);
524
		startSignalHandlers();
sof's avatar
sof committed
525
		ACQUIRE_LOCK(&sched_mutex);
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
	    }
	    ASSERT(!EMPTY_RUN_QUEUE());
	    goto not_deadlocked;
	}
#endif

	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception (or in the SMP build, send *all* main
	 * threads the deadlock exception, since none of them can make
	 * progress).
	 */
	{
	    StgMainThread *m;
	    m = main_threads;
	    switch (m->tso->why_blocked) {
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
544
		raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
545 546 547
		break;
	    default:
		barf("deadlock: main thread blocked in a strange way");
sof's avatar
sof committed
548
	    }
549
	}
550
    }
551 552
  not_deadlocked:

553
#elif defined(RTS_SUPPORTS_THREADS)
554
    // ToDo: add deadlock detection in threaded RTS
555
#elif defined(PAR)
556
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
557 558
#endif

559
#if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
sof's avatar
sof committed
560 561 562
    /* win32: might be back here due to awaitEvent() being abandoned
     * as a result of a console event having been delivered.
     */
563
    if ( EMPTY_RUN_QUEUE() ) {
564
	continue; // nothing to do
565 566
    }
#endif
567 568

#if defined(GRAN)
569 570 571 572 573 574 575 576 577 578 579 580
    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

581
    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
582 583 584 585 586

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
587
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
588 589 590 591 592 593 594
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
595
	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
596 597 598 599 600
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
601
	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
602 603 604 605 606
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
607
	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
608 609 610 611 612
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
613
	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

670
    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
671 672 673 674 675 676

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

677
    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
678 679 680 681 682 683 684 685
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
686
	     debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
687 688 689 690

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
sof's avatar
sof committed
691
    POP_RUN_QUEUE(t); // take_off_run_queue(t);
692 693

    IF_DEBUG(gran, 
694
	     debugBelch("GRAN: About to run current thread, which is\n");
695
	     G_TSO(t,5));
696 697 698 699 700 701 702 703

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

704
#elif defined(PAR)
705 706 707 708
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }

709
    /* ToDo: phps merge with spark activation above */
710
    /* check whether we have local work and send requests if we have none */
711
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
712
      /* :-[  no local threads => look out for local sparks */
713 714
      /* the spark pool for the current PE */
      pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
715
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
716
	  pool->hd < pool->tl) {
717 718 719 720 721 722 723 724
	/* 
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 * Old comment:
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread... 
	 */
725 726

	spark = findSpark(rtsFalse);                /* get a spark */
727 728
	if (spark != (rtsSpark) NULL) {
	  tso = activateSpark(spark);       /* turn the spark into a thread */
729
	  IF_PAR_DEBUG(schedule,
730
		       debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
731
			     tso->id, tso, advisory_thread_count));
732

733
	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
734
	    debugBelch("==^^ failed to activate spark\n");
735
	    goto next_thread;
736
	  }               /* otherwise fall through & pick-up new tso */
737 738
	} else {
	  IF_PAR_DEBUG(verbose,
739
		       debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
740
			     spark_queue_len(pool)));
741 742
	  goto next_thread;
	}
743 744 745 746 747 748
      }

      /* If we still have no work we need to send a FISH to get a spark
	 from another PE 
      */
      if (EMPTY_RUN_QUEUE()) {
749 750 751 752 753 754 755 756 757
      /* =8-[  no local sparks => look for work on other PEs */
	/*
	 * We really have absolutely no work.  Send out a fish
	 * (there may be some out there already), and wait for
	 * something to arrive.  We clearly can't run any threads
	 * until a SCHEDULE or RESUME arrives, and so that's what
	 * we're hoping to see.  (Of course, we still have to
	 * respond to other types of messages.)
	 */
758 759
	TIME now = msTime() /*CURRENT_TIME*/;
	IF_PAR_DEBUG(verbose, 
760
		     debugBelch("--  now=%ld\n", now));
761 762 763 764
	IF_PAR_DEBUG(verbose,
		     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
			 (last_fish_arrived_at!=0 &&
			  last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
765
		       debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
766 767 768 769 770 771 772 773 774
			     last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
			     last_fish_arrived_at,
			     RtsFlags.ParFlags.fishDelay, now);
		     });
	
	if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
	    (last_fish_arrived_at==0 ||
	     (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
	  /* outstandingFishes is set in sendFish, processFish;
775 776 777 778
	     avoid flooding system with fishes via delay */
	  pe = choosePE();
	  sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
		   NEW_FISH_HUNGER);
779 780 781 782 783 784

	  // Global statistics: count no. of fishes
	  if (RtsFlags.ParFlags.ParStats.Global &&
	      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	    globalParStats.tot_fish_mess++;
	  }
785
	}
786 787
      
	receivedFinish = processMessages();
788 789 790
	goto next_thread;
      }
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
791
      receivedFinish = processMessages();
792 793 794 795
    }

    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);
796

797
    /* Take a thread from the run queue, if we have work */
sof's avatar
sof committed
798
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
799
    IF_DEBUG(sanity,checkTSO(t));
800 801 802 803 804 805

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    CurrentTSO = t;
806 807 808
    */
    /* the spark pool for the current PE */
    pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
809

810
    IF_DEBUG(scheduler, 
811
	     debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
812 813
		   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

sof's avatar
sof committed
814
# if 1
815 816 817 818 819 820 821 822
    if (0 && RtsFlags.ParFlags.ParStats.Full && 
	t && LastTSO && t->id != LastTSO->id && 
	LastTSO->why_blocked == NotBlocked && 
	LastTSO->what_next != ThreadComplete) {
      // if previously scheduled TSO not blocked we have to record the context switch
      DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
			   GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
    }
823

824 825 826
    if (RtsFlags.ParFlags.ParStats.Full && 
	(emitSchedule /* forced emit */ ||
        (t && LastTSO && t->id != LastTSO->id))) {
827 828 829 830 831 832 833 834 835
      /* 
	 we are running a different TSO, so write a schedule event to log file
	 NB: If we use fair scheduling we also have to write  a deschedule 
	     event for LastTSO; with unfair scheduling we know that the
	     previous tso has blocked whenever we switch to another tso, so
	     we don't need it in GUM for now
      */
      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
836
      emitSchedule = rtsFalse;
837
    }
838
     
sof's avatar
sof committed
839
# endif
840
#else /* !GRAN && !PAR */
841
  
842
    // grab a thread from the run queue
843
    ASSERT(run_queue_hd != END_TSO_QUEUE);
sof's avatar
sof committed
844
    POP_RUN_QUEUE(t);
845

846 847
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
848
    IF_DEBUG(sanity,checkTSO(t));
849
#endif
850

851 852
#ifdef THREADED_RTS
    {
853
      StgMainThread *m = t->main;
854 855 856 857 858 859
      
      if(m)
      {
	if(m == mainThread)
	{
	  IF_DEBUG(scheduler,
860
	    sched_belch("### Running thread %d in bound thread", t->id));
861 862 863 864 865
	  // yes, the Haskell thread is bound to the current native thread
	}
	else
	{
	  IF_DEBUG(scheduler,
866
	    sched_belch("### thread %d bound to another OS thread", t->id));
867 868
	  // no, bound to a different Haskell thread: pass to that thread
	  PUSH_ON_RUN_QUEUE(t);
869
	  passCapability(&m->bound_thread_cond);
870 871 872 873 874
	  continue;
	}
      }
      else
      {
875 876
	if(mainThread != NULL)
        // The thread we want to run is bound.
877 878
	{
	  IF_DEBUG(scheduler,
879
	    sched_belch("### this OS thread cannot run thread %d", t->id));
880 881 882
	  // no, the current native thread is bound to a different
	  // Haskell thread, so pass it to any worker thread
	  PUSH_ON_RUN_QUEUE(t);
883
	  passCapabilityToWorker();
884 885 886 887 888 889
	  continue; 
	}
      }
    }
#endif

890
    cap->r.rCurrentTSO = t;
891
    
892 893 894 895
    /* context switches are now initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
896
    if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
897 898 899
	 && (run_queue_hd != END_TSO_QUEUE
	     || blocked_queue_hd != END_TSO_QUEUE
	     || sleeping_queue != END_TSO_QUEUE)))
900
	context_switch = 1;
901

902 903
run_thread:

904
    RELEASE_LOCK(&sched_mutex);
905

906
    IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
907
			      (long)t->id, whatNext_strs[t->what_next]));
908

909 910 911 912
#ifdef PROFILING
    startHeapProfTimer();
#endif

913
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
914 915
    /* Run the current thread 
     */
916
    prev_what_next = t->what_next;
917 918

    errno = t->saved_errno;
919
    in_haskell = rtsTrue;
920

921
    switch (prev_what_next) {
922

923 924
    case ThreadKilled:
    case ThreadComplete:
925 926 927
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
928

929
    case ThreadRunGHC:
930
	ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
931
	break;
932

933
    case ThreadInterpret:
934 935
	ret = interpretBCO(cap);
	break;
936

937
    default:
938
      barf("schedule: invalid what_next field");
939
    }
940

941 942
    in_haskell = rtsFalse;

943 944 945 946 947 948
    // The TSO might have moved, so find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    t->saved_errno = errno;

949
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
950 951 952
    
    /* Costs for the scheduler are assigned to CCS_SYSTEM */
#ifdef PROFILING
953
    stopHeapProfTimer();
954 955 956 957
    CCCS = CCS_SYSTEM;
#endif
    
    ACQUIRE_LOCK(&sched_mutex);
958 959
    
#ifdef RTS_SUPPORTS_THREADS
960
    IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
961
#elif !defined(GRAN) && !defined(PAR)
962
    IF_DEBUG(scheduler,debugBelch("sched: "););
963 964
#endif
    
965 966 967 968
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a 
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
969
    LastTSO = t; 
970 971 972
    TimeOfLastYield = CURRENT_TIME;
#endif

973 974
    switch (ret) {
    case HeapOverflow:
975
#if defined(GRAN)