/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task, rtsBool);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
					 StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
				rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
}
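
/* Not part of the original code: a sketch (kept inside this comment) of an
 * alternative policy, to illustrate "different scheduling policies" above.
 * A LIFO variant would favour cache locality over fairness by running the
 * most recently added thread first.  It assumes pushOnRunQueue(), which is
 * used elsewhere in this file; the name addToRunQueueLIFO is hypothetical.
 *
 *   STATIC_INLINE void
 *   addToRunQueueLIFO( Capability *cap, StgTSO *t )
 *   {
 *       // put t at the front, so it is the next thread popped
 *       pushOnRunQueue(cap,t);
 *   }
 */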

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue,
     which determines what to do next.  Therefore, it's more complicated
     than either the concurrent or the parallel (GUM) setup.
  This version has been entirely removed (JB 2008/08).

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

  (JB 2008/08) This version was formerly indicated by the PP-flag PAR,
  now by the PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
  as well as future GUM versions. This file has been refurbished to
  contain only valid code, which is, however, incomplete and refers to
  invalid includes, etc.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
  rtsBool force_yield = rtsFalse;
#endif
  
  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    // 
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    // 
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls;
    //     we should really attempt to kill these somehow (TODO).
    
    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
	discardSparksCap(cap);
#endif
	/* scheduleDoGC() deletes all the threads */
	cap = scheduleDoGC(cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (task->tso == NULL && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(cap,task);

#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    
#if defined(THREADED_RTS)
    if (first) 
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = rtsFalse;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

  yield:
    scheduleYield(&cap,task,force_yield);
    force_yield = rtsFalse;

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
	ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    // 
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
	Task *bound = t->bound;
      
	if (bound) {
	    if (bound == task) {
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->tso) { 
		debugTrace(DEBUG_sched,
			   "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue; 
	    }
	}
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	cap->context_switch = 1;
    }
	 
run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);

    // ----------------------------------------------------------------------
    // Run the current thread 

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    cap->in_haskell = rtsTrue;

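    // We are about to run, and therefore mutate, this TSO, so mark it
    // dirty so the garbage collector knows about the modification.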
    dirty_TSO(cap,t);

#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
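        // Atomically swap the flag back to ACTIVITY_YES, so that only the
        // task that observes the ACTIVITY_DONE_GC -> ACTIVITY_YES
        // transition restarts the timer below.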
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else {
        recent_activity = ACTIVITY_YES;
    }
#endif

    traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0);

    switch (prev_what_next) {
	
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
	
    case ThreadRunGHC:
    {
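	// Enter compiled Haskell code: run the STG machine starting at the
	// top of this thread's stack.  It returns here when the thread
	// yields, blocks, finishes, or otherwise needs the RTS, leaving
	// the status in r->rRet.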
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }
    
    case ThreadInterpret:
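	// Byte-code threads (BCOs) are run by the interpreter instead.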
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;
	
    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
	blackholes_need_checking = rtsTrue;
    }
    
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
516 517
#if mingw32_HOST_OS
    // Similarly for Windows error code
518
    t->saved_winerror = GetLastError();
519
#endif
520

521
    traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);
Simon Marlow's avatar
523
#if defined(THREADED_RTS)
524 525 526 527 528 529
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normaility.
530
    if (ret == ThreadBlocked) {
531
        force_yield = rtsTrue;
Simon Marlow's avatar
533
    }
534 535
#endif

536
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
537
    ASSERT(t->cap == cap);
538

539
    // ----------------------------------------------------------------------
540
    
541
    // Costs for the scheduler are assigned to CCS_SYSTEM
542
    stopHeapProfTimer();
543
#if defined(PROFILING)
544 545 546
    CCCS = CCS_SYSTEM;
#endif
    
547
    schedulePostRunThread(cap,t);
548

549 550 551
    if (ret != StackOverflow) {
        t = threadStackUnderflow(task,t);
    }
552

553 554
    ready_to_gc = rtsFalse;

555 556 557 558 559 560
    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
561
	scheduleHandleStackOverflow(cap,task,t);
562 563 564
	break;

    case ThreadYielding:
565
	if (scheduleHandleYield(cap, t, prev_what_next)) {
566 567 568 569 570 571 572 573 574 575
            // shortcut for switching between compiler/interpreter:
	    goto run_thread; 
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
576
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
577
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
578 579 580 581 582 583
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
597
  // initialisation for scheduler - what cannot go into initScheduler()  
598 599
}

600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability *cap)
{
    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    return (waiting_for_gc || 
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else 
//      (see shouldYieldCapability())
//
649 650 651
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
{
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    // The force_yield flag is used when a bound thread blocks.  This
    // is a particularly tricky situation: the current Task does not
    // own the TSO any more, since it is on some queue somewhere, and
    // might be woken up or manipulated by another thread at any time.
    // The TSO and Task might be migrated to another Capability.
    // Certain invariants might be in doubt, such as task->bound->cap
    // == cap.  We have to yield the current Capability immediately,
    // no messing around.
    //
    if (!force_yield &&
        !shouldYieldCapability(cap,task) && 
        (!emptyRunQueue(cap) ||
         !emptyWakeupQueue(cap) ||
         blackholes_need_checking ||
         sched_state >= SCHED_INTERRUPTING))
        return;

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        yieldCapability(&cap,task);
    } 
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif
    
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS, 
		 Task *task      USED_IF_THREADS)
{
  /* The following code is not for PARALLEL_HASKELL. I kept the call general;
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        if (sparkPoolSizeCap(cap) < 2) return;
    } else {
        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
            sparkPoolSizeCap(cap) < 1) return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
	rtsBool pushed_to_all;

	debugTrace(DEBUG_sched, 
		   "cap %d: %s and %d free capabilities, sharing...", 
		   cap->no, 
		   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
		   "excess threads on run queue":"sparks to share (>=2)",
		   n_free_caps);

	i = 0;
	pushed_to_all = rtsFalse;

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->_link;
	    prev->_link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->_link;
		t->_link = END_TSO_QUEUE;
		if (t->what_next == ThreadRelocated
		    || t->bound == task // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    setTSOLink(cap, prev, t);
		    prev = t;
		} else if (i == n_free_caps) {
		    pushed_to_all = rtsTrue;
		    i = 0;
		    // keep one for us
		    setTSOLink(cap, prev, t);
		    prev = t;
		} else {
		    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
		    appendToRunQueue(free_caps[i],t);

                    traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no);

		    if (t->bound) { t->bound->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;
	}

#ifdef SPARK_PUSHING
	/* JB I left this code in place, it would work but is not necessary */

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = tryStealSpark(cap->sparks);
		    if (spark != NULL) {
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);

			traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no);

			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}
#endif /* SPARK_PUSHING */

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseAndWakeupCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}


/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
	ACQUIRE_LOCK(&cap->lock);
	if (emptyRunQueue(cap)) {
	    cap->run_queue_hd = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	} else {
	    setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	}
	cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
	RELEASE_LOCK(&cap->lock);
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    blackholes_need_checking = rtsFalse;
            // It is important that we reset the flag *before* checking the
            // blackhole queue, otherwise we could get a deadlock.  The bad
            // ordering is: we wake up a thread that immediately runs on
            // another Capability and blocks on a blackhole, and only then
            // do we reset the blackholes_need_checking flag, missing that
            // notification.
	    checkBlackHoles(cap);
	}
	RELEASE_LOCK(&sched_mutex);
    }
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/* 
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
        // When force_major == rtsTrue, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		throwToSingleThreaded(cap, task->tso,