Schedule.c 104 KB
Newer Older
1
/* ---------------------------------------------------------------------------
2
 *
3
 * (c) The GHC Team, 1998-2006
4
 *
5
 * The scheduler and thread-related functionality
sof's avatar
sof committed
6
 *
7 8
 * --------------------------------------------------------------------------*/

9
#include "PosixSource.h"
10
#define KEEP_LOCKCLOSURE
11
#include "Rts.h"
Simon Marlow's avatar
Simon Marlow committed
12 13

#include "sm/Storage.h"
14 15 16
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
17
#include "Interpreter.h"
18
#include "Printer.h"
19
#include "RtsSignals.h"
Simon Marlow's avatar
Simon Marlow committed
20
#include "sm/Sanity.h"
21
#include "Stats.h"
22
#include "STM.h"
23
#include "Prelude.h"
24
#include "ThreadLabels.h"
25
#include "Updates.h"
26 27
#include "Proftimer.h"
#include "ProfHeap.h"
28
#include "Weak.h"
Simon Marlow's avatar
Simon Marlow committed
29
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30
#include "sm/GCThread.h"
31
#include "Sparks.h"
sof's avatar
sof committed
32
#include "Capability.h"
33 34
#include "Task.h"
#include "AwaitEvent.h"
35 36 37
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
Simon Marlow's avatar
Simon Marlow committed
38
#include "Trace.h"
39 40
#include "RaiseAsync.h"
#include "Threads.h"
Simon Marlow's avatar
Simon Marlow committed
41 42
#include "Timer.h"
#include "ThreadPaused.h"
43
#include "Messages.h"
David Feuer's avatar
David Feuer committed
44 45
#include "StablePtr.h"
#include "StableName.h"
46
#include "TopHandler.h"
47 48
#include "sm/NonMoving.h"
#include "sm/NonMovingMark.h"
49

Ben Gamari's avatar
Ben Gamari committed
50
#if defined(HAVE_SYS_TYPES_H)
51 52
#include <sys/types.h>
#endif
Ben Gamari's avatar
Ben Gamari committed
53
#if defined(HAVE_UNISTD_H)
54 55 56
#include <unistd.h>
#endif

57 58
#include <string.h>
#include <stdlib.h>
59
#include <stdarg.h>
60

Ben Gamari's avatar
Ben Gamari committed
61
#if defined(HAVE_ERRNO_H)
62 63 64
#include <errno.h>
#endif

Ben Gamari's avatar
Ben Gamari committed
65
#if defined(TRACING)
66 67
#include "eventlog/EventLog.h"
#endif
68 69 70
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */
71

72
#if !defined(THREADED_RTS)
73
// Blocked/sleeping threads
74 75
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
76 77
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif
78

79 80 81 82 83 84 85 86
// Bytes allocated since the last time a HeapOverflow exception was thrown by
// the RTS
uint64_t allocated_bytes_at_heapoverflow = 0;

/* Set to true when the latest garbage collection failed to reclaim enough
 * space, and the runtime should proceed to shut itself down in an orderly
 * fashion (emitting profiling info etc.), OR throw an exception to the main
 * thread, if it is still alive.
87
 */
Ben Gamari's avatar
Ben Gamari committed
88
bool heap_overflow = false;
89

90 91 92
/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
93 94
 *
 * NB. must be StgWord, we do xchg() on it.
95
 */
96
volatile StgWord recent_activity = ACTIVITY_YES;
97

98
/* if this flag is set as well, give up execution
99
 * LOCK: none (changes monotonically)
100
 */
101
volatile StgWord sched_state = SCHED_RUNNING;
102

103 104
/*
 * This mutex protects most of the global scheduler data in
105
 * the THREADED_RTS runtime.
sof's avatar
sof committed
106
 */
107
#if defined(THREADED_RTS)
108
Mutex sched_mutex;
109
#endif
sof's avatar
sof committed
110

111 112 113 114
#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

115 116 117 118 119 120 121 122 123 124 125 126 127
/*
 * sync_finished_cond allows threads which do not own any capability (e.g. the
 * concurrent mark thread) to participate in the sync protocol. In particular,
 * if such a thread requests a sync while sync is already in progress it will
 * block on sync_finished_cond, which will be signalled when the sync is
 * finished (by releaseAllCapabilities).
 */
#if defined(THREADED_RTS)
static Condition sync_finished_cond;
static Mutex sync_finished_mutex;
#endif


128 129 130 131
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

132
static Capability *schedule (Capability *initialCapability, Task *task);
133 134

//
Gabor Greif's avatar
typo  
Gabor Greif committed
135
// These functions all encapsulate parts of the scheduler loop, and are
136 137 138
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
139
static void schedulePreLoop (void);
140
static void scheduleFindWork (Capability **pcap);
141
#if defined(THREADED_RTS)
142
static void scheduleYield (Capability **pcap, Task *task);
143
#endif
144
#if defined(THREADED_RTS)
Ben Gamari's avatar
Ben Gamari committed
145 146
static bool requestSync (Capability **pcap, Task *task,
                         PendingSync *sync_type, SyncType *prev_sync_type);
147
static void acquireAllCapabilities(Capability *cap, Task *task);
148
static void startWorkerTasks (uint32_t from USED_IF_THREADS,
Ben Gamari's avatar
Ben Gamari committed
149
                              uint32_t to USED_IF_THREADS);
150
#endif
151
static void scheduleStartSignalHandlers (Capability *cap);
152
static void scheduleCheckBlockedThreads (Capability *cap);
153 154
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
155
static void schedulePushWork(Capability *cap, Task *task);
Simon Marlow's avatar
Simon Marlow committed
156
#if defined(THREADED_RTS)
157
static void scheduleActivateSpark(Capability *cap);
158
#endif
159
static void schedulePostRunThread(Capability *cap, StgTSO *t);
Ben Gamari's avatar
Ben Gamari committed
160 161 162
static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static bool scheduleHandleYield( Capability *cap, StgTSO *t,
                                 uint32_t prev_what_next );
163
static void scheduleHandleThreadBlocked( StgTSO *t );
Ben Gamari's avatar
Ben Gamari committed
164 165 166
static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                          StgTSO *t );
static bool scheduleNeedHeapProfile(bool ready_to_gc);
167 168
static void scheduleDoGC( Capability **pcap, Task *task,
                          bool force_major, bool deadlock_detect );
169

170 171
static void deleteThread (StgTSO *tso);
static void deleteAllThreads (void);
172

Ben Gamari's avatar
Ben Gamari committed
173
#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
174
static void deleteThread_(StgTSO *tso);
175
#endif
176

177
/* ---------------------------------------------------------------------------
178 179 180 181 182 183 184 185 186 187 188
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

189
   ------------------------------------------------------------------------ */
190

191 192
static Capability *
schedule (Capability *initialCapability, Task *task)
193 194
{
  StgTSO *t;
195
  Capability *cap;
196
  StgThreadReturnCode ret;
197
  uint32_t prev_what_next;
Ben Gamari's avatar
Ben Gamari committed
198
  bool ready_to_gc;
199

200 201
  cap = initialCapability;

202 203 204
  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.
205

206
  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
207

208
  schedulePreLoop();
209

210 211
  // -----------------------------------------------------------
  // Scheduler loop starts here:
212

Simon Marlow's avatar
Simon Marlow committed
213
  while (1) {
214

215 216 217
    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
218
    if (cap->in_haskell) {
219 220 221
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
222 223
    }

224
    // Note [shutdown]: The interruption / shutdown sequence.
225
    //
226 227 228
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
229
    //   * clean up all OS threads associated with the runtime
230 231
    //   * free all memory etc.
    //
232
    // So the sequence goes like this:
233
    //
234 235
    //   * The shutdown sequence is initiated by calling hs_exit(),
    //     interruptStgRts(), or running out of memory in the GC.
236
    //
237
    //   * Set sched_state = SCHED_INTERRUPTING
238
    //
239 240 241 242 243
    //   * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
    //     scheduleDoGC(), which halts the whole runtime by acquiring all the
    //     capabilities, does a GC and then calls deleteAllThreads() to kill all
    //     the remaining threads.  The zombies are left on the run queue for
    //     cleaning up.  We can't kill threads involved in foreign calls.
244
    //
245 246 247 248 249 250
    //   * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
    //
    //   * After this point, there can be NO MORE HASKELL EXECUTION.  This is
    //     enforced by the scheduler, which won't run any Haskell code when
    //     sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
    //     other capabilities by doing the GC earlier.
251
    //
252 253 254
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
255
    //
256 257 258
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
259
    //   * We might be left with threads blocked in foreign calls,
260
    //     we should really attempt to kill these somehow (TODO).
261

262 263
    switch (sched_state) {
    case SCHED_RUNNING:
264
        break;
265
    case SCHED_INTERRUPTING:
266
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
267
        /* scheduleDoGC() deletes all the threads */
268
        scheduleDoGC(&cap,task,true,false);
269 270 271 272 273 274 275 276

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

277
    case SCHED_SHUTTING_DOWN:
278 279 280 281 282 283 284 285
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
286
    default:
duog's avatar
duog committed
287
        barf("sched_state: %" FMT_Word, sched_state);
288
    }
289

290
    scheduleFindWork(&cap);
291

292 293 294
    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);
295

296
    scheduleDetectDeadlock(&cap,task);
297 298

    // Normally, the only way we can get here with no threads to
299
    // run is if a keyboard interrupt received during
300 301 302
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
303
    //
304 305
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
306

307
#if defined(THREADED_RTS)
308
    scheduleYield(&cap,task);
309

310 311 312
    if (emptyRunQueue(cap)) continue; // look for work again
#endif

313
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
314
    if ( emptyRunQueue(cap) ) {
315
        ASSERT(sched_state >= SCHED_INTERRUPTING);
316
    }
317
#endif
318

319
    //
320 321
    // Get a thread to run
    //
322
    t = popRunQueue(cap);
323

324 325 326
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
327

328
#if defined(THREADED_RTS)
329 330 331
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
332
        InCall *bound = t->bound;
333 334 335 336 337 338 339

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
340
                           (unsigned long)t->id);
