/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "OSThreads.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"

/* PARALLEL_HASKELL includes go here */

#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "ThrIOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# undef  STATIC_INLINE
# define STATIC_INLINE static
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
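 *  (e.g. the scheduler below does: prev = xchg((P_)&recent_activity, ACTIVITY_YES);)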
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(PARALLEL_HASKELL)
static rtsBool scheduleGetRemoteWork(Capability *cap);
static void scheduleSendPendingMessages(void);
#endif
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
					 StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
				rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) { 
	// this does round-robin scheduling; good for concurrency
	appendToRunQueue(cap,t);
    } else {
	// this does unfair scheduling; good for parallelism
	pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
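/* A minimal usage sketch (illustrative only, not part of the scheduler):
 * assuming the RTS's createThread(cap, stack_size) entry point, a freshly
 * created thread is made runnable with something like
 *
 *     StgTSO *tso = createThread(cap, stack_size);
 *     addToRunQueue(cap, tso);
 *
 * The policy above only decides whether the thread goes on the back of the
 * run queue (round-robin, fair) or on the front (unfair, better for
 * parallelism).
 */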

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

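   These conditions come back to the scheduler as the StgThreadReturnCode
   values HeapOverflow, StackOverflow, ThreadYielding, ThreadBlocked and
   ThreadFinished, handled in the "switch (ret)" at the bottom of the loop.
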
   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.
  This version has been entirely removed (JB 2008/08).

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

  (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
  now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
  as well as future GUM versions. This file has been refurbished to
  only contain valid code, which is, however, incomplete and refers to
  invalid includes, etc.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(PARALLEL_HASKELL)
  rtsBool receivedFinish = rtsFalse;
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif
  
  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, 
	      "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
	      task, initialCapability);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

  while (TERMINATION_CONDITION) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
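    //
    // A sketch of the situation this catches (illustrative only): Haskell
    // calls a 'foreign import ccall unsafe' C function, and that C function
    // re-enters Haskell via a foreign export.  The unsafe call never went
    // through suspendThread(), so cap->in_haskell is still rtsTrue when we
    // arrive back here, and we bail out with the message below.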
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    // 
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    // 
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls, 
    //     we should really attempt to kill these somehow (TODO);
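    //
    //   A minimal sketch of how this sequence is reached from outside the
    //   scheduler (assuming the standard RTS entry points hs_init()/hs_exit();
    //   illustrative only):
    //
    //       hs_init(&argc, &argv);
    //       ... run Haskell code via foreign exports ...
    //       hs_exit();   // -> shutdownHaskell() -> exitScheduler(), driving
    //                    //    sched_state through SCHED_INTERRUPTING and on
    //                    //    to SCHED_SHUTTING_DOWN as described above.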
    
    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
	discardSparksCap(cap);
#endif
	/* scheduleDoGC() deletes all the threads */
	cap = scheduleDoGC(cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (task->tso == NULL && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

#if defined(PARALLEL_HASKELL)
    /* since we perform a blocking receive and continue otherwise,
       either we never reach here or we definitely have work! */
    // from here: non-empty run queue
    ASSERT(!emptyRunQueue(cap));

    if (PacketsWaiting()) {  /* now process incoming messages, if any
				pending...  

				CAUTION: scheduleGetRemoteWork called
				above, waits for messages as well! */
      processMessages(cap, &receivedFinish);
    }
#endif // PARALLEL_HASKELL: non-empty run queue!

    scheduleDetectDeadlock(cap,task);

#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during 
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    
#if defined(THREADED_RTS)
    if (first) 
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = rtsFalse;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

  yield:
    scheduleYield(&cap,task);
    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
	ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    // 
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
	Task *bound = t->bound;
      
	if (bound) {
	    if (bound == task) {
		debugTrace(DEBUG_sched,
			   "### Running thread %lu in bound thread", (unsigned long)t->id);
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "### thread %lu bound to another OS thread", (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->tso) { 
		debugTrace(DEBUG_sched,
			   "### this OS thread cannot run thread %lu", (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue; 
	    }
	}
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	cap->context_switch = 1;
    }
	 
run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
			      (long)t->id, whatNext_strs[t->what_next]);

    startHeapProfTimer();

    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);

    // ----------------------------------------------------------------------
    // Run the current thread 

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    cap->in_haskell = rtsTrue;

    dirty_TSO(cap,t);

#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else {
        recent_activity = ACTIVITY_YES;
    }
#endif

    switch (prev_what_next) {
	
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
	
    case ThreadRunGHC:
    {
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }
    
    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;
	
    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
	blackholes_need_checking = rtsTrue;
    }
    
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

#if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
	debugTrace(DEBUG_sched,
		   "--<< thread %lu (%s) stopped: blocked",
		   (unsigned long)t->id, whatNext_strs[t->what_next]);
        goto yield;
    }
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------
    
    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    CCCS = CCS_SYSTEM;
#endif
    
    schedulePostRunThread(cap,t);

    t = threadStackUnderflow(task,t);

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
	scheduleHandleStackOverflow(cap,task,t);
	break;

    case ThreadYielding:
	if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread; 
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()  
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability *cap)
{
    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif

#if defined(PARALLEL_HASKELL)
    // if messages have been buffered...
    scheduleSendPendingMessages();
#endif

#if defined(PARALLEL_HASKELL)
    if (emptyRunQueue(cap)) {
	receivedFinish = scheduleGetRemoteWork(cap);
	continue; //  a new round, (hopefully) with new work
	/* 
	   in GUM, this a) sends out a FISH and returns IF no fish is
	                   out already
			b) (blocking) awaits and receives messages
	   
	   in Eden, this is only the blocking receive, as b) in GUM.
	*/
    }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this Task is bound).
    return (waiting_for_gc || 
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else 
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    if (!shouldYieldCapability(cap,task) && 
        (!emptyRunQueue(cap) ||
         blackholes_need_checking ||
         sched_state >= SCHED_INTERRUPTING))
        return;

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        yieldCapability(&cap,task);
    } 
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif
    
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS, 
		 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
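    // (That is, bail out below unless we have at least two runnable threads
    // queued or at least two sparks in the pool -- enough to give one away
    // and still keep something for ourselves.)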
    if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
	&& sparkPoolSizeCap(cap) < 2) {
	return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
	rtsBool pushed_to_all;

	debugTrace(DEBUG_sched, 
		   "cap %d: %s and %d free capabilities, sharing...", 
		   cap->no, 
		   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
		   "excess threads on run queue":"sparks to share (>=2)",
		   n_free_caps);

	i = 0;
	pushed_to_all = rtsFalse;

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->_link;
	    prev->_link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->_link;
		t->_link = END_TSO_QUEUE;
		if (t->what_next == ThreadRelocated
		    || t->bound == task // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    setTSOLink(cap, prev, t);
		    prev = t;
		} else if (i == n_free_caps) {
		    pushed_to_all = rtsTrue;
		    i = 0;
		    // keep one for us
		    setTSOLink(cap, prev, t);
		    prev = t;
		} else {
		    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
		    appendToRunQueue(free_caps[i],t);
		    if (t->bound) { t->bound->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;
	}

#ifdef SPARK_PUSHING
	/* JB I left this code in place, it would work but is not necessary */

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = tryStealSpark(cap->sparks);
		    if (spark != NULL) {
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}
#endif /* SPARK_PUSHING */

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseAndWakeupCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}


/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
	ACQUIRE_LOCK(&cap->lock);
	if (emptyRunQueue(cap)) {
	    cap->run_queue_hd = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	} else {
	    setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	}
	cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
	RELEASE_LOCK(&cap->lock);
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    blackholes_need_checking = rtsFalse;
            // important that we reset the flag *before* checking the
            // blackhole queue, otherwise we could get deadlock.  This
            // happens as follows: we wake up a thread that
            // immediately runs on another Capability, blocks on a
            // blackhole, and then we reset the blackholes_need_checking flag.
	    checkBlackHoles(cap);
	}
	RELEASE_LOCK(&sched_mutex);
    }
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{

#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/* 
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);

	recent_activity = ACTIVITY_DONE_GC;
        // disable timer signals (see #1623)
        stopTimer();
	
	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		throwToSingleThreaded(cap, task->tso, 
				      (StgClosure *)nonTermination_closure);
		return;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
	return;
#endif
    }