/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

#ifdef TRACING
#include "eventlog/EventLog.h"
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
				rtsBool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what
     to do next. Therefore, it's more complicated than either the
     concurrent or the parallel (GUM) setup.
  This version has been entirely removed (JB 2008/08).

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

  (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
  now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
  as well as future GUM versions. This file has been refurbished to
  only contain valid code, which is however incomplete, refers to
  invalid includes etc.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    // 
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    // 
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO);

    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
	discardSparksCap(cap);
#endif
	/* scheduleDoGC() deletes all the threads */
	cap = scheduleDoGC(cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (!isBoundTask(task) && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(cap,task);

#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
    if (first)
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = rtsFalse;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
	ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

	if (bound) {
	    if (bound->task == task) {
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->incall->tso) {
		debugTrace(DEBUG_sched,
			   "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	}
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	cap->context_switch = 1;
    }

run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    cap->in_haskell = rtsTrue;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else if (recent_activity != ACTIVITY_INACTIVE) {
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        recent_activity = ACTIVITY_YES;
    }
#endif

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;

    case ThreadRunGHC:
    {
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }

    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;

    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
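        // Record why the thread stopped.  Blocking reasons are traced as
        // why_blocked + 6, presumably so that they do not overlap with the
        // plain StgThreadReturnCode values traced in the else branch below.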
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    CCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
	if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread;
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */

void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
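    // Unlink 'tso' from the doubly-linked run queue: block_info.prev is
    // used as the backward link while a thread is on the run queue, and
    // both links are reset to END_TSO_QUEUE once the thread is removed.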
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;

    IF_DEBUG(sanity, checkRunQueue(cap));
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability *cap)
{
    scheduleStartSignalHandlers(cap);

    scheduleProcessInbox(cap);

    scheduleCheckBlockedThreads(cap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    return (waiting_for_gc || 
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task->incall)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task) &&
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING))
        return;

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        yieldCapability(&cap,task);
    }
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
		 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        if (sparkPoolSizeCap(cap) < 2) return;
    } else {
        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
            sparkPoolSizeCap(cap) < 1) return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0)
                || cap->returning_tasks_hd != NULL
                || cap->inbox != (Message*)END_TSO_QUEUE) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
#ifdef SPARK_PUSHING
	rtsBool pushed_to_all;
#endif

	debugTrace(DEBUG_sched,
		   "cap %d: %s and %d free capabilities, sharing...",
		   cap->no,
		   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
		   "excess threads on run queue":"sparks to share (>=2)",
		   n_free_caps);

	i = 0;
#ifdef SPARK_PUSHING
	pushed_to_all = rtsFalse;
#endif

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->_link;
	    prev->_link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->_link;
		t->_link = END_TSO_QUEUE;
                if (t->bound == task->incall // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
		    prev = t;
		} else if (i == n_free_caps) {
#ifdef SPARK_PUSHING
		    pushed_to_all = rtsTrue;
#endif
		    i = 0;
		    // keep one for us
		    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
		    prev = t;
		} else {
		    appendToRunQueue(free_caps[i],t);

                    traceEventMigrateThread (cap, t, free_caps[i]->no);

		    if (t->bound) { t->bound->task->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;

            IF_DEBUG(sanity, checkRunQueue(cap));
	}

#ifdef SPARK_PUSHING
	/* JB I left this code in place, it would work but is not necessary */

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = tryStealSpark(cap->sparks);
		    if (spark != NULL) {
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);

            traceEventStealSpark(free_caps[i], t, cap->no);

			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}
#endif /* SPARK_PUSHING */

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseAndWakeupCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent (emptyRunQueue(cap));
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/*
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
        // when force_major == rtsTrue, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->incall->tso) {
	    switch (task->incall->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnMsgThrowTo:
	    case BlockedOnMVar:
		throwToSingleThreaded(cap, task->incall->tso,
				      (StgClosure *)nonTermination_closure);
		return;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
	return;
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{

# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
	// if we use message buffering, we must send away all message
	// packets which have become too old...
	sendOldBuffers();
    }
}
#endif

/* ----------------------------------------------------------------------------
 * Process message in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
scheduleProcessInbox (Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Message *m, *next;
    int r;

    while (!emptyInbox(cap)) {
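        // If we are down to the last block of the nursery, or a lot of
        // large objects have been allocated, do a GC first: executing the
        // messages below may allocate.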
        if (cap->r.rCurrentNursery->link == NULL ||
            g0->n_new_large_words >= large_alloc_lim) {
            scheduleDoGC(cap, cap->running_task, rtsFalse);
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

        m = cap->inbox;
        cap->inbox = (Message*)END_TSO_QUEUE;