/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "OSThreads.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
#include "ThrIOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# undef  STATIC_INLINE
# define STATIC_INLINE static
#endif

/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */

#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/* 
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */

#else /* !GRAN */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;
#endif

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;

/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
int context_switch = 0;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;

/* the overall state of the scheduler; it only ever advances, from
 * SCHED_RUNNING via SCHED_INTERRUPTING to SCHED_SHUTTING_DOWN.
 * LOCK: none (the transitions are monotonic)
 */
rtsBool sched_state = SCHED_RUNNING;

#if defined(GRAN)
StgTSO *CurrentTSO;
#endif

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;

/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif

#if defined(PARALLEL_HASKELL)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

//
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
static StgTSO *scheduleProcessEvent(rtsEvent *event);
#endif
#if defined(PARALLEL_HASKELL)
static StgTSO *scheduleSendPendingMessages(void);
static void scheduleActivateSpark(void);
static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#endif
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
static void schedulePostRunThread(StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
					 StgTSO *t);
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
				    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
					     StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
				rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif

#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

#ifdef DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) { 
	// this does round-robin scheduling; good for concurrency
	appendToRunQueue(cap,t);
    } else {
	// this does unfair scheduling; good for parallelism
	pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PARALLEL_HASKELL)
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif
  
  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, 
	      "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
	      task, initialCapability);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#elif defined(GRAN)
#define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
#else
#define TERMINATION_CONDITION        rtsTrue
#endif
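  // In the standard build, TERMINATION_CONDITION is just rtsTrue, so the
  // loop below only exits via a 'return' from inside its body (on
  // shutdown, or when a bound thread finishes).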

  while (TERMINATION_CONDITION) {

#if defined(GRAN)
      /* Choose the processor with the next event */
      CurrentProc = event->proc;
      CurrentTSO = event->tso;
#endif

#if defined(THREADED_RTS)
      if (first) {
	  // don't yield the first time, we want a chance to run this
	  // thread for a bit, even if there are others banging at the
	  // door.
	  first = rtsFalse;
	  ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
      } else {
	  // Yield the capability to higher-priority tasks if necessary.
	  yieldCapability(&cap, task);
      }
#endif
      
#if defined(THREADED_RTS)
      schedulePushWork(cap,task);
#endif

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(EXIT_FAILURE);
    }

    // The interruption / shutdown sequence.
    // 
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    // 
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls, 
    //     we should really attempt to kill these somehow (TODO);
    
    switch (sched_state) {
    case SCHED_RUNNING:
	break;
    case SCHED_INTERRUPTING:
	debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
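	// sparks are speculative work, so on shutdown they can simply
	// be dropped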
	discardSparksCap(cap);
#endif
	/* scheduleDoGC() deletes all the threads */
	cap = scheduleDoGC(cap,task,rtsFalse);
	break;
    case SCHED_SHUTTING_DOWN:
	debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
	// If we are a worker, just exit.  If we're a bound thread
	// then we will exit below when we've removed our TSO from
	// the run queue.
	if (task->tso == NULL && emptyRunQueue(cap)) {
	    return cap;
	}
	break;
    default:
	barf("sched_state: %d", sched_state);
    }

#if defined(THREADED_RTS)
    // If the run queue is empty, take a spark and turn it into a thread.
    {
	if (emptyRunQueue(cap)) {
	    StgClosure *spark;
	    spark = findSpark(cap);
	    if (spark != NULL) {
		debugTrace(DEBUG_sched,
			   "turning spark of closure %p into a thread",
			   (StgClosure *)spark);
		createSparkThread(cap,spark);	  
	    }
	}
    }
#endif // THREADED_RTS

    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

    scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
    if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
	ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
	continue; // nothing to do
    }

#if defined(PARALLEL_HASKELL)
    scheduleSendPendingMessages();
    if (emptyRunQueue(cap) && scheduleActivateSpark()) 
	continue;

#if defined(SPARKS)
    ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
#endif

    /* If we still have no work we need to send a FISH to get a spark
       from another PE */
    if (emptyRunQueue(cap)) {
	if (!scheduleGetRemoteWork(&receivedFinish)) continue;
	ASSERT(rtsFalse); // should not happen at the moment
    }
    // from here: non-empty run queue.
    //  TODO: merge above case with this, only one call processMessages() !
    if (PacketsWaiting()) {  /* process incoming messages, if
				any pending...  only in else
				because getRemoteWork waits for
				messages as well */
	receivedFinish = processMessages();
    }
#endif

#if defined(GRAN)
    scheduleProcessEvent(event);
#endif

    // 
    // Get a thread to run
    //
    t = popRunQueue(cap);

#if defined(GRAN) || defined(PAR)
    scheduleGranParReport(); // some kind of debugging output
#else
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
	Task *bound = t->bound;
      
	if (bound) {
	    if (bound == task) {
		debugTrace(DEBUG_sched,
			   "### Running thread %lu in bound thread", (unsigned long)t->id);
		// yes, the Haskell thread is bound to the current native thread
	    } else {
		debugTrace(DEBUG_sched,
			   "### thread %lu bound to another OS thread", (unsigned long)t->id);
		// no, bound to a different Haskell thread: pass to that thread
		pushOnRunQueue(cap,t);
		continue;
	    }
	} else {
	    // The thread we want to run is unbound.
	    if (task->tso) { 
		debugTrace(DEBUG_sched,
			   "### this OS thread cannot run thread %lu", (unsigned long)t->id);
		// no, the current native thread is bound to a different
		// Haskell thread, so pass it to any worker thread
		pushOnRunQueue(cap,t);
		continue; 
	    }
	}
    }
#endif

    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& !emptyThreadQueues(cap)) {
	context_switch = 1;
    }
run_thread:
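    // We get here either from the top of the loop with a freshly popped
    // thread, or via the 'goto run_thread' shortcut in the ThreadYielding
    // case below.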

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;

    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
			      (long)t->id, whatNext_strs[t->what_next]);

    startHeapProfTimer();

    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);

    // ----------------------------------------------------------------------
    // Run the current thread 

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif

    cap->in_haskell = rtsTrue;

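    // mark the TSO as dirty: running it will mutate its fields, and the
    // generational GC needs to know about mutated old-generation objects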
    dirty_TSO(cap,t);

#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
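        // xchg is an atomic exchange, so if several tasks race here only
        // one of them sees ACTIVITY_DONE_GC and restarts the timer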
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else {
        recent_activity = ACTIVITY_YES;
    }
#endif

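    // Dispatch on what the thread was doing: compiled code is re-entered
    // via StgRun, interpreted (GHCi) code via the bytecode interpreter.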
    switch (prev_what_next) {
	
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
	
    case ThreadRunGHC:
    {
	StgRegTable *r;
	r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	cap = regTableToCapability(r);
	ret = r->rRet;
	break;
    }
    
    case ThreadInterpret:
	cap = interpretBCO(cap);
	ret = cap->r.rRet;
	break;
	
    default:
	barf("schedule: invalid what_next field");
    }

    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
	blackholes_need_checking = rtsTrue;
    }
    
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

#if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
	debugTrace(DEBUG_sched,
		   "--<< thread %lu (%s) stopped: blocked",
		   (unsigned long)t->id, whatNext_strs[t->what_next]);
	continue;
    }
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------
    
    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    CCCS = CCS_SYSTEM;
#endif
    
    schedulePostRunThread(t);

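    // NB. threadStackUnderflow may have to move the thread to a new TSO,
    // so use the TSO it returns from here on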
    t = threadStackUnderflow(task,t);

    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
	ready_to_gc = scheduleHandleHeapOverflow(cap,t);
	break;

    case StackOverflow:
	scheduleHandleStackOverflow(cap,task,t);
	break;

    case ThreadYielding:
	if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
	    goto run_thread; 
	}
	break;

    case ThreadBlocked:
	scheduleHandleThreadBlocked(t);
	break;

    case ThreadFinished:
	if (scheduleHandleThreadFinished(cap, task, t)) return cap;
	ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
	break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */
}

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
#if defined(GRAN) 
    /* set up first event to get things going */
    /* ToDo: assign costs for system setup and init MainTSO ! */
    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	      ContinueThread, 
	      CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
    
    debugTrace (DEBUG_gran,
		"GRAN: Init CurrentTSO (in schedule) = %p", 
		CurrentTSO);
    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
    
    if (RtsFlags.GranFlags.Light) {
	/* Save current time; GranSim Light only */
	CurrentTSO->gran.clock = CurrentTime[CurrentProc];
    }      
#endif
}

/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
schedulePushWork(Capability *cap USED_IF_THREADS, 
		 Task *task      USED_IF_THREADS)
{
    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
	&& sparkPoolSizeCap(cap) < 2) {
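	// i.e. we have at most one runnable thread and at most one spark:
	// nothing worth giving away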
	return;
    }

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
	cap0 = &capabilities[i];
	if (cap != cap0 && tryGrabCapability(cap0,task)) {
	    if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
		// it already has some work, we just grabbed it at 
		// the wrong moment.  Or maybe it's deadlocked!
		releaseCapability(cap0);
	    } else {
		free_caps[n_free_caps++] = cap0;
	    }
	}
    }

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on 
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
	StgTSO *prev, *t, *next;
	rtsBool pushed_to_all;

	debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);

	i = 0;
	pushed_to_all = rtsFalse;

	if (cap->run_queue_hd != END_TSO_QUEUE) {
	    prev = cap->run_queue_hd;
	    t = prev->_link;
	    prev->_link = END_TSO_QUEUE;
	    for (; t != END_TSO_QUEUE; t = next) {
		next = t->_link;
		t->_link = END_TSO_QUEUE;
		if (t->what_next == ThreadRelocated
		    || t->bound == task // don't move my bound thread
		    || tsoLocked(t)) {  // don't move a locked thread
		    setTSOLink(cap, prev, t);
		    prev = t;
		} else if (i == n_free_caps) {
		    pushed_to_all = rtsTrue;
		    i = 0;
		    // keep one for us
		    setTSOLink(cap, prev, t);
		    prev = t;
		} else {
		    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
		    appendToRunQueue(free_caps[i],t);
		    if (t->bound) { t->bound->cap = free_caps[i]; }
		    t->cap = free_caps[i];
		    i++;
		}
	    }
	    cap->run_queue_tl = prev;
	}

	// If there are some free capabilities that we didn't push any
	// threads to, then try to push a spark to each one.
	if (!pushed_to_all) {
	    StgClosure *spark;
	    // i is the next free capability to push to
	    for (; i < n_free_caps; i++) {
		if (emptySparkPoolCap(free_caps[i])) {
		    spark = findSpark(cap);
		    if (spark != NULL) {
			debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
			newSpark(&(free_caps[i]->r), spark);
		    }
		}
	    }
	}

	// release the capabilities
	for (i = 0; i < n_free_caps; i++) {
	    task->cap = free_caps[i];
	    releaseCapability(free_caps[i]);
	}
    }
    task->cap = cap; // reset to point to our Capability.
}
#endif

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
	startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
	awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}


/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
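	// cap->lock protects the wakeup queue, which other Capabilities
	// write to; the run queue itself is private to this Capability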
	ACQUIRE_LOCK(&cap->lock);
	if (emptyRunQueue(cap)) {
	    cap->run_queue_hd = cap->wakeup_queue_hd;
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	} else {
	    setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
	    cap->run_queue_tl = cap->wakeup_queue_tl;
	}
	cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
	RELEASE_LOCK(&cap->lock);
    }
#endif
}

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
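	// double-checked locking: retest the flag once sched_mutex is
	// held, in case another task got here first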
	ACQUIRE_LOCK(&sched_mutex);
	if ( blackholes_need_checking ) {
	    checkBlackHoles(cap);
	    blackholes_need_checking = rtsFalse;
	}
	RELEASE_LOCK(&sched_mutex);
    }
}

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{

#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
	/* 
	 * In the threaded RTS, we only check for deadlock if there
	 * has been no activity in a complete timeslice.  This means
	 * we won't eagerly start a full GC just because we don't have
	 * any threads to run currently.
	 */
	if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

	debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);

	recent_activity = ACTIVITY_DONE_GC;
        // disable timer signals (see #1623)
        stopTimer();
	
	if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
	    debugTrace(DEBUG_sched,
		       "still deadlocked, waiting for signals...");

	    awaitUserSignals();

	    if (signals_pending()) {
		startSignalHandlers(cap);
	    }

	    // either we have threads to run, or we were interrupted:
	    ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
	}
#endif

#if !defined(THREADED_RTS)
	/* Probably a real deadlock.  Send the current main thread the
	 * NonTermination exception (i.e. <<loop>>).
	 */
	if (task->tso) {
	    switch (task->tso->why_blocked) {
	    case BlockedOnSTM:
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		throwToSingleThreaded(cap, task->tso, 
				      (StgClosure *)NonTermination_closure);
		return;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
	return;
#endif
    }
}

/* ----------------------------------------------------------------------------
 * Process an event (GRAN only)
 * ------------------------------------------------------------------------- */

#if defined(GRAN)
static StgTSO *
scheduleProcessEvent(rtsEvent *event)
{
    StgTSO *t;

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
	     d