/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
#include "Stable.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

#ifdef TRACING
#include "eventlog/EventLog.h"
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

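// fork() is not available on Windows, so (added note) the forkProcess primop
// is only supported on the other platforms.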
#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

// Local stats
#ifdef THREADED_RTS
static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(Capability *cap, Task *task);
static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue,
     which determines what to do next. Therefore, it's more complicated
     than either the concurrent or the parallel (GUM) setup.
  This version has been entirely removed (JB 2008/08).

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

  (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
  now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
  as well as future GUM versions. This file has been refurbished to
  only contain valid code, which is however incomplete and refers to
  invalid includes etc.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif
  
  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    // 
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    // 
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls, 
    //     we should really attempt to kill these somehow (TODO);
    
    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (!isBoundTask(task) && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    
#if defined(THREADED_RTS)
    if (first) 
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = rtsFalse;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
	ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    // 
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;
      
	if (bound) {
	    if (bound->task == task) {
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->incall->tso) { 
		debugTrace(DEBUG_sched,
			   "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue; 
	    }
	}
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
#ifdef THREADED_RTS
    if (cap->disabled && !t->bound) {
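        // (note) map this capability's index onto one of the still-enabled
        // capabilities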
        Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	cap->context_switch = 1;
    }
	 
run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread 

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
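    // (note) on Windows, also restore the thread's saved error code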
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = rtsTrue;
    cap->idle = 0;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else if (recent_activity != ACTIVITY_INACTIVE) {
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        recent_activity = ACTIVITY_YES;
    }
#endif

    traceEventRunThread(cap, t);

    switch (prev_what_next) {
	
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
	
    case ThreadRunGHC:
    {
	StgRegTable *r;
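	// (note) StgRun() enters the compiled STG code and only returns when
	// the thread stops running (heap/stack overflow, yield, block or
	// finish); the reason comes back in r->rRet.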
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }
    
    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;
	
    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
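        // NB. (added note) the "+ 6" apparently offsets why_blocked so the
        // blocked reasons do not clash with the basic thread-stop status
        // codes in the event log encoding.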
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------
    
    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif
    
    schedulePostRunThread(cap,t);

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread; 
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      scheduleDoGC(&cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */

void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
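    // (note) the run queue is doubly linked: tso->_link is the forward
    // pointer and tso->block_info.prev is reused as the back pointer, so we
    // can unlink in O(1) wherever the TSO sits in the queue.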
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;

    IF_DEBUG(sanity, checkRunQueue(cap));
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()  

#if defined(mingw32_HOST_OS) && !defined(GhcUnregisterised)
    win32AllocStack();
#endif
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
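    // (In the condition below, task->incall->tso == NULL identifies a worker
    // task, which may only run unbound threads; a bound task may only run
    // the thread bound to its own incall.)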
    return (pending_sync ||
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task->incall)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else 
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    //
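    // (note: we also avoid sleeping while interrupting or shutting down, so
    // that the main scheduler loop can act on sched_state.)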
    if (!shouldYieldCapability(cap,task) && 
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING))
        return;

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        yieldCapability(&cap,task);
    } 
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif
    
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS, 
		 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
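    // (We always keep one unit of work for ourselves: with an empty run
    // queue we need at least two sparks before sharing, and with a single
    // runnable thread we need at least one spark.)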
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        if (sparkPoolSizeCap(cap) < 2) return;
    } else {
        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
            sparkPoolSizeCap(cap) < 1) return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0)
                || cap0->returning_tasks_hd != NULL
                || cap0->inbox != (Message*)END_TSO_QUEUE) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
#ifdef SPARK_PUSHING
	rtsBool pushed_to_all;
#endif

	debugTrace(DEBUG_sched, 
		   "cap %d: %s and %d free capabilities, sharing...", 
		   cap->no, 
		   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
		   "excess threads on run queue":"sparks to share (>=2)",
		   n_free_caps);

	i = 0;
#ifdef SPARK_PUSHING
	pushed_to_all = rtsFalse;
#endif

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->_link;
	    prev->_link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->_link;
		t->_link = END_TSO_QUEUE;
                if (t->bound == task->incall // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
		    prev = t;
		} else if (i == n_free_caps) {
#ifdef SPARK_PUSHING
		    pushed_to_all = rtsTrue;
#endif
		    i = 0;
		    // keep one for us
		    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
		    prev = t;
		} else {
		    appendToRunQueue(free_caps[i],t);

                    traceEventMigrateThread (cap, t, free_caps[i]->no);

		    if (t->bound) { t->bound->task->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;

            IF_DEBUG(sanity, checkRunQueue(cap));
	}

#ifdef SPARK_PUSHING
	/* JB I left this code in place, it would work but is not necessary */

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = tryStealSpark(cap->sparks);
		    if (spark != NULL) {
                        /* TODO: if anyone wants to re-enable this code then
                         * they must consider the fizzledSpark(spark) case
                         * and update the per-cap spark statistics.
                         */
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);

            traceEventStealSpark(free_caps[i], t, cap->no);

			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}
#endif /* SPARK_PUSHING */

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseAndWakeupCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent (emptyRunQueue(cap));
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/* 
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
        scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
        cap = *pcap;
        // when force_major == rtsTrue, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);