341 342 343 344 345 346 347 348 349
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
350
                           (unsigned long)t->id);
351 352 353 354 355 356
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
357 358 359
    }
#endif

Simon Marlow's avatar
Simon Marlow committed
360 361 362 363 364 365
    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
366
        deleteThread(t);
Simon Marlow's avatar
Simon Marlow committed
367 368
    }

369 370 371 372 373 374 375 376 377 378 379 380
    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
Ben Gamari's avatar
Ben Gamari committed
381
#if defined(THREADED_RTS)
382
    if (cap->disabled && !t->bound) {
383
        Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
384 385 386 387 388
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

389
    /* context switches are initiated by the timer signal, unless
390 391
     * the user specified "context switch as often as possible", with
     * +RTS -C0
sof's avatar
sof committed
392
     */
393
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
394 395
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
396
    }
397

398
run_thread:
399

Ben Gamari's avatar
Ben Gamari committed
400
    // CurrentTSO is the thread to run. It might be different if we
Simon Marlow's avatar
Simon Marlow committed
401 402 403 404
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

405 406
    startHeapProfTimer();

407
    // ----------------------------------------------------------------------
408
    // Run the current thread
409

Simon Marlow's avatar
Simon Marlow committed
410
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
411
    ASSERT(t->cap == cap);
412
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);
Simon Marlow's avatar
Simon Marlow committed
413

414 415 416
    prev_what_next = t->what_next;

    errno = t->saved_errno;
Ben Gamari's avatar
Ben Gamari committed
417
#if defined(mingw32_HOST_OS)
418 419 420
    SetLastError(t->saved_winerror);
#endif

421 422 423
    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

Ben Gamari's avatar
Ben Gamari committed
424
    cap->in_haskell = true;
425
    cap->idle = 0;
426

427
    dirty_TSO(cap,t);
428
    dirty_STACK(cap,t->stackobj);
Simon Marlow's avatar
Simon Marlow committed
429

430 431 432
    switch (recent_activity)
    {
    case ACTIVITY_DONE_GC: {
433 434
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
435
        uint32_t prev;
436
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
437
        if (prev == ACTIVITY_DONE_GC) {
Ben Gamari's avatar
Ben Gamari committed
438
#if !defined(PROFILING)
439
            startTimer();
440
#endif
Simon Marlow's avatar
Simon Marlow committed
441
        }
442 443 444
        break;
    }
    case ACTIVITY_INACTIVE:
445 446 447 448
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
449 450
        break;
    default:
451 452
        recent_activity = ACTIVITY_YES;
    }
453

454
    traceEventRunThread(cap, t);
Simon Marlow's avatar
Simon Marlow committed
455

456
    switch (prev_what_next) {
457

458 459
    case ThreadKilled:
    case ThreadComplete:
460 461 462 463
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

464
    case ThreadRunGHC:
465
    {
466 467 468 469 470
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
471
    }
472

473
    case ThreadInterpret:
474 475 476 477
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

478
    default:
479
        barf("schedule: invalid prev_what_next=%u field", prev_what_next);
480 481
    }

