/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "LongPause.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
#include "StablePtr.h"
#include "StableName.h"
#include "TopHandler.h"
#include "sm/NonMoving.h"
#include "sm/NonMovingMark.h"

#if defined(HAVE_SYS_TYPES_H)
#include <sys/types.h>
#endif
#if defined(HAVE_UNISTD_H)
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#if defined(HAVE_ERRNO_H)
#include <errno.h>
#endif

#if defined(TRACING)
#include "eventlog/EventLog.h"
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

// Bytes allocated since the last time a HeapOverflow exception was thrown by
// the RTS
uint64_t allocated_bytes_at_heapoverflow = 0;

/* Set to true when the latest garbage collection failed to reclaim enough
 * space, and the runtime should proceed to shut itself down in an orderly
 * fashion (emitting profiling info etc.), OR throw an exception to the main
 * thread, if it is still alive.
 */
bool heap_overflow = false;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/*
 * sync_finished_cond allows threads which do not own any capability (e.g. the
 * concurrent mark thread) to participate in the sync protocol. In particular,
 * if such a thread requests a sync while sync is already in progress it will
 * block on sync_finished_cond, which will be signalled when the sync is
 * finished (by releaseAllCapabilities).
 */
#if defined(THREADED_RTS)
static Condition sync_finished_cond;
static Mutex sync_finished_mutex;
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static bool requestSync (Capability **pcap, Task *task,
                         PendingSync *sync_type, SyncType *prev_sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void startWorkerTasks (uint32_t from USED_IF_THREADS,
                              uint32_t to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static bool scheduleHandleYield( Capability *cap, StgTSO *t,
                                 uint32_t prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                          StgTSO *t );
static bool scheduleNeedHeapProfile(bool ready_to_gc);
static void scheduleDoGC( Capability **pcap, Task *task,
                          bool force_major, bool deadlock_detect );

static void deleteThread (StgTSO *tso);
static void deleteAllThreads (void);

#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
static void deleteThread_(StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  uint32_t prev_what_next;
  bool ready_to_gc;

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
    }

    // Note [shutdown]: The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence goes like this:
    //
    //   * The shutdown sequence is initiated by calling hs_exit(),
    //     interruptStgRts(), or running out of memory in the GC.
    //
    //   * Set sched_state = SCHED_INTERRUPTING
    //
    //   * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
    //     scheduleDoGC(), which halts the whole runtime by acquiring all the
    //     capabilities, does a GC and then calls deleteAllThreads() to kill all
    //     the remaining threads.  The zombies are left on the run queue for
    //     cleaning up.  We can't kill threads involved in foreign calls.
    //
    //   * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
    //
    //   * After this point, there can be NO MORE HASKELL EXECUTION.  This is
    //     enforced by the scheduler, which won't run any Haskell code when
    //     sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
    //     other capabilities by doing the GC earlier.
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO).

    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,true,false);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %" FMT_Word, sched_state);
    }

    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
#if defined(THREADED_RTS)
    if (cap->disabled && !t->bound) {
        Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }

run_thread:

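    // We jump back here (goto run_thread) from the ThreadYielding case
    // below, as a shortcut when a thread merely switches between the
    // compiled and interpreted execution paths.
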
    // CurrentTSO is the thread to run. It might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = true;
    cap->idle = 0;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

    switch (recent_activity)
    {
    case ACTIVITY_DONE_GC: {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        uint32_t prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
#if !defined(PROFILING)
            startTimer();
#endif
        }
        break;
    }
    case ACTIVITY_INACTIVE:
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        break;
    default:
        recent_activity = ACTIVITY_YES;
    }

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
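        // Run compiled code: StgRun() enters the STG machine at the frame on
        // top of this thread's stack and returns when the thread stops
        // running Haskell (it yielded, blocked, finished, or overflowed its
        // heap or stack); the stop reason comes back in r->rRet.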
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid prev_what_next=%u field", prev_what_next);
    }

    cap->in_haskell = false;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if defined(mingw32_HOST_OS)
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        if (ret == StackOverflow) {
          traceEventStopThread(cap, t, ret, t->tot_stack_size);
        } else {
          traceEventStopThread(cap, t, ret, 0);
        }
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = false;

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      scheduleDoGC(&cap,task,false,false);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */

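// Remove a thread from anywhere in the run queue.  The run queue is a
// doubly-linked list: _link is the forward pointer and block_info.prev is
// the backward pointer (maintained via setTSOLink()/setTSOPrev()).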
static void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;
    cap->n_run_queue--;

    IF_DEBUG(sanity, checkRunQueue(cap));
}

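// Move a thread to the front of the run queue so that it is scheduled next:
// unlink it from its current position and push it back on the head of the
// queue (pushOnRunQueue adds at the front).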
void
promoteInRunQueue (Capability *cap, StgTSO *tso)
{
    removeFromRunQueue(cap, tso);
    pushOnRunQueue(cap, tso);
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()

#if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
    win32AllocStack();
#endif
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE bool
shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC, and we didn't just do a GC
    //     (see Note [GC livelock])
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    //
    // Note [GC livelock]
    //
    // If we are interrupted to do a GC, then we do not immediately do
    // another one.  This avoids a starvation situation where one
    // Capability keeps forcing a GC and the other Capabilities make no
    // progress at all.

    return ((pending_sync && !didGcLast) ||
            cap->n_returning_tasks != 0 ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? peekRunQueue(cap)->bound != NULL
                                     : peekRunQueue(cap)->bound != task->incall)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    bool didGcLast = false;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task,false) &&
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING)) {
        return;
    }

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        if (doIdleGCWork(cap, false)) {
            // there's more idle GC work to do
            didGcLast = false;
        } else {
            // no more idle GC work to do
            didGcLast = yieldCapability(&cap,task, !didGcLast);
        }
    }
    while (shouldYieldCapability(cap,task,didGcLast));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    uint32_t i, n_wanted_caps, n_free_caps;

    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) {
        spare_threads = 0;
    }

    // Figure out how many capabilities we want to wake up.  We need at least
    // sparkPoolSize(cap) plus the number of spare threads we have.
    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
    if (n_wanted_caps == 0) return;

    // First grab as many free Capabilities as we can.  ToDo: we should use
    // capabilities on the same NUMA node preferably, but not exclusively.
    for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
         n_free_caps < n_wanted_caps && i != cap->no;
         i = (i + 1) % n_capabilities) {
        cap0 = capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
                || cap0->n_returning_tasks != 0
                || !emptyInbox(cap0)) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // We now have n_free_caps free capabilities stashed in
    // free_caps[].  Attempt to share our run queue equally with them.
    // This is complicated slightly by the fact that we can't move
    // some threads:
    //
    //  - threads that have TSO_LOCKED cannot migrate
    //  - a thread that is bound to the current Task cannot be migrated
    //
    // This is about the simplest thing we could do; improvements we
    // might want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;

        debugTrace(DEBUG_sched,
                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                   n_free_caps);

        // There are n_free_caps+1 caps in total.  We will share the threads
        // evenly between them, *except* that if the run queue does not divide
        // evenly by n_free_caps+1 then we bias towards the current capability.
        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
        uint32_t keep_threads =
            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);

        // This also ensures that we don't give away all our threads, since
        // (x + y) / (y + 1) >= 1 when x >= 1.

        // The number of threads we have left.
        uint32_t n = cap->n_run_queue;

        // prev = the previous thread on this cap's run queue
        prev = END_TSO_QUEUE;

        // We're going to walk through the run queue, migrating threads to other
        // capabilities until we have only keep_threads left.  We might
        // encounter a thread that cannot be migrated, in which case we add it
        // to the current run queue and decrement keep_threads.
        for (t = cap->run_queue_hd, i = 0;
             t != END_TSO_QUEUE && n > keep_threads;
             t = next)
        {
            next = t->_link;
            t->_link = END_TSO_QUEUE;

            // Should we keep this thread?
            if (t->bound == task->incall // don't move my bound thread
                || tsoLocked(t) // don't move a locked thread
                ) {
                if (prev == END_TSO_QUEUE) {
                    cap->run_queue_hd = t;
                } else {
                    setTSOLink(cap, prev, t);
                }
                setTSOPrev(cap, t, prev);
                prev = t;
                if (keep_threads > 0) keep_threads--;
            }

            // Or migrate it?
            else {
                appendToRunQueue(free_caps[i],t);
                traceEventMigrateThread (cap, t, free_caps[i]->no);

                if (t->bound) { t->bound->task->cap = free_caps[i]; }
                t->cap = free_caps[i];
                n--; // we have one fewer thread now
                i++; // move on to the next free_cap
                if (i == n_free_caps) i = 0;
            }
        }

        // Join up the beginning of the queue (prev)
        // with the rest of the queue (t)
        if (t == END_TSO_QUEUE) {
            cap->run_queue_tl = prev;
        } else {
            setTSOPrev(cap, t, prev);
        }
        if (prev == END_TSO_QUEUE) {
            cap->run_queue_hd = t;
        } else {
            setTSOLink(cap, prev, t);
        }
        cap->n_run_queue = n;

        IF_DEBUG(sanity, checkRunQueue(cap));

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            if (sparkPoolSizeCap(cap) > 0) {
                // If we have sparks to steal, wake up a worker on the
                // capability, even if it has no threads to run.
                releaseAndWakeupCapability(free_caps[i]);
            } else {
                releaseCapability(free_caps[i]);
            }
        }
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent (emptyRunQueue(cap));
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        scheduleDoGC (pcap, task, true/*force major GC*/, true/*deadlock detection*/);
        cap = *pcap;
        // When force_major == true, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->incall->tso) {
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
            case BlockedOnMVarRead:
                throwToSingleThreaded(cap, task->incall->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
        return;
#endif
    }
}


/* ----------------------------------------------------------------------------
 * Process messages in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Message *m, *next;
    PutMVar *p, *pnext;
    int r;
    Capability *cap = *pcap;

    while (!emptyInbox(cap)) {
        // Executing messages might use heap, so we should check for GC.
        if (doYouWantToGC(cap)) {
            scheduleDoGC(pcap, cap->running_task, false, false);
            cap = *pcap;
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

        m = cap->inbox;
        p = cap->putMVars;
        cap->inbox = (Message*)END_TSO_QUEUE;
        cap->putMVars = NULL;

        RELEASE_LOCK(&cap->lock);

        while (m != (Message*)END_TSO_QUEUE) {
            next = m->link;
            executeMessage(cap, m);
            m = next;
        }