/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# undef  STATIC_INLINE
# define STATIC_INLINE static
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/* 
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes, new arrays run_queue_hds/tls
   are created. run_queue_hd is then a shortcut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */

#else /* !GRAN */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;
#endif

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for a more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool sched_state = SCHED_RUNNING;

#if defined(GRAN)
StgTSO *CurrentTSO;
#endif

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
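
/* Note: forkProcess() relies on a POSIX-style fork(), which is why the
 * primop is not supported on Windows (mingw32) builds.
 */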

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
					 StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
				rtsBool force_major, 
				void (*get_roots)(evac_fn));

static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) { 
	// this does round-robin scheduling; good for concurrency
	appendToRunQueue(cap,t);
    } else {
	// this does unfair scheduling; good for parallelism
	pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
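
/* Note: appendToRunQueue() puts the thread at the tail of the capability's
 * run queue, so existing runnable threads get their turn first (fair,
 * round-robin), whereas pushOnRunQueue() puts it at the head so it runs
 * next (unfair, LIFO).  Both helpers are defined in Schedule.h.
 */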

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue,
     which determines what to do next.  Therefore, it's more complicated
     than either the concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif
  
  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, 
	      "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
	      task, initialCapability);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
#else
#define TERMINATION_CONDITION        rtsTrue
#endif
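
  // In the standard build (neither GRAN nor PARALLEL_HASKELL),
  // TERMINATION_CONDITION is simply rtsTrue, so the loop below only
  // terminates via an explicit 'return cap' from within its body (when a
  // bound thread finishes, or when the scheduler is shutting down).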

  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
	  // don't yield the first time, we want a chance to run this
	  // thread for a bit, even if there are others banging at the
	  // door.
	  first = rtsFalse;
	  ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      } else {
	  // Yield the capability to higher-priority tasks if necessary.
	  yieldCapability(&cap, task);
      }
#endif
      
#if defined(THREADED_RTS)
      schedulePushWork(cap,task);
#endif

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    // 
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    // 
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls;
    //     we should really attempt to kill these somehow (TODO).
    
    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
	discardSparksCap(cap);
#endif
	/* scheduleDoGC() deletes all the threads */
	cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
	break;
    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (task->tso == NULL && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

#if defined(THREADED_RTS)
    // If the run queue is empty, take a spark and turn it into a thread.
    {
	if (emptyRunQueue(cap)) {
	    StgClosure *spark;
	    spark = findSpark(cap);
	    if (spark != NULL) {
		debugTrace(DEBUG_sched,
			   "turning spark of closure %p into a thread",
			   (StgClosure *)spark);
		createSparkThread(cap,spark);	  
	    }
	}
    }
#endif // THREADED_RTS
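
    // Note: createSparkThread() turns the spark closure into a fresh thread
    // (TSO) on this capability's run queue, so the popRunQueue() further
    // down should pick it up in this same iteration.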

    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

    scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
	ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
	continue; // nothing to do
    }

#if defined(PARALLEL_HASKELL)
    scheduleSendPendingMessages();
    if (emptyRunQueue(cap) && scheduleActivateSpark()) 
	continue;

#if defined(SPARKS)
    ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
#endif

    /* If we still have no work, we need to send a FISH to get a spark
       from another PE */
    if (emptyRunQueue(cap)) {
	if (!scheduleGetRemoteWork(&receivedFinish)) continue;
	ASSERT(rtsFalse); // should not happen at the moment
    }
    // from here: non-empty run queue.
    //  TODO: merge above case with this, only one call processMessages() !
    if (PacketsWaiting()) {  /* process incoming messages, if
				any pending...  only in else
				because getRemoteWork waits for
				messages as well */
	receivedFinish = processMessages();
    }
#endif

#if defined(GRAN)
    scheduleProcessEvent(event);
#endif

    // 
    // Get a thread to run
    //
    t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
#else
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
	Task *bound = t->bound;
      
	if (bound) {
	    if (bound == task) {
		debugTrace(DEBUG_sched,
			   "### Running thread %lu in bound thread", (unsigned long)t->id);
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "### thread %lu bound to another OS thread", (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->tso) { 
		debugTrace(DEBUG_sched,
			   "### this OS thread cannot run thread %lu", (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue; 
	    }
	}
    }
#endif

    cap->r.rCurrentTSO = t;
    
    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	context_switch = 1;
    }
	 
run_thread:

    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
			      (long)t->id, whatNext_strs[t->what_next]);

#if defined(PROFILING)
    startHeapProfTimer();
#endif

    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);

    // ----------------------------------------------------------------------
    // Run the current thread 

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
    cap->in_haskell = rtsTrue;

    dirtyTSO(t);

    recent_activity = ACTIVITY_YES;

    switch (prev_what_next) {
	
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
	
    case ThreadRunGHC:
    {
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }
    
    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;
	
    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
	blackholes_need_checking = rtsTrue;
    }
    
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;

#if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We had better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
	debugTrace(DEBUG_sched,
		   "--<< thread %lu (%s) stopped: blocked",
		   (unsigned long)t->id, whatNext_strs[t->what_next]);
	continue;
    }
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------
    
    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif
    
    schedulePostRunThread();

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
	scheduleHandleStackOverflow(cap,task,t);
	break;

    case ThreadYielding:
	if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread; 
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
    if (ready_to_gc) {
      cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
    }
  } /* end of while() */

  debugTrace(PAR_DEBUG_verbose,
	     "== Leaving schedule() after having received Finish");
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN) 
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread, 
	      CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
    
Simon Marlow's avatar
Simon Marlow committed
726 727 728 729
    debugTrace (DEBUG_gran,
		"GRAN: Init CurrentTSO (in schedule) = %p", 
		CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
730 731 732 733 734 735 736 737
    
    if (RtsFlags.GranFlags.Light) {
	/* Save current time; GranSim Light only */
	CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }      
#endif
}

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS, 
		 Task *task      USED_IF_THREADS)
748 749 750 751
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
	&& sparkPoolSizeCap(cap) < 2) {
	return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
789 790
	rtsBool pushed_to_all;

Simon Marlow's avatar
Simon Marlow committed
791
	debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
792 793

	i = 0;
794 795 796 797 798 799 800 801 802 803
	pushed_to_all = rtsFalse;

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->link;
	    prev->link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->link;
		t->link = END_TSO_QUEUE;
		if (t->what_next == ThreadRelocated
804 805
		    || t->bound == task // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
806 807 808 809 810 811 812 813 814
		    prev->link = t;
		    prev = t;
		} else if (i == n_free_caps) {
		    pushed_to_all = rtsTrue;
		    i = 0;
		    // keep one for us
		    prev->link = t;
		    prev = t;
		} else {
815
		    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
816 817
		    appendToRunQueue(free_caps[i],t);
		    if (t->bound) { t->bound->cap = free_caps[i]; }
818
		    t->cap = free_caps[i];
819 820 821 822 823 824 825 826 827 828 829 830 831 832 833
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;
	}

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = findSpark(cap);
		    if (spark != NULL) {
Simon Marlow's avatar
Simon Marlow committed
834
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
835 836 837
			newSpark(&(free_caps[i]->r), spark);
		    }
		}
838 839 840 841 842 843 844 845 846 847 848
	    }
	}

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.
}
849
#endif
850

851 852 853 854
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (signals_pending()) { // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
875
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
876
{
877
#if !defined(THREADED_RTS)
878 879 880 881 882
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
883
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
884
    {
885
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
886
    }
887
#endif
888 889 890
}


/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
	ACQUIRE_LOCK(&cap->lock);
	if (emptyRunQueue(cap)) {
	    cap->run_queue_hd = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	} else {
	    cap->run_queue_tl->link = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	}
	cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
	RELEASE_LOCK(&cap->lock);
    }
#endif
}
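
/* Note: a TSO woken up by a thread running on a different Capability is
 * placed on this Capability's wakeup queue (under cap->lock) rather than
 * directly on its run queue; the code above drains it into the run queue
 * on the owning Capability.
 */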

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    checkBlackHoles(cap);
	    blackholes_need_checking = rtsFalse;
	}
	RELEASE_LOCK(&sched_mutex);
    }
}
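
/* Note: checkBlackHoles() (defined later in this file) walks the global
 * blackhole_queue looking for threads whose black hole has since been
 * updated; the blackholes_need_checking flag is only a hint, so testing it
 * before taking sched_mutex is safe.
 */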

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{

#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/* 
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);

	recent_activity = ACTIVITY_DONE_GC;
	
	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar: