/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# undef  STATIC_INLINE
# define STATIC_INLINE static
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/* 
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];

/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */

#else /* !GRAN */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;
#endif

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool sched_state = SCHED_RUNNING;

#if defined(GRAN)
StgTSO *CurrentTSO;
#endif

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These function all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
					 StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
				rtsBool force_major, 
				void (*get_roots)(evac_fn));

static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) { 
	// this does round-robin scheduling; good for concurrency
	appendToRunQueue(cap,t);
    } else {
	// this does unfair scheduling; good for parallelism
	pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue,
     which determines what to do next.  Therefore, it's more complicated
     than either the concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif
  
  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, 
	      "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
	      task, initialCapability);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

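  // Note: in the standard and threaded builds TERMINATION_CONDITION is
  // simply rtsTrue, so the loop below only terminates via one of the
  // return statements inside it; the GUM build loops until a Finish
  // message has been received, and GranSim until the event queue is empty.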
  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
	  // don't yield the first time, we want a chance to run this
	  // thread for a bit, even if there are others banging at the
	  // door.
	  first = rtsFalse;
	  ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      } else {
	  // Yield the capability to higher-priority tasks if necessary.
	  yieldCapability(&cap, task);
      }
#endif
      
#if defined(THREADED_RTS)
      schedulePushWork(cap,task);
#endif

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    // 
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    // 
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls, 
    //     we should really attempt to kill these somehow (TODO);
    
    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
	discardSparksCap(cap);
#endif
	/* scheduleDoGC() deletes all the threads */
	cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
	break;
    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (task->tso == NULL && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

#if defined(THREADED_RTS)
    // If the run queue is empty, take a spark and turn it into a thread.
    {
	if (emptyRunQueue(cap)) {
	    StgClosure *spark;
	    spark = findSpark(cap);
	    if (spark != NULL) {
		debugTrace(DEBUG_sched,
			   "turning spark of closure %p into a thread",
			   (StgClosure *)spark);
		createSparkThread(cap,spark);	  
	    }
	}
    }
#endif // THREADED_RTS

    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

    scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
	ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
	continue; // nothing to do
    }

#if defined(PARALLEL_HASKELL)
    scheduleSendPendingMessages();

    if (emptyRunQueue(cap) && scheduleActivateSpark()) 
	continue;

#if defined(SPARKS)
    ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
#endif

    /* If we still have no work we need to send a FISH to get a spark
       from another PE */
    if (emptyRunQueue(cap)) {
	if (!scheduleGetRemoteWork(&receivedFinish)) continue;
	ASSERT(rtsFalse); // should not happen at the moment
    }
    // from here: non-empty run queue.
    //  TODO: merge above case with this, only one call processMessages() !
    if (PacketsWaiting()) {  /* process incoming messages, if
				any pending...  only in else
				because getRemoteWork waits for
				messages as well */
	receivedFinish = processMessages();
    }
#endif

#if defined(GRAN)
    scheduleProcessEvent(event);
#endif

    // 
    // Get a thread to run
    //
    t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
#else
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
	Task *bound = t->bound;
      
	if (bound) {
	    if (bound == task) {
		debugTrace(DEBUG_sched,
			   "### Running thread %lu in bound thread", (unsigned long)t->id);
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "### thread %lu bound to another OS thread", (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->tso) { 
		debugTrace(DEBUG_sched,
			   "### this OS thread cannot run thread %lu", (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue; 
	    }
	}
    }
#endif

    cap->r.rCurrentTSO = t;
    
    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	context_switch = 1;
    }
	 
run_thread:

    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
			      (long)t->id, whatNext_strs[t->what_next]);

#if defined(PROFILING)
    startHeapProfTimer();
#endif

    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);

    // ----------------------------------------------------------------------
    // Run the current thread 

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
    cap->in_haskell = rtsTrue;

    dirtyTSO(t);

    recent_activity = ACTIVITY_YES;

    switch (prev_what_next) {
	
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
	
    case ThreadRunGHC:
    {
	StgRegTable *r;
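	// StgRun() enters the compiled Haskell world via
	// stg_returnToStackTop, using this Capability's register table;
	// it returns only when the thread stops running, with the reason
	// (heap/stack overflow, yield, block, finish) left in r->rRet.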
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }
    
    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;
	
    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
	blackholes_need_checking = rtsTrue;
    }
    
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;

#if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
	debugTrace(DEBUG_sched,
		   "--<< thread %lu (%s) stopped: blocked",
		   (unsigned long)t->id, whatNext_strs[t->what_next]);
	continue;
    }
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------
    
    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif
    
    schedulePostRunThread();

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
	scheduleHandleStackOverflow(cap,task,t);
	break;

    case ThreadYielding:
	if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread; 
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

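    // If scheduleDoHeapProfile() returns rtsTrue it has dealt with the GC
    // itself, so ready_to_gc is cleared and the GC below is skipped.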
    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
    if (ready_to_gc) {
      cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
    }
  } /* end of while() */

  debugTrace(PAR_DEBUG_verbose,
	     "== Leaving schedule() after having received Finish");
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN) 
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread, 
	      CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
    
    debugTrace (DEBUG_gran,
		"GRAN: Init CurrentTSO (in schedule) = %p", 
		CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
    
    if (RtsFlags.GranFlags.Light) {
	/* Save current time; GranSim Light only */
	CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }      
#endif
}

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS, 
		 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
	&& sparkPoolSizeCap(cap) < 2) {
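	// at most one runnable thread and fewer than two sparks:
	// nothing worth handing to another Capability.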
	return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
	rtsBool pushed_to_all;

	debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);

	i = 0;
	pushed_to_all = rtsFalse;

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->link;
	    prev->link = END_TSO_QUEUE;
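	    // Deal the surplus threads out to the free capabilities in
	    // round-robin fashion; whenever 'i' wraps we keep one thread
	    // for ourselves.  Relocated threads, our own bound thread and
	    // locked threads are never moved.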
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->link;
		t->link = END_TSO_QUEUE;
		if (t->what_next == ThreadRelocated
		    || t->bound == task // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    prev->link = t;
		    prev = t;
		} else if (i == n_free_caps) {
		    pushed_to_all = rtsTrue;
		    i = 0;
		    // keep one for us
		    prev->link = t;
		    prev = t;
		} else {
		    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
		    appendToRunQueue(free_caps[i],t);
		    if (t->bound) { t->bound->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;
	}

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = findSpark(cap);
		    if (spark != NULL) {
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (signals_pending()) { // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}


/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
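	// The wakeup queue is filled in by other Capabilities, so take
	// this Capability's lock before splicing it onto the run queue.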
	ACQUIRE_LOCK(&cap->lock);
	if (emptyRunQueue(cap)) {
	    cap->run_queue_hd = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	} else {
	    cap->run_queue_tl->link = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	}
	cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
	RELEASE_LOCK(&cap->lock);
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
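	// Re-test the flag under sched_mutex before doing the (possibly
	// long) traversal; missing an update here is harmless, the flag
	// is only advisory.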
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    checkBlackHoles(cap);
	    blackholes_need_checking = rtsFalse;
	}
	RELEASE_LOCK(&sched_mutex);
    }
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{

#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
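	// emptyThreadQueues() checks the run queue, and in the
	// non-threaded RTS also the blocked and sleeping queues
	// (see Schedule.h).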
#if defined(THREADED_RTS)
	/* 
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
	recent_activity = ACTIVITY_DONE_GC;
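	// recent_activity is now ACTIVITY_DONE_GC rather than
	// ACTIVITY_INACTIVE, so the check above prevents us forcing
	// another major GC until there has been some real activity.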
	
	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		throwToSingleThreaded(cap, task->tso, 
				      (StgClosure *)NonTermination_closure);
		return;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
	return;
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
	     debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran, 
	     debugBelch("GRAN: About to run current thread, which is\n");
	     G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;
}
#endif // GRAN

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static StgTSO *
scheduleSendPendingMessages(void)
{
    StgSparkPool *pool;
    rtsSpark spark;
    StgTSO *t;

# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif
    
    if (RtsFlags.ParFlags.BufferTime) {
	// if we use message buffering, we must send away all message
	// packets which have become too old...
	sendOldBuffers(); 
    }
}
#endif

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static void
scheduleActivateSpark(void)
{
#if defined(SPARKS)
  ASSERT(emptyRunQueue());
/* We get here if the run queue is empty and want some work.
   We try to turn a spark into a thread, and add it to the run queue,
   from where it will be picked up in the next iteration of the scheduler
   loop.
*/