/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "OSThreads.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "ThrIOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# undef  STATIC_INLINE
# define STATIC_INLINE static
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/*
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */

#else /* !GRAN */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;
#endif

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool sched_state = SCHED_RUNNING;

#if defined(GRAN)
StgTSO *CurrentTSO;
#endif

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
					 StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
				rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
	// this does round-robin scheduling; good for concurrency
	appendToRunQueue(cap,t);
    } else {
	// this does unfair scheduling; good for parallelism
	pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
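
// As the names suggest, appendToRunQueue adds the thread at the tail of
// the run queue, giving the round-robin behaviour described above, while
// pushOnRunQueue adds it at the head so that it is the next thread to run.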

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue,
     which determines what to do next. Therefore, it's more complicated
     than either the concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched,
	      "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
	      task, initialCapability);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif
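
// The loop below runs until TERMINATION_CONDITION becomes false: in the
// standard RTS it is rtsTrue, so the loop only exits via an explicit
// return (at shutdown, or when a bound thread finishes); GUM keeps going
// until a Finish message arrives, and GranSim until the global event
// queue is drained.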

  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
	  // don't yield the first time, we want a chance to run this
	  // thread for a bit, even if there are others banging at the
	  // door.
	  first = rtsFalse;
	  ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      } else {
	  // Yield the capability to higher-priority tasks if necessary.
	  yieldCapability(&cap, task);
      }
#endif

#if defined(THREADED_RTS)
      schedulePushWork(cap,task);
#endif

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO);
    
    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
	discardSparksCap(cap);
#endif
	/* scheduleDoGC() deletes all the threads */
	cap = scheduleDoGC(cap,task,rtsFalse);
	break;
    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (task->tso == NULL && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

#if defined(THREADED_RTS)
    // If the run queue is empty, take a spark and turn it into a thread.
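    // (Sparks are potential parallelism: closures registered by par that
    // a Capability may evaluate speculatively when it has nothing else to
    // do; turning one into a thread here is what actually runs it.)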
    {
	if (emptyRunQueue(cap)) {
	    StgClosure *spark;
	    spark = findSpark(cap);
	    if (spark != NULL) {
		debugTrace(DEBUG_sched,
			   "turning spark of closure %p into a thread",
			   (StgClosure *)spark);
		createSparkThread(cap,spark);
	    }
	}
    }
#endif // THREADED_RTS

    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

    scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
	ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
	continue; // nothing to do
    }

#if defined(PARALLEL_HASKELL)
    scheduleSendPendingMessages();
    if (emptyRunQueue(cap) && scheduleActivateSpark())
	continue;

#if defined(SPARKS)
    ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
#endif

    /* If we still have no work we need to send a FISH to get a spark
       from another PE */
    if (emptyRunQueue(cap)) {
	if (!scheduleGetRemoteWork(&receivedFinish)) continue;
	ASSERT(rtsFalse); // should not happen at the moment
    }
    // from here: non-empty run queue.
    //  TODO: merge above case with this, only one call processMessages() !
    if (PacketsWaiting()) {  /* process incoming messages, if
				any pending...  only in else
				because getRemoteWork waits for
				messages as well */
	receivedFinish = processMessages();
    }
#endif

#if defined(GRAN)
    scheduleProcessEvent(event);
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
#else
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
	Task *bound = t->bound;

	if (bound) {
	    if (bound == task) {
		debugTrace(DEBUG_sched,
			   "### Running thread %lu in bound thread", (unsigned long)t->id);
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "### thread %lu bound to another OS thread", (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->tso) {
		debugTrace(DEBUG_sched,
			   "### this OS thread cannot run thread %lu", (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	}
    }
#endif

    cap->r.rCurrentTSO = t;

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	context_switch = 1;
    }

run_thread:

    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
			      (long)t->id, whatNext_strs[t->what_next]);

#if defined(PROFILING)
    startHeapProfTimer();
#endif

    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
    cap->in_haskell = rtsTrue;

    dirtyTSO(t);

    recent_activity = ACTIVITY_YES;
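
    // Run the thread: compiled code enters the STG machine via StgRun,
    // which hands back a register table from which we recover the
    // Capability and the thread's return code; GHCi byte code is run by
    // the interpreter via interpretBCO.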

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;

    case ThreadRunGHC:
    {
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }

    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;

    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
	blackholes_need_checking = rtsTrue;
    }

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;

#if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
	debugTrace(DEBUG_sched,
		   "--<< thread %lu (%s) stopped: blocked",
		   (unsigned long)t->id, whatNext_strs[t->what_next]);
	continue;
    }
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread();

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
	scheduleHandleStackOverflow(cap,task,t);
	break;

    case ThreadYielding:
	if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread;
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
    if (ready_to_gc) {
      cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */

  debugTrace(PAR_DEBUG_verbose,
	     "== Leaving schedule() after having received Finish");
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN)
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread,
	      CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

    debugTrace (DEBUG_gran,
		"GRAN: Init CurrentTSO (in schedule) = %p",
		CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));

    if (RtsFlags.GranFlags.Light) {
	/* Save current time; GranSim Light only */
	CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }
#endif
}

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
		 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
	&& sparkPoolSizeCap(cap) < 2) {
	return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads
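
    // The code below deals runnable threads out to the free capabilities
    // round-robin, skipping bound, locked and ThreadRelocated threads and
    // keeping one thread for ourselves on each full round; capabilities
    // that receive no thread are offered a spark instead.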

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
	rtsBool pushed_to_all;

	debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);

	i = 0;
	pushed_to_all = rtsFalse;

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->link;
	    prev->link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->link;
		t->link = END_TSO_QUEUE;
		if (t->what_next == ThreadRelocated
		    || t->bound == task // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    prev->link = t;
		    prev = t;
		} else if (i == n_free_caps) {
		    pushed_to_all = rtsTrue;
		    i = 0;
		    // keep one for us
		    prev->link = t;
		    prev = t;
		} else {
		    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
		    appendToRunQueue(free_caps[i],t);
		    if (t->bound) { t->bound->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;
	}

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = findSpark(cap);
		    if (spark != NULL) {
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (signals_pending()) { // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}


/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
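    // We need cap->lock because other Capabilities append to our wakeup
    // queue concurrently; the run queue itself is only modified by the
    // owner of the Capability, so splicing the wakeup queue onto it while
    // holding the lock is sufficient.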
    if (!emptyWakeupQueue(cap)) {
	ACQUIRE_LOCK(&cap->lock);
	if (emptyRunQueue(cap)) {
	    cap->run_queue_hd = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	} else {
	    cap->run_queue_tl->link = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	}
	cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
	RELEASE_LOCK(&cap->lock);
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
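	// Re-check under sched_mutex: another Capability may have done the
	// check and cleared the flag between our unlocked test above and
	// acquiring the lock.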
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    checkBlackHoles(cap);
	    blackholes_need_checking = rtsFalse;
	}
	RELEASE_LOCK(&sched_mutex);
    }
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{

#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/*
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);

	recent_activity = ACTIVITY_DONE_GC;

	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		throwToSingleThreaded(cap, task->tso,
				      (StgClosure *)NonTermination_closure);
		return;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
	return;
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran,
	     debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran,
	     debugBelch("GRAN: About to run current thread, which is\n");
	     G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;
}
#endif // GRAN

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{
    StgSparkPool *pool;
    rtsSpark spark;
    StgTSO *t;

# if defined(PAR) // global Mem.Mgmt., omit for now