Ben Gamari's avatar
Ben Gamari committed
482
    cap->in_haskell = false;
483

484 485 486 487
    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

488 489 490 491
    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

492 493 494 495
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
Ben Gamari's avatar
Ben Gamari committed
496
#if defined(mingw32_HOST_OS)
497
    // Similarly for Windows error code
498
    t->saved_winerror = GetLastError();
499
#endif
500

501 502 503 504 505 506 507 508 509
    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
510 511 512 513 514
        if (ret == StackOverflow) {
          traceEventStopThread(cap, t, ret, t->tot_stack_size);
        } else {
          traceEventStopThread(cap, t, ret, 0);
        }
515
    }
Simon Marlow's avatar
Simon Marlow committed
516

517
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
518
    ASSERT(t->cap == cap);
519

520
    // ----------------------------------------------------------------------
521

522
    // Costs for the scheduler are assigned to CCS_SYSTEM
523
    stopHeapProfTimer();
524
#if defined(PROFILING)
525
    cap->r.rCCCS = CCS_SYSTEM;
526
#endif
527

528
    schedulePostRunThread(cap,t);
529

Ben Gamari's avatar
Ben Gamari committed
530
    ready_to_gc = false;
531

532 533
    switch (ret) {
    case HeapOverflow:
534 535
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;
536 537

    case StackOverflow:
538 539 540 541 542
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;
543 544

    case ThreadYielding:
545
        if (scheduleHandleYield(cap, t, prev_what_next)) {
546
            // shortcut for switching between compiler/interpreter:
547 548 549
            goto run_thread;
        }
        break;
550 551

    case ThreadBlocked:
552 553
        scheduleHandleThreadBlocked(t);
        break;
554 555

    case ThreadFinished:
556 557 558
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;
559 560 561 562 563

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

564
    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
565
      scheduleDoGC(&cap,task,false,false);
566
    }
567 568 569
  } /* end of while() */
}

570 571 572 573
/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */

574
static void
575 576 577 578 579 580 581 582 583 584 585 586 587 588 589
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;
590
    cap->n_run_queue--;
591 592 593 594

    IF_DEBUG(sanity, checkRunQueue(cap));
}

595 596 597 598 599 600 601
void
promoteInRunQueue (Capability *cap, StgTSO *tso)
{
    removeFromRunQueue(cap, tso);
    pushOnRunQueue(cap, tso);
}

602 603 604 605 606 607 608
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
609
  // initialisation for scheduler - what cannot go into initScheduler()
610

611
#if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
612 613
    win32AllocStack();
#endif
614 615
}

616 617 618 619 620 621 622
/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
623
scheduleFindWork (Capability **pcap)
624
{
625
    scheduleStartSignalHandlers(*pcap);
626

627
    scheduleProcessInbox(pcap);
628

629
    scheduleCheckBlockedThreads(*pcap);
630

Simon Marlow's avatar
Simon Marlow committed
631
#if defined(THREADED_RTS)
632
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
633 634 635 636
#endif
}

#if defined(THREADED_RTS)
Ben Gamari's avatar
Ben Gamari committed
637 638
STATIC_INLINE bool
shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
639 640
{
    // we need to yield this capability to someone else if..
641 642
    //   - another thread is initiating a GC, and we didn't just do a GC
    //     (see Note [GC livelock])
643 644 645 646
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task it bound).
647 648 649 650 651 652 653 654 655
    //
    // Note [GC livelock]
    //
    // If we are interrupted to do a GC, then we do not immediately do
    // another one.  This avoids a starvation situation where one
    // Capability keeps forcing a GC and the other Capabilities make no
    // progress at all.

    return ((pending_sync && !didGcLast) ||
656
            cap->n_returning_tasks != 0 ||
657
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
658 659
                                     ? peekRunQueue(cap)->bound != NULL
                                     : peekRunQueue(cap)->bound != task->incall)));
660 661 662 663 664
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
665
//    - we need to yield this Capability to someone else
666 667
//      (see shouldYieldCapability())
//
668 669 670
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.
671 672

