/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
#include "StablePtr.h"
#include "StableName.h"
#include "TopHandler.h"

#if defined(HAVE_SYS_TYPES_H)
#include <sys/types.h>
#endif
#if defined(HAVE_UNISTD_H)
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#if defined(HAVE_ERRNO_H)
#include <errno.h>
#endif

#if defined(TRACING)
#include "eventlog/EventLog.h"
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

// Bytes allocated since the last time a HeapOverflow exception was thrown by
// the RTS
uint64_t allocated_bytes_at_heapoverflow = 0;

/* Set to true when the latest garbage collection failed to reclaim enough
 * space, and the runtime should proceed to shut itself down in an orderly
 * fashion (emitting profiling info etc.), OR throw an exception to the main
 * thread, if it is still alive.
 */
bool heap_overflow = false;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static bool requestSync (Capability **pcap, Task *task,
                         PendingSync *sync_type, SyncType *prev_sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
static void startWorkerTasks (uint32_t from USED_IF_THREADS,
                              uint32_t to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static bool scheduleHandleYield( Capability *cap, StgTSO *t,
                                 uint32_t prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                          StgTSO *t );
static bool scheduleNeedHeapProfile(bool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);

static void deleteThread (StgTSO *tso);
static void deleteAllThreads (void);

#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
static void deleteThread_(StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  uint32_t prev_what_next;
  bool ready_to_gc;

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
    }

    // Note [shutdown]: The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence goes like this:
    //
    //   * The shutdown sequence is initiated by calling hs_exit(),
    //     interruptStgRts(), or running out of memory in the GC.
    //
    //   * Set sched_state = SCHED_INTERRUPTING
    //
    //   * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
    //     scheduleDoGC(), which halts the whole runtime by acquiring all the
    //     capabilities, does a GC and then calls deleteAllThreads() to kill all
    //     the remaining threads.  The zombies are left on the run queue for
    //     cleaning up.  We can't kill threads involved in foreign calls.
    //
    //   * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
    //
    //   * After this point, there can be NO MORE HASKELL EXECUTION.  This is
    //     enforced by the scheduler, which won't run any Haskell code when
    //     sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
    //     other capabilities by doing the GC earlier.
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO).

    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,true);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %" FMT_Word, sched_state);
    }

    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
#if defined(THREADED_RTS)
    if (cap->disabled && !t->bound) {
        Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }

run_thread:

    // CurrentTSO is the thread to run. It might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = true;
    cap->idle = 0;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

    switch (recent_activity)
    {
    case ACTIVITY_DONE_GC: {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        uint32_t prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
#if !defined(PROFILING)
            startTimer();
#endif
        }
        break;
    }
    case ACTIVITY_INACTIVE:
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        break;
    default:
        recent_activity = ACTIVITY_YES;
    }

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid prev_what_next=%u field", prev_what_next);
    }

    cap->in_haskell = false;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if defined(mingw32_HOST_OS)
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        if (ret == StackOverflow) {
          traceEventStopThread(cap, t, ret, t->tot_stack_size);
        } else {
          traceEventStopThread(cap, t, ret, 0);
        }
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = false;

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      scheduleDoGC(&cap,task,false);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */

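// Remove a thread from the run queue.  The run queue is a doubly-linked
// list: _link points forwards and block_info.prev points backwards, so the
// TSO is unlinked in both directions and the queue's head/tail pointers and
// length (n_run_queue) are updated.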
static void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;
    cap->n_run_queue--;

    IF_DEBUG(sanity, checkRunQueue(cap));
}

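// Promote a thread: remove it from wherever it currently sits in the run
// queue and push it back on at the front (via pushOnRunQueue), so that it
// is picked up again sooner.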
void
promoteInRunQueue (Capability *cap, StgTSO *tso)
{
    removeFromRunQueue(cap, tso);
    pushOnRunQueue(cap, tso);
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()

#if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
    win32AllocStack();
#endif
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE bool
shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC, and we didn't just do a GC
    //     (see Note [GC livelock])
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    //
    // Note [GC livelock]
    //
    // If we are interrupted to do a GC, then we do not immediately do
    // another one.  This avoids a starvation situation where one
    // Capability keeps forcing a GC and the other Capabilities make no
    // progress at all.

    return ((pending_sync && !didGcLast) ||
            cap->n_returning_tasks != 0 ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? peekRunQueue(cap)->bound != NULL
                                     : peekRunQueue(cap)->bound != task->incall)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    bool didGcLast = false;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task,false) &&
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING)) {
        return;
    }

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        if (doIdleGCWork(cap, false)) {
            // there's more idle GC work to do
            didGcLast = false;
        } else {
            // no more idle GC work to do
            didGcLast = yieldCapability(&cap,task, !didGcLast);
        }
    }
    while (shouldYieldCapability(cap,task,didGcLast));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    uint32_t i, n_wanted_caps, n_free_caps;

    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) {
        spare_threads = 0;
    }

    // Figure out how many capabilities we want to wake up.  We need at least
    // sparkPoolSize(cap) plus the number of spare threads we have.
    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
    if (n_wanted_caps == 0) return;

    // First grab as many free Capabilities as we can.  ToDo: we should use
    // capabilities on the same NUMA node preferably, but not exclusively.
    for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
         n_free_caps < n_wanted_caps && i != cap->no;
         i = (i + 1) % n_capabilities) {
        cap0 = capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
                || cap0->n_returning_tasks != 0
                || !emptyInbox(cap0)) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // We now have n_free_caps free capabilities stashed in
    // free_caps[].  Attempt to share our run queue equally with them.
    // This is complicated slightly by the fact that we can't move
    // some threads:
    //
    //  - threads that have TSO_LOCKED cannot migrate
    //  - a thread that is bound to the current Task cannot be migrated
    //
    // This is about the simplest thing we could do; improvements we
    // might want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;

        debugTrace(DEBUG_sched,
                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                   n_free_caps);

        // There are n_free_caps+1 caps in total.  We will share the threads
        // evenly between them, *except* that if the run queue does not divide
        // evenly by n_free_caps+1 then we bias towards the current capability.
        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
        uint32_t keep_threads =
            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);

        // This also ensures that we don't give away all our threads, since
        // (x + y) / (y + 1) >= 1 when x >= 1.

        // The number of threads we have left.
        uint32_t n = cap->n_run_queue;

        // prev = the previous thread on this cap's run queue
        prev = END_TSO_QUEUE;

        // We're going to walk through the run queue, migrating threads to other
        // capabilities until we have only keep_threads left.  We might
        // encounter a thread that cannot be migrated, in which case we add it
        // to the current run queue and decrement keep_threads.
        for (t = cap->run_queue_hd, i = 0;
             t != END_TSO_QUEUE && n > keep_threads;
             t = next)
        {
            next = t->_link;
            t->_link = END_TSO_QUEUE;

            // Should we keep this thread?
            if (t->bound == task->incall // don't move my bound thread
                || tsoLocked(t) // don't move a locked thread
                ) {
                if (prev == END_TSO_QUEUE) {
                    cap->run_queue_hd = t;
                } else {
                    setTSOLink(cap, prev, t);
                }
                setTSOPrev(cap, t, prev);
                prev = t;
                if (keep_threads > 0) keep_threads--;
            }

            // Or migrate it?
            else {
                appendToRunQueue(free_caps[i],t);
                traceEventMigrateThread (cap, t, free_caps[i]->no);

                if (t->bound) { t->bound->task->cap = free_caps[i]; }
                t->cap = free_caps[i];
                n--; // we have one fewer thread now
                i++; // move on to the next free_cap
                if (i == n_free_caps) i = 0;
            }
        }

        // Join up the beginning of the queue (prev)
        // with the rest of the queue (t)
        if (t == END_TSO_QUEUE) {
            cap->run_queue_tl = prev;
        } else {
            setTSOPrev(cap, t, prev);
        }
        if (prev == END_TSO_QUEUE) {
            cap->run_queue_hd = t;
        } else {
            setTSOLink(cap, prev, t);
        }
        cap->n_run_queue = n;

        IF_DEBUG(sanity, checkRunQueue(cap));

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            if (sparkPoolSizeCap(cap) > 0) {
                // If we have sparks to steal, wake up a worker on the
                // capability, even if it has no threads to run.
                releaseAndWakeupCapability(free_caps[i]);
            } else {
                releaseCapability(free_caps[i]);
            }
        }
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent (emptyRunQueue(cap));
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        scheduleDoGC (pcap, task, true/*force major GC*/);
        cap = *pcap;
        // when force_major == true, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->incall->tso) {
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
            case BlockedOnMVarRead:
                throwToSingleThreaded(cap, task->incall->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
        return;
#endif
    }
}


978 979 980 981 982
/* ----------------------------------------------------------------------------
 * Process message in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Message *m, *next;
    PutMVar *p, *pnext;
    int r;
    Capability *cap = *pcap;

    while (!emptyInbox(cap)) {
        // Executing messages might use heap, so we should check for GC.
        if (doYouWantToGC(cap)) {
            scheduleDoGC(pcap, cap->running_task, false);
            cap = *pcap;
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

        m = cap->inbox;
        p = cap->putMVars;
        cap->inbox = (Message*)END_TSO_QUEUE;
        cap->putMVars = NULL;

        RELEASE_LOCK(&cap->lock);

        while (m != (Message*)END_TSO_QUEUE) {
            next = m->link;
            executeMessage(cap, m);
            m = next;
        }

        while (p != NULL) {
            pnext = p->link;
            performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
                              Unit_closure);
            freeStablePtr(p->mvar);
            stgFree(p);
            p = pnext;
        }
    }
#endif
}
