/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
#include "Stable.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

#ifdef TRACING
#include "eventlog/EventLog.h"
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
bool heap_overflow = false;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;
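// (The values used in this file are ACTIVITY_YES, ACTIVITY_INACTIVE and
//  ACTIVITY_DONE_GC; see the switch on recent_activity in schedule() below.)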

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;
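// (The state only moves forward: SCHED_RUNNING -> SCHED_INTERRUPTING ->
//  SCHED_SHUTTING_DOWN; see Note [shutdown] in schedule() below.)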

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static bool requestSync (Capability **pcap, Task *task,
                         PendingSync *sync_type, SyncType *prev_sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
static void startWorkerTasks (uint32_t from USED_IF_THREADS,
                              uint32_t to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static bool scheduleHandleYield( Capability *cap, StgTSO *t,
                                 uint32_t prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                          StgTSO *t );
static bool scheduleNeedHeapProfile(bool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   ------------------------------------------------------------------------ */
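
/* (Cross-reference, derived from the switch on 'ret' in schedule() below:
   these conditions correspond to the StgThreadReturnCode values HeapOverflow,
   ThreadYielding, ThreadBlocked, ThreadFinished and StackOverflow
   respectively.) */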

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  uint32_t prev_what_next;
  bool ready_to_gc;
#if defined(THREADED_RTS)
  bool first = true;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
    }

    // Note [shutdown]: The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence goes like this:
    //
    //   * The shutdown sequence is initiated by calling hs_exit(),
    //     interruptStgRts(), or running out of memory in the GC.
    //
    //   * Set sched_state = SCHED_INTERRUPTING
    //
    //   * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
    //     scheduleDoGC(), which halts the whole runtime by acquiring all the
    //     capabilities, does a GC and then calls deleteAllThreads() to kill all
    //     the remaining threads.  The zombies are left on the run queue for
    //     cleaning up.  We can't kill threads involved in foreign calls.
    //
    //   * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
    //
    //   * After this point, there can be NO MORE HASKELL EXECUTION.  This is
    //     enforced by the scheduler, which won't run any Haskell code when
    //     sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
    //     other capabilities by doing the GC earlier.
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO).
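    //
    // A minimal sketch (illustrative only, not part of this file) of how an
    // embedding C program reaches this sequence; hs_init()/hs_exit() come
    // from HsFFI.h, and hs_exit() is one of the entry points listed above:
    //
    //     #include "HsFFI.h"
    //
    //     int main (int argc, char *argv[])
    //     {
    //         hs_init(&argc, &argv);  // RTS up, sched_state = SCHED_RUNNING
    //         /* ... call foreign-exported Haskell ... */
    //         hs_exit();              // initiates the shutdown sequence
    //         return 0;
    //     }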

    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,true);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
    if (first)
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = false;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif
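
    // Illustrative note (not in the original comments): bound threads arise
    // from things like foreign exports and Control.Concurrent.forkOS, and can
    // only be run by the Task they are bound to.  When some other Task pops
    // one here, it puts the thread back at the head of the run queue and
    // loops; shouldYieldCapability() will then make this Task give up the
    // Capability so that the owning Task can eventually pick the thread up.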

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
#ifdef THREADED_RTS
    if (cap->disabled && !t->bound) {
        Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }
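
    // Usage illustration (standard RTS flag; the program name is hypothetical):
    // running a GHC-compiled program as
    //
    //     ./prog +RTS -C0 -RTS
    //
    // sets ctxtSwitchTicks to 0, so the test above requests a context switch
    // whenever other runnable threads are queued.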

run_thread:

    // CurrentTSO is the thread to run. It might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = true;
    cap->idle = 0;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

    switch (recent_activity)
    {
    case ACTIVITY_DONE_GC: {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        uint32_t prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
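        // (xchg() swaps in ACTIVITY_YES atomically and returns the previous
        // value, so if several Capabilities race here only one of them sees
        // ACTIVITY_DONE_GC and restarts the timer.)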
        if (prev == ACTIVITY_DONE_GC) {
#ifndef PROFILING
            startTimer();
#endif
        }
        break;
    }
    case ACTIVITY_INACTIVE:
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        break;
    default:
        recent_activity = ACTIVITY_YES;
    }

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid what_next field");
    }

    cap->in_haskell = false;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
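    // (The matching restore, 'errno = t->saved_errno', happens just before
    // the thread is run, earlier in this loop.)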
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = false;

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      scheduleDoGC(&cap,task,false);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */
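
/* Orientation sketch (derived from the code below): the run queue is doubly
 * linked, using tso->_link as the forward pointer and tso->block_info.prev as
 * the backward pointer, bounded by cap->run_queue_hd and cap->run_queue_tl:
 *
 *     run_queue_hd -> A <-> B <-> C <- run_queue_tl
 *
 * removeFromRunQueue() splices a TSO's neighbours together and resets both of
 * the TSO's pointers to END_TSO_QUEUE.
 */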

static void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;
    cap->n_run_queue--;

    IF_DEBUG(sanity, checkRunQueue(cap));
}

void
promoteInRunQueue (Capability *cap, StgTSO *tso)
{
    removeFromRunQueue(cap, tso);
    pushOnRunQueue(cap, tso);
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()

#if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
    win32AllocStack();
#endif
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE bool
shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC, and we didn't just do a GC
    //     (see Note [GC livelock])
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this Task is bound).
    //
    // Note [GC livelock]
    //
    // If we are interrupted to do a GC, then we do not immediately do
    // another one.  This avoids a starvation situation where one
    // Capability keeps forcing a GC and the other Capabilities make no
    // progress at all.

    return ((pending_sync && !didGcLast) ||
            cap->n_returning_tasks != 0 ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? peekRunQueue(cap)->bound != NULL
                                     : peekRunQueue(cap)->bound != task->incall)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    bool didGcLast = false;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task,false) &&
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING)) {
        return;
    }

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        didGcLast = yieldCapability(&cap,task, !didGcLast);
    }
    while (shouldYieldCapability(cap,task,didGcLast));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    uint32_t i, n_wanted_caps, n_free_caps;

    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) {
        spare_threads = 0;
    }

    // Figure out how many capabilities we want to wake up.  We need at least
    // sparkPoolSize(cap) plus the number of spare threads we have.
    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
    if (n_wanted_caps == 0) return;

    // First grab as many free Capabilities as we can.  ToDo: we should use
    // capabilities on the same NUMA node preferably, but not exclusively.
    for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
         n_free_caps < n_wanted_caps && i != cap->no;
         i = (i + 1) % n_capabilities) {
        cap0 = capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
                || cap0->n_returning_tasks != 0
                || !emptyInbox(cap0)) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // We now have n_free_caps free capabilities stashed in
    // free_caps[].  Attempt to share our run queue equally with them.
    // This is complicated slightly by the fact that we can't move
    // some threads:
    //
    //  - threads that have TSO_LOCKED cannot migrate
    //  - a thread that is bound to the current Task cannot be migrated
    //
    // This is about the simplest thing we could do; improvements we
    // might want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;

        debugTrace(DEBUG_sched,
                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                   n_free_caps);

        // There are n_free_caps+1 caps in total.  We will share the threads
        // evenly between them, *except* that if the run queue does not divide
        // evenly by n_free_caps+1 then we bias towards the current capability.
        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
        uint32_t keep_threads =
            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);

        // This also ensures that we don't give away all our threads, since
        // (x + y) / (y + 1) >= 1 when x >= 1.
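        //
        // Equivalently (worked example, not in the original comment):
        // keep_threads = ceiling(n_run_queue / (n_free_caps+1)); e.g. with
        // n_run_queue=7 and n_free_caps=3 we keep (7+3)/4 = 2 threads and
        // share the other 5 out over the three free capabilities.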

        // The number of threads we have left.
        uint32_t n = cap->n_run_queue;

        // prev = the previous thread on this cap's run queue
        prev = END_TSO_QUEUE;

        // We're going to walk through the run queue, migrating threads to other
        // capabilities until we have only keep_threads left.  We might
        // encounter a thread that cannot be migrated, in which case we add it
        // to the current run queue and decrement keep_threads.
        for (t = cap->run_queue_hd, i = 0;
             t != END_TSO_QUEUE && n > keep_threads;
             t = next)
        {
            next = t->_link;
            t->_link = END_TSO_QUEUE;

            // Should we keep this thread?
            if (t->bound == task->incall // don't move my bound thread
                || tsoLocked(t) // don't move a locked thread
                ) {
                if (prev == END_TSO_QUEUE) {
                    cap->run_queue_hd = t;
794
                } else {
795
                    setTSOLink(cap, prev, t);
796
                }
797 798 799
                setTSOPrev(cap, t, prev);
                prev = t;
                if (keep_threads > 0) keep_threads--;
800
            }
801

802 803 804 805
            // Or migrate it?
            else {
                appendToRunQueue(free_caps[i],t);
                traceEventMigrateThread (cap, t, free_caps[i]->no);
806

807 808 809 810 811
                if (t->bound) { t->bound->task->cap = free_caps[i]; }
                t->cap = free_caps[i];
                n--; // we have one fewer threads now
                i++; // move on to the next free_cap
                if (i == n_free_caps) i = 0;
812 813
            }
        }
814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829

        // Join up the beginning of the queue (prev)
        // with the rest of the queue (t)
        if (t == END_TSO_QUEUE) {
            cap->run_queue_tl = prev;
        } else {
            setTSOPrev(cap, t, prev);
        }
        if (prev == END_TSO_QUEUE) {
            cap->run_queue_hd = t;
        } else {
            setTSOLink(cap, prev, t);
        }
        cap->n_run_queue = n;

        IF_DEBUG(sanity, checkRunQueue(cap));
830

831 832 833
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
834 835 836 837 838 839 840
            if (sparkPoolSizeCap(cap) > 0) {
                // If we have sparks to steal, wake up a worker on the
                // capability, even if it has no threads to run.
                releaseAndWakeupCapability(free_caps[i]);
            } else {
                releaseCapability(free_caps[i]);
            }
841
        }
842 843
    }
    task->cap = cap; // reset to point to our Capability.