/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2005
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "BlockAlloc.h"
#include "OSThreads.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Exception.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# undef  STATIC_INLINE
# define STATIC_INLINE static
#endif

#ifdef THREADED_RTS
#define USED_WHEN_THREADED_RTS
#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
#else
#define USED_WHEN_THREADED_RTS     STG_UNUSED
#define USED_WHEN_NON_THREADED_RTS
#endif

#ifdef SMP
#define USED_WHEN_SMP
#else
#define USED_WHEN_SMP STG_UNUSED
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/*
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */

#else /* !GRAN */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;
#endif

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for a more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *all_threads = NULL;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
 */
rtsBool interrupted = rtsFalse;

/* Next thread ID to allocate.
 * LOCK: sched_mutex
 */
static StgThreadID next_thread_id = 1;

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.
 */
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
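/* Note: the '+ 3' in MIN_STACK_WORDS accounts for the three single-word
 * items enumerated above (the closure to enter, stg_ap_v_ret, and its
 * spare slot); RESERVED_STACK_WORDS and the stop frame supply the rest. */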

#if defined(GRAN)
StgTSO *CurrentTSO;
#endif

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime (including the SMP build).
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(SMP)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(void);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
					 StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);

static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static void AllRoots(evac_fn evac);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);

static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
			rtsBool stop_at_atomically);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);

#ifdef DEBUG
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
void printThreadQueue(StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) { 
	// this does round-robin scheduling; good for concurrency
	appendToRunQueue(cap,t);
    } else {
	// this does unfair scheduling; good for parallelism
	pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
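/* Note on the two policies above: appendToRunQueue() puts the thread at the
 * tail of the run queue (round-robin, fair ordering), while pushOnRunQueue()
 * puts it at the head so that it runs next (unfair ordering, which the
 * parallel build prefers). */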

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  IF_DEBUG(scheduler,
	   sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
		       task, initialCapability);
      );

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif
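  // Note: in the standard (non-GranSim/GUM) build TERMINATION_CONDITION is
  // rtsTrue, so the loop below is only left via one of the explicit returns
  // (on shutdown, or when a bound thread finishes).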

  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
	  // don't yield the first time, we want a chance to run this
	  // thread for a bit, even if there are others banging at the
	  // door.
	  first = rtsFalse;
	  ASSERT_CAPABILITY_INVARIANTS(cap,task);
      } else {
	  // Yield the capability to higher-priority tasks if necessary.
	  yieldCapability(&cap, task);
      }
#endif
      
#ifdef SMP
      schedulePushWork(cap,task);
#endif		

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    //
    // Test for interruption.  If interrupted==rtsTrue, then either
    // we received a keyboard interrupt (^C), or the scheduler is
    // trying to shut down all the tasks (shutting_down_scheduler) in
    // the threaded RTS.
    //
    if (interrupted) {
	deleteRunQueue(cap);
	if (shutting_down_scheduler) {
	    IF_DEBUG(scheduler, sched_belch("shutting down"));
	    // If we are a worker, just exit.  If we're a bound thread
	    // then we will exit below when we've removed our TSO from
	    // the run queue.
	    if (task->tso == NULL && emptyRunQueue(cap)) {
		return cap;
	    }
	} else {
	    IF_DEBUG(scheduler, sched_belch("interrupted"));
	}
    }

#if defined(not_yet) && defined(SMP)
    //
    // Top up the run queue from our spark pool.  We try to make the
    // number of threads in the run queue equal to the number of
    // free capabilities.
    //
    {
	StgClosure *spark;
	if (emptyRunQueue()) {
	    spark = findSpark(rtsFalse);
	    if (spark == NULL) {
		break; /* no more sparks in the pool */
	    } else {
		createSparkThread(spark);	  
		IF_DEBUG(scheduler,
			 sched_belch("==^^ turning spark of closure %p into a thread",
				     (StgClosure *)spark));
	    }
	}
    }
#endif // SMP

    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckBlockedThreads(cap);

    scheduleDetectDeadlock(cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
	ASSERT(interrupted);
#endif
	continue; // nothing to do
    }

#if defined(PARALLEL_HASKELL)
    scheduleSendPendingMessages();
    if (emptyRunQueue(cap) && scheduleActivateSpark())
	continue;

#if defined(SPARKS)
    ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
#endif

    /* If we still have no work we need to send a FISH to get a spark
       from another PE */
    if (emptyRunQueue(cap)) {
	if (!scheduleGetRemoteWork(&receivedFinish)) continue;
	ASSERT(rtsFalse); // should not happen at the moment
    }
    // from here: non-empty run queue.
    //  TODO: merge above case with this, only one call processMessages() !
    if (PacketsWaiting()) {  /* process incoming messages, if
				any pending...  only in else
				because getRemoteWork waits for
				messages as well */
	receivedFinish = processMessages();
    }
#endif

#if defined(GRAN)
    scheduleProcessEvent(event);
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
#else
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif
#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
	Task *bound = t->bound;
      
	if (bound) {
	    if (bound == task) {
		IF_DEBUG(scheduler,
			 sched_belch("### Running thread %d in bound thread",
				     t->id));
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		IF_DEBUG(scheduler,
			 sched_belch("### thread %d bound to another OS thread",
				     t->id));
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->tso) { 
		IF_DEBUG(scheduler,
			 sched_belch("### this OS thread cannot run thread %d", t->id));
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue; 
	    }
	}
    }
#endif

    cap->r.rCurrentTSO = t;

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	context_switch = 1;
    }
run_thread:
    IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...",
			      (long)t->id, whatNext_strs[t->what_next]));

#if defined(PROFILING)
    startHeapProfTimer();
#endif

    // ----------------------------------------------------------------------
    // Run the current thread 

    prev_what_next = t->what_next;

    errno = t->saved_errno;
    cap->in_haskell = rtsTrue;

    recent_activity = ACTIVITY_YES;
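    // Dispatch on the thread's what_next field: compiled code is entered
    // via StgRun(), byte-code (interpreted) threads via interpretBCO();
    // either way we get back the Capability and a StgThreadReturnCode
    // saying why the thread stopped.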

    switch (prev_what_next) {
	
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
    case ThreadRunGHC:
    {
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }
    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;

    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

#ifdef SMP
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We had better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
	IF_DEBUG(scheduler,
		 debugBelch("--<< thread %d (%s) stopped: blocked\n",
			    t->id, whatNext_strs[t->what_next]));
	continue;
    }
#endif

    ASSERT_CAPABILITY_INVARIANTS(cap,task);

    // And save the current errno in this thread.
    t->saved_errno = errno;

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif
    
    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
	blackholes_need_checking = rtsTrue;
    }
#if defined(THREADED_RTS)
    IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
    IF_DEBUG(scheduler,debugBelch("sched: "););
#endif
    
    schedulePostRunThread();

    ready_to_gc = rtsFalse;
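    // Dispatch on the reason the thread stopped.  The handlers below may
    // request a GC (by setting ready_to_gc), restart the thread straight
    // away (the ThreadYielding shortcut), or make schedule() return (when
    // a bound thread has finished).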

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
	scheduleHandleStackOverflow(cap,task,t);
	break;

    case ThreadYielding:
	if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread; 
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
    if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
  } /* end of while() */

  IF_PAR_DEBUG(verbose,
	       debugBelch("== Leaving schedule() after having received Finish\n"));
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN) 
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread, 
	      CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
    
    IF_DEBUG(gran,
	     debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
			CurrentTSO);
	     G_TSO(CurrentTSO, 5));
    
    if (RtsFlags.GranFlags.Light) {
	/* Save current time; GranSim Light only */
	CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }      
#endif
}

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#ifdef SMP
static void
schedulePushWork(Capability *cap USED_WHEN_SMP, 
		 Task *task      USED_WHEN_SMP)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // Check whether we have more threads on our run queue than we
    // could hand to another Capability.
    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
	return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
	IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));

	prev = cap->run_queue_hd;
	t = prev->link;
	prev->link = END_TSO_QUEUE;
	i = 0;
	for (; t != END_TSO_QUEUE; t = next) {
	    next = t->link;
	    t->link = END_TSO_QUEUE;
	    if (t->what_next == ThreadRelocated
		|| t->bound == task) { // don't move my bound thread
		prev->link = t;
		prev = t;
	    } else if (i == n_free_caps) {
		i = 0;
		// keep one for us
		prev->link = t;
		prev = t;
	    } else {
		appendToRunQueue(free_caps[i],t);
		if (t->bound) { t->bound->cap = free_caps[i]; }
		i++;
	    }
	}
	cap->run_queue_tl = prev;

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.
}
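/* Note: schedulePushWork() is called once per iteration of the scheduler
 * loop (SMP only, see the call near the top of schedule()); it is a no-op
 * unless this Capability has at least two threads in its run queue. */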
#endif

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

static void
scheduleStartSignalHandlers(Capability *cap)
{
#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
    if (signals_pending()) { // safe outside the lock
	startSignalHandlers(cap);
    }
#endif
}
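/* Note: with the threaded RTS on non-Windows platforms the body above is
 * compiled out (see the condition on RTS_USER_SIGNALS), so pending signals
 * are presumably dealt with elsewhere in that configuration. */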

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
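/* Note: the argument to awaitEvent() above says whether it may block
 * indefinitely; we only allow that when the run queue is empty and no
 * black-hole checks are pending, otherwise it should just poll. */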


/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    checkBlackHoles(cap);
	    blackholes_need_checking = rtsFalse;
	}
	RELEASE_LOCK(&sched_mutex);
    }
}
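/* Note: the flag is re-tested after taking sched_mutex (double-checked
 * locking), since another Capability may have scanned the queue and cleared
 * blackholes_need_checking between our unlocked test and acquiring the
 * lock. */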

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{

#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
    return;
#endif

    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/* 
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif
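	// recent_activity is set back to ACTIVITY_YES each time the
	// scheduler runs a thread, and to ACTIVITY_DONE_GC after the forced
	// GC below; it only reaches ACTIVITY_INACTIVE when a whole timeslice
	// passes without either of those updates happening.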

	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
	recent_activity = ACTIVITY_DONE_GC;

	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( anyUserHandlers() ) {
	    IF_DEBUG(scheduler, 
		     sched_belch("still deadlocked, waiting for signals..."));

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || interrupted);
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception.
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
		return;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
	return;
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);