/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
#include "Stable.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

#ifdef TRACING
#include "eventlog/EventLog.h"
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;
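
// (Explanatory note, not from the original sources.)  sched_state only ever
// advances in this order:
//
//   SCHED_RUNNING  -->  SCHED_INTERRUPTING  -->  SCHED_SHUTTING_DOWN
//
// which is why tests elsewhere in this file such as
// 'sched_state >= SCHED_INTERRUPTING' are safe without a lock: the value
// changes monotonically, as the comment above says.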

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

// Local stats
#ifdef THREADED_RTS
static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif
  
  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }
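
    // (Illustrative example, not from the original sources.)  The situation
    // caught above typically arises when a foreign function imported as
    // 'unsafe' calls back into Haskell.  Schematically:
    //
    //   foreign import ccall unsafe "c_fun" c_fun :: IO ()
    //   -- ...where c_fun() in turn calls back into the RTS (rts_evalIO(),
    //   -- an hs_* entry point, etc.) without going through
    //   -- suspendThread()/resumeThread().
    //
    // Importing the function as 'safe' makes the RTS release the Capability
    // around the call, which is why the message above suggests it.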

    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls;
    //     we should really attempt to kill these somehow (TODO).

    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (!isBoundTask(task) && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    
#if defined(THREADED_RTS)
    if (first)
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = rtsFalse;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
	ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

	if (bound) {
	    if (bound->task == task) {
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->incall->tso) {
		debugTrace(DEBUG_sched,
			   "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	}
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
#ifdef THREADED_RTS
    if (cap->disabled && !t->bound) {
        Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	cap->context_switch = 1;
    }
	 
run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = rtsTrue;
    cap->idle = 0;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

    switch (recent_activity)
    {
    case ACTIVITY_DONE_GC: {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
#ifndef PROFILING
            startTimer();
#endif
        }
        break;
    }
    case ACTIVITY_INACTIVE:
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        break;
    default:
        recent_activity = ACTIVITY_YES;
    }

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;

    case ThreadRunGHC:
    {
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }

    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;

    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread;
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      scheduleDoGC(&cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */

void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;

    IF_DEBUG(sanity, checkRunQueue(cap));
}
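
/* (Illustrative sketch, not from the original sources.)
 *
 * The run queue is a doubly-linked list threaded through the TSOs
 * themselves: 'tso->_link' is the forward pointer and, for threads on a
 * run queue, 'tso->block_info.prev' is reused as the backward pointer,
 * with END_TSO_QUEUE marking both ends:
 *
 *   cap->run_queue_hd -> TSO_a <-> TSO_b <-> TSO_c <- cap->run_queue_tl
 *
 * removeFromRunQueue() above therefore unlinks a TSO from either end or
 * the middle in O(1) by patching its neighbours with setTSOLink() and
 * setTSOPrev(); promoteInRunQueue() below uses it to move a thread to
 * the front of the queue.
 */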

void
promoteInRunQueue (Capability *cap, StgTSO *tso)
{
    removeFromRunQueue(cap, tso);
    pushOnRunQueue(cap, tso);
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()

#if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
    win32AllocStack();
#endif
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC, and we didn't just do a GC
    //     (see Note [GC livelock])
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    //
    // Note [GC livelock]
    //
    // If we are interrupted to do a GC, then we do not immediately do
    // another one.  This avoids a starvation situation where one
    // Capability keeps forcing a GC and the other Capabilities make no
    // progress at all.

    return ((pending_sync && !didGcLast) ||
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? peekRunQueue(cap)->bound != NULL
                                     : peekRunQueue(cap)->bound != task->incall)));
}
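
// Worked example of the last clause in shouldYieldCapability() above
// (an explanatory note, not from the original sources):
//   - if this Task is a plain worker (task->incall->tso == NULL), it may
//     not run a bound thread, so we must yield whenever the thread at the
//     head of the run queue has a non-NULL 'bound' field;
//   - if this Task is itself running a bound in-call, the only thread it
//     may run is its own, so we must yield unless the head thread's
//     'bound' field is exactly task->incall.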

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    int didGcLast = rtsFalse;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task,rtsFalse) &&
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING)) {
        return;
    }

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        didGcLast = yieldCapability(&cap,task, !didGcLast);
    }
    while (shouldYieldCapability(cap,task,didGcLast));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
		 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if (emptyRunQueue(cap)) {
        if (sparkPoolSizeCap(cap) < 2) return;
    } else {
        if (singletonRunQueue(cap) &&
            sparkPoolSizeCap(cap) < 1) return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0)
                || cap0->returning_tasks_hd != NULL
                || cap0->inbox != (Message*)END_TSO_QUEUE) {
		// it already has some work, we just grabbed it at
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
#ifdef SPARK_PUSHING
	rtsBool pushed_to_all;
#endif

	debugTrace(DEBUG_sched,
		   "cap %d: %s and %d free capabilities, sharing...",
		   cap->no,
		   (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
		   "excess threads on run queue":"sparks to share (>=2)",
		   n_free_caps);

	i = 0;
#ifdef SPARK_PUSHING
	pushed_to_all = rtsFalse;
#endif

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->_link;
	    prev->_link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->_link;
		t->_link = END_TSO_QUEUE;
                if (t->bound == task->incall // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
		    prev = t;
		} else if (i == n_free_caps) {
#ifdef SPARK_PUSHING
		    pushed_to_all = rtsTrue;
#endif
		    i = 0;
		    // keep one for us
		    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
		    prev = t;
		} else {
		    appendToRunQueue(free_caps[i],t);

                    traceEventMigrateThread (cap, t, free_caps[i]->no);

		    if (t->bound) { t->bound->task->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;

            IF_DEBUG(sanity, checkRunQueue(cap));
	}

#ifdef SPARK_PUSHING
	/* JB I left this code in place, it would work but is not necessary */

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = tryStealSpark(cap->sparks);
		    if (spark != NULL) {
                        /* TODO: if anyone wants to re-enable this code then
                         * they must consider the fizzledSpark(spark) case
                         * and update the per-cap spark statistics.
                         */
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);

            traceEventStealSpark(free_caps[i], t, cap->no);

			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}
#endif /* SPARK_PUSHING */

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseAndWakeupCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
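	// (Explanatory note, not from the original sources:) the argument
	// to awaitEvent() is the "may block" flag.  Passing
	// emptyRunQueue(cap) means we only wait indefinitely when there is
	// nothing runnable; otherwise awaitEvent() just polls for completed
	// I/O and expired timers and returns promptly.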
	awaitEvent (emptyRunQueue(cap));
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/*
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
        scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
        cap = *pcap;
        // when force_major == rtsTrue, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;