static void
673
scheduleYield (Capability **pcap, Task *task)
674 675
{
    Capability *cap = *pcap;
Ben Gamari's avatar
Ben Gamari committed
676
    bool didGcLast = false;
677 678

    // if we have work, and we don't need to give up the Capability, continue.
679
    //
Ben Gamari's avatar
Ben Gamari committed
680
    if (!shouldYieldCapability(cap,task,false) &&
681
        (!emptyRunQueue(cap) ||
682
         !emptyInbox(cap) ||
683
         sched_state >= SCHED_INTERRUPTING)) {
684
        return;
685
    }
686 687 688

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
689
        if (doIdleGCWork(cap, false)) {
690
            // there's more idle GC work to do
691 692
            didGcLast = false;
        } else {
693
            // no more idle GC work to do
694 695
            didGcLast = yieldCapability(&cap,task, !didGcLast);
        }
696
    }
697
    while (shouldYieldCapability(cap,task,didGcLast));
698 699 700 701 702 703 704 705

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif
706

707 708 709 710 711 712 713
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
714 715
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
716
{
717 718
#if defined(THREADED_RTS)

719
    Capability *free_caps[n_capabilities], *cap0;
720
    uint32_t i, n_wanted_caps, n_free_caps;
721

Simon Marlow's avatar
Simon Marlow committed
722 723
    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;

Simon Marlow's avatar
Simon Marlow committed
724
    // migration can be turned off with +RTS -qm
Simon Marlow's avatar
Simon Marlow committed
725 726 727
    if (!RtsFlags.ParFlags.migrate) {
        spare_threads = 0;
    }
728

729 730
    // Figure out how many capabilities we want to wake up.  We need at least
    // sparkPoolSize(cap) plus the number of spare threads we have.
Simon Marlow's avatar
Simon Marlow committed
731
    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
732
    if (n_wanted_caps == 0) return;
733

Simon Marlow's avatar
Simon Marlow committed
734 735
    // First grab as many free Capabilities as we can.  ToDo: we should use
    // capabilities on the same NUMA node preferably, but not exclusively.
736 737 738
    for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
         n_free_caps < n_wanted_caps && i != cap->no;
         i = (i + 1) % n_capabilities) {
739
        cap0 = capabilities[i];
740
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
741
            if (!emptyRunQueue(cap0)
742
                || cap0->n_returning_tasks != 0
Simon Marlow's avatar
Simon Marlow committed
743
                || !emptyInbox(cap0)) {
744 745 746 747 748 749 750
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
751 752
    }

Simon Marlow's avatar
Simon Marlow committed
753 754 755 756 757 758 759 760 761 762
    // We now have n_free_caps free capabilities stashed in
    // free_caps[].  Attempt to share our run queue equally with them.
    // This is complicated slightly by the fact that we can't move
    // some threads:
    //
    //  - threads that have TSO_LOCKED cannot migrate
    //  - a thread that is bound to the current Task cannot be migrated
    //
    // This is about the simplest thing we could do; improvements we
    // might want to do include:
763
    //
764
    //   - giving high priority to moving relatively new threads, on
765 766 767 768 769 770
    //     the gournds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
771
        StgTSO *prev, *t, *next;
772

773
        debugTrace(DEBUG_sched,
Simon Marlow's avatar
Simon Marlow committed
774 775
                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
776
                   n_free_caps);
777

778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810
        // There are n_free_caps+1 caps in total.  We will share the threads
        // evently between them, *except* that if the run queue does not divide
        // evenly by n_free_caps+1 then we bias towards the current capability.
        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
        uint32_t keep_threads =
            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);

        // This also ensures that we don't give away all our threads, since
        // (x + y) / (y + 1) >= 1 when x >= 1.

        // The number of threads we have left.
        uint32_t n = cap->n_run_queue;

        // prev = the previous thread on this cap's run queue
        prev = END_TSO_QUEUE;

        // We're going to walk through the run queue, migrating threads to other
        // capabilities until we have only keep_threads left.  We might
        // encounter a thread that cannot be migrated, in which case we add it
        // to the current run queue and decrement keep_threads.
        for (t = cap->run_queue_hd, i = 0;
             t != END_TSO_QUEUE && n > keep_threads;
             t = next)
        {
            next = t->_link;
            t->_link = END_TSO_QUEUE;

            // Should we keep this thread?
            if (t->bound == task->incall // don't move my bound thread
                || tsoLocked(t) // don't move a locked thread
                ) {
                if (prev == END_TSO_QUEUE) {
                    cap->run_queue_hd = t;
811
                } else {
812
                    setTSOLink(cap, prev, t);
813
                }
814 815 816
                setTSOPrev(cap, t, prev);
                prev = t;
                if (keep_threads > 0) keep_threads--;
817
            }
818

819 820 821 822
            // Or migrate it?
            else {
                appendToRunQueue(free_caps[i],t);
                traceEventMigrateThread (cap, t, free_caps[i]->no);
823

824 825 826 827 828
                if (t->bound) { t->bound->task->cap = free_caps[i]; }
                t->cap = free_caps[i];
                n--; // we have one fewer threads now
                i++; // move on to the next free_cap
                if (i == n_free_caps) i = 0;
829 830
            }
        }
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846

        // Join up the beginning of the queue (prev)
        // with the rest of the queue (t)
        if (t == END_TSO_QUEUE) {
            cap->run_queue_tl = prev;
        } else {
            setTSOPrev(cap, t, prev);
        }
        if (prev == END_TSO_QUEUE) {
            cap->run_queue_hd = t;
        } else {
            setTSOLink(cap, prev, t);
        }
        cap->n_run_queue = n;

        IF_DEBUG(sanity, checkRunQueue(cap));
