/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "sm/GCThread.h"
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
#include "Stable.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

#ifdef TRACING
#include "eventlog/EventLog.h"
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;
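// Values of recent_activity used in this file: ACTIVITY_YES (we ran some
// Haskell in the current timeslice), ACTIVITY_INACTIVE (a whole timeslice
// went by with no activity), and ACTIVITY_DONE_GC (after going inactive we
// did a GC and switched the timer signal off; see the recent_activity
// switch in schedule() and scheduleDetectDeadlock()).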

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static rtsBool requestSync (Capability **pcap, Task *task,
                            PendingSync *sync_type, SyncType *prev_sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
static void stopAllCapabilities(Capability **pCap, Task *task);
static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO);

    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,rtsTrue);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
    if (first)
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = rtsFalse;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
#ifdef THREADED_RTS
    if (cap->disabled && !t->bound) {
        Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }

run_thread:

    // CurrentTSO is the thread to run. It might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = rtsTrue;
    cap->idle = 0;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

    switch (recent_activity)
    {
    case ACTIVITY_DONE_GC: {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
#ifndef PROFILING
            startTimer();
#endif
        }
        break;
    }
    case ACTIVITY_INACTIVE:
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        break;
    default:
        recent_activity = ACTIVITY_YES;
    }

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      scheduleDoGC(&cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */
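
// The run queue on each Capability is a doubly-linked list of TSOs:
// tso->_link is the forward pointer and tso->block_info.prev the backward
// pointer, with cap->run_queue_hd/run_queue_tl marking the ends and
// cap->n_run_queue tracking its length.  The operations below maintain all
// of these invariants.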

void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;
    cap->n_run_queue--;

    IF_DEBUG(sanity, checkRunQueue(cap));
}

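// Move a thread to the front of the run queue so that it runs next:
// take it out of its current position and push it back on at the head.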
void
promoteInRunQueue (Capability *cap, StgTSO *tso)
{
    removeFromRunQueue(cap, tso);
    pushOnRunQueue(cap, tso);
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()
#if defined(mingw32_HOST_OS) && !defined(USE_MINIINTERPRETER)
    win32AllocStack();
#endif
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability **pcap)
{
    scheduleStartSignalHandlers(*pcap);

    scheduleProcessInbox(pcap);

    scheduleCheckBlockedThreads(*pcap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC, and we didn't just do a GC
    //     (see Note [GC livelock])
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    //
    // Note [GC livelock]
    //
    // If we are interrupted to do a GC, then we do not immediately do
    // another one.  This avoids a starvation situation where one
    // Capability keeps forcing a GC and the other Capabilities make no
    // progress at all.

    return ((pending_sync && !didGcLast) ||
            cap->n_returning_tasks != 0 ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? peekRunQueue(cap)->bound != NULL
                                     : peekRunQueue(cap)->bound != task->incall)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    int didGcLast = rtsFalse;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task,rtsFalse) &&
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING)) {
        return;
    }

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        didGcLast = yieldCapability(&cap,task, !didGcLast);
    }
    while (shouldYieldCapability(cap,task,didGcLast));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    uint32_t i, n_wanted_caps, n_free_caps;

    uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) {
        spare_threads = 0;
    }

    // Figure out how many capabilities we want to wake up.  We need at least
    // sparkPoolSize(cap) plus the number of spare threads we have.
    n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
    if (n_wanted_caps == 0) return;

    // First grab as many free Capabilities as we can.
    for (i = (cap->no + 1) % n_capabilities, n_free_caps=0;
         n_free_caps < n_wanted_caps && i != cap->no;
         i = (i + 1) % n_capabilities) {
        cap0 = capabilities[i];
        if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
                || cap0->n_returning_tasks != 0
                || cap0->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // We now have n_free_caps free capabilities stashed in
    // free_caps[].  Attempt to share our run queue equally with them.
    // This is complicated slightly by the fact that we can't move
    // some threads:
    //
    //  - threads that have TSO_LOCKED cannot migrate
    //  - a thread that is bound to the current Task cannot be migrated
    //
    // This is about the simplest thing we could do; improvements we
    // might want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;

        debugTrace(DEBUG_sched,
                   "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                   cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                   n_free_caps);

        // There are n_free_caps+1 caps in total.  We will share the threads
        // evenly between them, *except* that if the run queue does not divide
        // evenly by n_free_caps+1 then we bias towards the current capability.
        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
        uint32_t keep_threads =
            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);

        // This also ensures that we don't give away all our threads, since
        // (x + y) / (y + 1) >= 1 when x >= 1.
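
        // Equivalently, keep_threads == ceil(n_run_queue / (n_free_caps + 1)):
        // e.g. with n_run_queue=7 and n_free_caps=3 we keep ceil(7/4) == 2
        // threads and push the other 5 out to the three free capabilities.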

        // The number of threads we have left.
        uint32_t n = cap->n_run_queue;

        // prev = the previous thread on this cap's run queue
        prev = END_TSO_QUEUE;

        // We're going to walk through the run queue, migrating threads to other
        // capabilities until we have only keep_threads left.  We might
        // encounter a thread that cannot be migrated, in which case we add it
        // to the current run queue and decrement keep_threads.
        for (t = cap->run_queue_hd, i = 0;
             t != END_TSO_QUEUE && n > keep_threads;
             t = next)
        {
            next = t->_link;
            t->_link = END_TSO_QUEUE;

            // Should we keep this thread?
            if (t->bound == task->incall // don't move my bound thread
                || tsoLocked(t) // don't move a locked thread
                ) {
                if (prev == END_TSO_QUEUE) {
                    cap->run_queue_hd = t;
                } else {
                    setTSOLink(cap, prev, t);
                }
                setTSOPrev(cap, t, prev);
                prev = t;
                if (keep_threads > 0) keep_threads--;
            }

            // Or migrate it?
            else {
                appendToRunQueue(free_caps[i],t);
                traceEventMigrateThread (cap, t, free_caps[i]->no);

                if (t->bound) { t->bound->task->cap = free_caps[i]; }
                t->cap = free_caps[i];
                n--; // we have one fewer thread now
                i++; // move on to the next free_cap
                if (i == n_free_caps) i = 0;
            }
        }

        // Join up the beginning of the queue (prev)
        // with the rest of the queue (t)
        if (t == END_TSO_QUEUE) {
            cap->run_queue_tl = prev;
        } else {
            setTSOPrev(cap, t, prev);
        }
        if (prev == END_TSO_QUEUE) {
            cap->run_queue_hd = t;
        } else {
            setTSOLink(cap, prev, t);
        }
        cap->n_run_queue = n;

        IF_DEBUG(sanity, checkRunQueue(cap));

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            if (sparkPoolSizeCap(cap) > 0) {
                // If we have sparks to steal, wake up a worker on the
                // capability, even if it has no threads to run.
                releaseAndWakeupCapability(free_caps[i]);
            } else {
                releaseCapability(free_caps[i]);
            }
        }
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent (emptyRunQueue(cap));
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
        cap = *pcap;
        // when force_major == rtsTrue, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->incall->tso) {
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
            case BlockedOnMVarRead:
                throwToSingleThreaded(cap, task->incall->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
        return;
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process messages in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Message *m, *next;
    int r;
    Capability *cap = *pcap;

    while (!emptyInbox(cap)) {
        if (cap->r.rCurrentNursery->link == NULL ||
            g0->n_new_large_words >= large_alloc_lim) {
            scheduleDoGC(pcap, cap->running_task, rtsFalse);
            cap = *pcap;
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

        m = cap->inbox;
        cap->inbox = (Message*)END_TSO_QUEUE;

        RELEASE_LOCK(&cap->lock);

        while (m != (Message*)END_TSO_QUEUE) {
            next = m->link;
            executeMessage(cap, m);
            m = next;
        }
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Activate spark threads (THREADED_RTS)
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)