/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"

#include "sm/Storage.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "Weak.h"
#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

#ifdef TRACING
#include "eventlog/EventLog.h"
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Set to true when the latest garbage collection failed to reclaim
 * enough space, and the runtime should proceed to shut itself down in
 * an orderly fashion (emitting profiling info etc.)
 */
rtsBool heap_overflow = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 *
 * NB. must be StgWord, we do xchg() on it.
 */
volatile StgWord recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes monotonically)
 */
volatile StgWord sched_state = SCHED_RUNNING;

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what
     to do next. Therefore, it's more complicated than either the
     concurrent or the parallel (GUM) setup.
   This version has been entirely removed (JB 2008/08).

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

  (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
  now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
  as well as future GUM versions. This file has been refurbished to
  only contain valid code, which is however incomplete, refers to
  invalid includes etc.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
        errorBelch("schedule: re-entered unsafely.\n"
                   "   Perhaps a 'foreign import unsafe' should be 'safe'?");
        stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls;
    //     we should really attempt to kill these somehow (TODO).

    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
        discardSparksCap(cap);
#endif
        /* scheduleDoGC() deletes all the threads */
        cap = scheduleDoGC(cap,task,rtsFalse);

        // after scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above,
        // either way we can fall through to the SCHED_SHUTTING_DOWN
        // case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");

        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %d", sched_state);
    }

    scheduleFindWork(cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(cap,task);

#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt is received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
#if defined(THREADED_RTS)
    if (first)
    {
    // XXX: ToDo
    //     // don't yield the first time, we want a chance to run this
    //     // thread for a bit, even if there are others banging at the
    //     // door.
    //     first = rtsFalse;
    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(cap,t);
    }

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }

run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    cap->in_haskell = rtsTrue;

    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else if (recent_activity != ACTIVITY_INACTIVE) {
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        recent_activity = ACTIVITY_YES;
    }
#endif

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;
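
    // Run compiled Haskell code: StgRun() enters the STG machine with this
    // Capability's register table and returns when the thread stops for
    // some reason; the returned register table identifies the Capability
    // we now hold, and rRet says why the thread stopped.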
    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        traceEventStopThread(cap, t, ret, 0);
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    CCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = rtsFalse;
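
    // Below, dispatch on the reason the thread stopped (the return code
    // from StgRun()/interpretBCO() above).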

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then pop it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* -----------------------------------------------------------------------------
 * Run queue operations
 * -------------------------------------------------------------------------- */

void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
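    // The run queue is doubly linked: forwards via tso->_link and backwards
    // via tso->block_info.prev, so unlinking a thread just patches the
    // neighbouring links (or the queue head/tail) and then clears the
    // thread's own links.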
    if (tso->block_info.prev == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_hd == tso);
        cap->run_queue_hd = tso->_link;
    } else {
        setTSOLink(cap, tso->block_info.prev, tso->_link);
    }
    if (tso->_link == END_TSO_QUEUE) {
        ASSERT(cap->run_queue_tl == tso);
        cap->run_queue_tl = tso->block_info.prev;
    } else {
        setTSOPrev(cap, tso->_link, tso->block_info.prev);
    }
    tso->_link = tso->block_info.prev = END_TSO_QUEUE;

    IF_DEBUG(sanity, checkRunQueue(cap));
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()
}

/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

static void
scheduleFindWork (Capability *cap)
{
    scheduleStartSignalHandlers(cap);

    scheduleProcessInbox(cap);

    scheduleCheckBlockedThreads(cap);

#if defined(THREADED_RTS)
    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
}

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    return (waiting_for_gc ||
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task->incall)));
}

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// Careful: the scheduler loop is quite delicate.  Make sure you run
// the tests in testsuite/concurrent (all ways) after modifying this,
// and also check the benchmarks in nofib/parallel for regressions.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    //
    if (!shouldYieldCapability(cap,task) &&
        (!emptyRunQueue(cap) ||
         !emptyInbox(cap) ||
         sched_state >= SCHED_INTERRUPTING))
        return;

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        yieldCapability(&cap,task);
    }
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
    return;
}
#endif
    
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qm
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if (cap->run_queue_hd == END_TSO_QUEUE) {
        if (sparkPoolSizeCap(cap) < 2) return;
    } else {
        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
            sparkPoolSizeCap(cap) < 1) return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0)
                || cap->returning_tasks_hd != NULL
                || cap->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
#ifdef SPARK_PUSHING
        rtsBool pushed_to_all;
#endif

        debugTrace(DEBUG_sched,
                   "cap %d: %s and %d free capabilities, sharing...",
                   cap->no,
                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
                   "excess threads on run queue":"sparks to share (>=2)",
                   n_free_caps);

        i = 0;
#ifdef SPARK_PUSHING
        pushed_to_all = rtsFalse;
#endif
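
        // Below, the threads on our run queue are dealt out round-robin to
        // the free capabilities, keeping every (n_free_caps+1)-th thread
        // (and any bound or locked thread) on this capability.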

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->_link;
            prev->_link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->_link;
                t->_link = END_TSO_QUEUE;
                if (t->bound == task->incall // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else if (i == n_free_caps) {
#ifdef SPARK_PUSHING
                    pushed_to_all = rtsTrue;
#endif
                    i = 0;
                    // keep one for us
                    setTSOLink(cap, prev, t);
                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else {
                    appendToRunQueue(free_caps[i],t);

                    traceEventMigrateThread (cap, t, free_caps[i]->no);

                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;

            IF_DEBUG(sanity, checkRunQueue(cap));
        }

#ifdef SPARK_PUSHING
        /* JB I left this code in place, it would work but is not necessary */

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = tryStealSpark(cap->sparks);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);

                        traceEventStealSpark(free_caps[i], t, cap->no);

                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }
#endif /* SPARK_PUSHING */

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseAndWakeupCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */

}

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
        awaitEvent (emptyRunQueue(cap));
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{
    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
        // when force_major == rtsTrue, scheduleDoGC sets
        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
        // signal.

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->incall->tso) {
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
                throwToSingleThreaded(cap, task->incall->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
        return;
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif

/* ----------------------------------------------------------------------------
 * Process message in the current Capability's inbox
 * ------------------------------------------------------------------------- */

static void
scheduleProcessInbox (Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    Message *m, *next;
    int r;

    while (!emptyInbox(cap)) {
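        // Do a GC first if the current nursery block is the last one or the
        // large-object allocation limit has been reached (presumably so that
        // executing the messages below has room to allocate).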
        if (cap->r.rCurrentNursery->link == NULL ||
            g0->n_new_large_words >= large_alloc_lim) {
            scheduleDoGC(cap, cap->running_task, rtsFalse);
        }

        // don't use a blocking acquire; if the lock is held by
        // another thread then just carry on.  This seems to avoid
        // getting stuck in a message ping-pong situation with other
        // processors.  We'll check the inbox again later anyway.
        //
        // We should really use a more efficient queue data structure
        // here.  The trickiness is that we must ensure a Capability
        // never goes idle if the inbox is non-empty, which is why we
        // use cap->lock (cap->lock is released as the last thing
        // before going idle; see Capability.c:releaseCapability()).
        r = TRY_ACQUIRE_LOCK(&cap->lock);
        if (r != 0) return;

        m = cap->inbox;
        cap->inbox = (Message*)END_TSO_QUEUE;

        RELEASE_LOCK(&cap->lock);

        while (m != (Message*)END_TSO_QUEUE) {
            next = m->link;
            executeMessage(cap, m);
            m = next;
        }
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)