847

848 849 850
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
851 852 853 854 855 856 857
            if (sparkPoolSizeCap(cap) > 0) {
                // If we have sparks to steal, wake up a worker on the
                // capability, even if it has no threads to run.
                releaseAndWakeupCapability(free_caps[i]);
            } else {
                releaseCapability(free_caps[i]);
            }
858
        }
859 860
    }
    task->cap = cap; // reset to point to our Capability.
861 862 863

#endif /* THREADED_RTS */

864 865
}

866 867 868 869
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

870
#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
871
static void
872
scheduleStartSignalHandlers(Capability *cap)
873
{
874 875
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
876
        startSignalHandlers(cap);
877 878
    }
}
879 880 881 882 883 884
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif
885 886 887 888 889 890

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
891
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
892
{
893
#if !defined(THREADED_RTS)
894 895 896 897 898
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
899
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
900
    {
901
        awaitEvent (emptyRunQueue(cap));
902 903 904 905
    }
#endif
}

906 907 908 909 910
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
911
scheduleDetectDeadlock (Capability **pcap, Task *task)
912
{
913 914
    Capability *cap = *pcap;
    /*
915
     * Detect deadlock: when we have no threads to run, there are no
916 917 918
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
919
     */
920
    if ( emptyThreadQueues(cap) )
921
    {
922
#if defined(THREADED_RTS)
923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
939
        scheduleDoGC (pcap, task, true/*force major GC*/, true/*deadlock detection*/);
940
        cap = *pcap;
Ben Gamari's avatar
Ben Gamari committed
941
        // when force_major == true. scheduleDoGC sets
942 943
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.
944

945
        if ( !emptyRunQueue(cap) ) return;
946

947
#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
948 949 950 951 952 953 954 955 956 957 958 959 960
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather then bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }
961

962 963
            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
964 965

            return;
966
        }
967 968
#endif

969
#if !defined(THREADED_RTS)
970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->incall->tso) {
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
            case BlockedOnMVarRead:
                throwToSingleThreaded(cap, task->incall->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
        return;
988
#endif
989
    }
990 991
}

992

993 994 995 996 997
/* ----------------------------------------------------------------------------
 * Process message in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
998
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
999 1000
{
#if defined(THREADED_RTS)
1001
    Message *m, *next;
Simon Marlow's avatar
Simon Marlow committed
1002
    PutMVar *p, *pnext;
1003
    int r;
1004
    Capability *cap = *pcap;
1005 1006

    while (!emptyInbox(cap)) {
1007 1008
        // Executing messages might use heap, so we should check for GC.
        if (doYouWantToGC(cap)) {
1009
            scheduleDoGC(pcap, cap->running_task, false, false);
1010
            cap = *pcap;
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

1026
        m = cap->inbox;
Simon Marlow's avatar
Simon Marlow committed
1027
        p = cap->putMVars;
1028
        cap->inbox = (Message*)END_TSO_QUEUE;
Simon Marlow's avatar
Simon Marlow committed
1029
        cap->putMVars = NULL;
1030

1031
        RELEASE_LOCK(&cap->lock);
1032 1033 1034 1035 1036 1037

        while (m != (Message*)END_TSO_QUEUE) {
            next = m->link;
            executeMessage(cap, m);
            m = next;
        }
Simon Marlow's avatar
Simon Marlow committed
1038 1039 1040 1041 1042 1043 1044 1045 1046

        while (p != NULL) {
            pnext = p->link;
            performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
                              Unit_closure);
            freeStablePtr(p->mvar);
            stgFree(p);
            p = pnext;
        }
1047 1048 1049