/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2004
 *
 * Scheduler
 *
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name     CPP flag  What's it for
 * --------------------------------------
 * mp   GUM      PAR          Parallel execution on a distrib. memory machine
 * s    SMP      SMP          Parallel execution on a shared memory machine
 * mg   GranSim  GRAN         Simulation of parallel execution
 * md   GUM/GdH  DIST         Distributed execution (based on GUM)
 *
 * --------------------------------------------------------------------------*/

/* 
 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which is running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event-driven, i.e. it iterates
   over the events in the global event queue.  -- HWL
*/

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "BlockAlloc.h"
#include "Storage.h"
#include "StgRun.h"
#include "Hooks.h"
#define COMPILING_SCHEDULER
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Storage.h"
#include "Interpreter.h"
#include "Exception.h"
#include "Printer.h"
#include "Signals.h"
#include "Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Timer.h"
#include "Prelude.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"

#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif

#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
#include "Sparks.h"
#include "Capability.h"
#include "OSThreads.h"
#include "Task.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <string.h>
#include <stdlib.h>
#include <stdarg.h>

#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

#ifdef THREADED_RTS
#define USED_IN_THREADED_RTS
#else
#define USED_IN_THREADED_RTS STG_UNUSED
#endif

#ifdef RTS_SUPPORTS_THREADS
#define USED_WHEN_RTS_SUPPORTS_THREADS
#else
#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
#endif

/* Main thread queue.
 * Locks required: sched_mutex.
 */
StgMainThread *main_threads = NULL;

/* Thread queues.
 * Locks required: sched_mutex.
 */
#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/* 
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */

#else /* !GRAN */

StgTSO *run_queue_hd = NULL;
StgTSO *run_queue_tl = NULL;
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */

#endif

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */
StgTSO *all_threads = NULL;

/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
 */
static StgTSO *suspended_ccalling_threads;
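
/* For illustration only -- a sketch, not code from this module: a "safe"
 * foreign call is bracketed by compiler-generated code roughly like
 *
 *     token = suspendThread(...);   // TSO goes onto suspended_ccalling_threads
 *     r = foreign_fn(args);         // GC is free to run during the call
 *     resumeThread(token);          // TSO comes off the list again
 *
 * which is why the garbage collector must treat this list as a source
 * of roots while the foreign call is in progress.
 */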

static StgTSO *threadStackOverflow(StgTSO *tso);

/* KH: The following two flags are shared memory locations.  There is no need
       to lock them, since they are only unset at the end of a scheduler
       operation.
*/

/* flag set by signal handler to precipitate a context switch */
int context_switch = 0;

/* if this flag is set as well, give up execution */
rtsBool interrupted = rtsFalse;

/* If this flag is set, we are running Haskell code.  Used to detect
 * uses of 'foreign import unsafe' that should be 'safe'.
 */
rtsBool in_haskell = rtsFalse;

/* Next thread ID to allocate.
 * Locks required: thread_id_mutex
 */
static StgThreadID next_thread_id = 1;

/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */

/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
 *  + 1                       (stg_ap_v_ret)
 *  + 1                       (spare slot req'd by stg_ap_v_ret)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.  
 */

#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
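/* (The "+ 3" corresponds to the three single-word items enumerated
 * above: the closure to enter, stg_ap_v_ret, and its spare slot.)
 */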

#if defined(GRAN)
StgTSO *CurrentTSO;
#endif

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

static rtsBool ready_to_gc;

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
static rtsBool shutting_down_scheduler = rtsFalse;

void            addToBlockedQueue ( StgTSO *tso );

static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
       void     interruptStgRts   ( void );

#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
static void     detectBlackHoles  ( void );
#endif

static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);

#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 *       with these synchronisation objects.
 */
Mutex     sched_mutex       = INIT_MUTEX_VAR;
Mutex     term_mutex        = INIT_MUTEX_VAR;

#endif /* RTS_SUPPORTS_THREADS */

#if defined(PAR)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if DEBUG
static char *whatNext_strs[] = {
  "(unknown)",
  "ThreadRunGHC",
  "ThreadInterpret",
  "ThreadKilled",
  "ThreadRelocated",
  "ThreadComplete"
};
#endif

#if defined(PAR)
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */

#if defined(RTS_SUPPORTS_THREADS)
static rtsBool startingWorkerThread = rtsFalse;

static void taskStart(void);
static void
taskStart(void)
{
  ACQUIRE_LOCK(&sched_mutex);
  startingWorkerThread = rtsFalse;
  schedule(NULL,NULL);
  RELEASE_LOCK(&sched_mutex);
}

void
startSchedulerTaskIfNecessary(void)
{
  if(run_queue_hd != END_TSO_QUEUE
    || blocked_queue_hd != END_TSO_QUEUE
    || sleeping_queue != END_TSO_QUEUE)
  {
    if(!startingWorkerThread)
    { // we don't want to start another worker thread
      // just because the last one hasn't yet reached the
      // "waiting for capability" state
      startingWorkerThread = rtsTrue;
      if(!startTask(taskStart))
      {
        startingWorkerThread = rtsFalse;
      }
    }
  }
}
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when

      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue,
     which determines what to do next.  This makes it more complicated
     than either the concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
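
/* In outline, the standard (non-GRAN, non-PAR) path below behaves roughly
 * as follows -- a simplified sketch for orientation, not the code itself:
 *
 *     while (1) {
 *         check interrupted flag, pending signals, deadlock;
 *         POP_RUN_QUEUE(t);               // round-robin: take the head
 *         ret = run t until it returns to the scheduler;
 *         switch (ret) {
 *             case HeapOverflow:   ... maybe GC, requeue ...
 *             case StackOverflow:  ... enlarge stack, requeue ...
 *             case ThreadYielding: APPEND_TO_RUN_QUEUE(t); ...
 *             case ThreadFinished: ...
 *         }
 *     }
 */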
static void
schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          Capability *initialCapability )
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
  StgSparkPool *pool;
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  rtsBool was_interrupted = rtsFalse;
  nat prev_what_next;

  // Pre-condition: sched_mutex is held.

  // We might have a capability, passed in as initialCapability.
  cap = initialCapability;

#if defined(RTS_SUPPORTS_THREADS)
  //
  // in the threaded case, the capability is either passed in via the
  // initialCapability parameter, or initialized inside the scheduler
  // loop 
  //
  IF_DEBUG(scheduler,
	   sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
		       mainThread, initialCapability);
      );
#else
  // simply initialise it in the non-threaded case
  grabCapability(&cap);
#endif

#if defined(GRAN)
  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	    ContinueThread, 
	    CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  IF_DEBUG(gran,
	   debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
	   G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }      

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO = event->tso;

#elif defined(PAR)

  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message         */ 

#else // everything except GRAN and PAR

  while (1) {

#endif

     IF_DEBUG(scheduler, printAllThreads());

#if defined(RTS_SUPPORTS_THREADS)
      // Yield the capability to higher-priority tasks if necessary.
      //
      if (cap != NULL) {
	  yieldCapability(&cap);
      }

      // If we do not currently hold a capability, we wait for one
      //
      if (cap == NULL) {
	  waitForCapability(&sched_mutex, &cap,
			    mainThread ? &mainThread->bound_thread_cond : NULL);
      }

      // We now have a capability...
#endif

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
    // call).
    if (in_haskell) {
    	  errorBelch("schedule: re-entered unsafely.\n"
    		     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
    	  stg_exit(1);
    }
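
    /* Illustrative only (not code from this file): the Haskell-side
     * distinction this check catches is between
     *
     *    foreign import ccall unsafe "cheap_fn"    c_cheap :: CInt -> CInt
     *    foreign import ccall safe   "callback_fn" c_cb    :: FunPtr (IO ()) -> IO ()
     *
     * where "cheap_fn" and "callback_fn" are made-up names.  An "unsafe"
     * import must never re-enter the RTS; if one does, we end up here with
     * in_haskell still set.
     */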

    //
    // If we're interrupted (the user pressed ^C, or some other
    // termination condition occurred), kill all the currently running
    // threads.
    //
    if (interrupted) {
	IF_DEBUG(scheduler, sched_belch("interrupted"));
	interrupted = rtsFalse;
	was_interrupted = rtsTrue;
#if defined(RTS_SUPPORTS_THREADS)
	// In the threaded RTS, deadlock detection doesn't work,
	// so just exit right away.
	errorBelch("interrupted");
	releaseCapability(cap);
	RELEASE_LOCK(&sched_mutex);
	shutdownHaskellAndExit(EXIT_SUCCESS);
#else
	deleteAllThreads();
#endif
    }

#if defined(RTS_USER_SIGNALS)
    // check for signals each time around the scheduler
    if (signals_pending()) {
      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
      startSignalHandlers();
      ACQUIRE_LOCK(&sched_mutex);
    }
#endif

    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
    {
#if defined(RTS_SUPPORTS_THREADS)
	// We shouldn't be here...
	barf("schedule: awaitEvent() in threaded RTS");
#endif
	awaitEvent( EMPTY_RUN_QUEUE() );
    }
    // we can be interrupted while waiting for I/O...
    if (interrupted) continue;

    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
     */
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
    if (   EMPTY_THREAD_QUEUES() )
    {
	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));

	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are unreachable and will therefore be sent an
	// exception.  Any threads thus released will be immediately
	// runnable.
	GarbageCollect(GetRoots,rtsTrue);

	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }

#if defined(RTS_USER_SIGNALS)
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather than bombing out with a
	 * deadlock.
	 */
	if ( anyUserHandlers() ) {
	    IF_DEBUG(scheduler, 
		     sched_belch("still deadlocked, waiting for signals..."));

	    awaitUserSignals();

	    // we might be interrupted...
	    if (interrupted) { continue; }

	    if (signals_pending()) {
		RELEASE_LOCK(&sched_mutex);
		startSignalHandlers();
		ACQUIRE_LOCK(&sched_mutex);
	    }
	    ASSERT(!EMPTY_RUN_QUEUE());
	    goto not_deadlocked;
	}
#endif

	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception (or in the SMP build, send *all* main
	 * threads the deadlock exception, since none of them can make
	 * progress).
	 */
	{
	    StgMainThread *m;
	    m = main_threads;
	    switch (m->tso->why_blocked) {
	    case BlockedOnBlackHole:
	    case BlockedOnException:
	    case BlockedOnMVar:
		raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
		break;
	    default:
		barf("deadlock: main thread blocked in a strange way");
	    }
	}
    }
  not_deadlocked:

#elif defined(RTS_SUPPORTS_THREADS)
    // ToDo: add deadlock detection in threaded RTS
#elif defined(PAR)
    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
#endif

#if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
    /* win32: might be back here due to awaitEvent() being abandoned
     * as a result of a console event having been delivered.
     */
    if ( EMPTY_RUN_QUEUE() ) {
	continue; // nothing to do
    }
#endif

#if defined(GRAN)
    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
	     debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    POP_RUN_QUEUE(t); // take_off_run_queue(t);

    IF_DEBUG(gran, 
	     debugBelch("GRAN: About to run current thread, which is\n");
	     G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;
#elif defined(PAR)
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }

    /* ToDo: perhaps merge with spark activation above */
    /* check whether we have local work and send requests if we have none */
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
      /* :-[  no local threads => look out for local sparks */
      /* the spark pool for the current PE */
      pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
	  pool->hd < pool->tl) {
	/* 
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 * Old comment:
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread... 
	 */

	spark = findSpark(rtsFalse);                /* get a spark */
	if (spark != (rtsSpark) NULL) {
	  tso = activateSpark(spark);       /* turn the spark into a thread */
	  IF_PAR_DEBUG(schedule,
		       debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
			     tso->id, tso, advisory_thread_count));

	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
	    debugBelch("==^^ failed to activate spark\n");
	    goto next_thread;
	  }               /* otherwise fall through & pick-up new tso */
	} else {
	  IF_PAR_DEBUG(verbose,
		       debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
			     spark_queue_len(pool)));
	  goto next_thread;
	}
      }

      /* If we still have no work we need to send a FISH to get a spark
	 from another PE 
      */
      if (EMPTY_RUN_QUEUE()) {
      /* =8-[  no local sparks => look for work on other PEs */
	/*
	 * We really have absolutely no work.  Send out a fish
	 * (there may be some out there already), and wait for
	 * something to arrive.  We clearly can't run any threads
	 * until a SCHEDULE or RESUME arrives, and so that's what
	 * we're hoping to see.  (Of course, we still have to
	 * respond to other types of messages.)
	 */
	TIME now = msTime() /*CURRENT_TIME*/;
	IF_PAR_DEBUG(verbose, 
		     debugBelch("--  now=%ld\n", now));
	IF_PAR_DEBUG(verbose,
		     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
			 (last_fish_arrived_at!=0 &&
			  last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
		       debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
			     last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
			     last_fish_arrived_at,
			     RtsFlags.ParFlags.fishDelay, now);
		     });

	if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
	    (last_fish_arrived_at==0 ||
	     (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
	  /* outstandingFishes is set in sendFish, processFish;
	     avoid flooding system with fishes via delay */
	  pe = choosePE();
	  sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
		   NEW_FISH_HUNGER);

	  // Global statistics: count no. of fishes
	  if (RtsFlags.ParFlags.ParStats.Global &&
	      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	    globalParStats.tot_fish_mess++;
	  }
	}

	receivedFinish = processMessages();
	goto next_thread;
      }
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
      receivedFinish = processMessages();
    }

    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
    IF_DEBUG(sanity,checkTSO(t));

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    CurrentTSO = t;
    */
    /* the spark pool for the current PE */
    pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable

    IF_DEBUG(scheduler, 
	     debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
		   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

# if 1
    if (0 && RtsFlags.ParFlags.ParStats.Full && 
	t && LastTSO && t->id != LastTSO->id && 
	LastTSO->why_blocked == NotBlocked && 
	LastTSO->what_next != ThreadComplete) {
      // if previously scheduled TSO not blocked we have to record the context switch
      DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
			   GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
    }

    if (RtsFlags.ParFlags.ParStats.Full && 
	(emitSchedule /* forced emit */ ||
        (t && LastTSO && t->id != LastTSO->id))) {
      /* 
	 we are running a different TSO, so write a schedule event to log file
	 NB: If we use fair scheduling we also have to write a deschedule 
	     event for LastTSO; with unfair scheduling we know that the
	     previous tso has blocked whenever we switch to another tso, so
	     we don't need it in GUM for now
      */
      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }

# endif
#else /* !GRAN && !PAR */

    // grab a thread from the run queue
    ASSERT(run_queue_hd != END_TSO_QUEUE);
    POP_RUN_QUEUE(t);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif

#ifdef THREADED_RTS
    {
      StgMainThread *m = t->main;

      if(m)
      {
	if(m == mainThread)
	{
	  IF_DEBUG(scheduler,
	    sched_belch("### Running thread %d in bound thread", t->id));
	  // yes, the Haskell thread is bound to the current native thread
	}
	else
	{
	  IF_DEBUG(scheduler,
	    sched_belch("### thread %d bound to another OS thread", t->id));
	  // no, bound to a different Haskell thread: pass to that thread
	  PUSH_ON_RUN_QUEUE(t);
	  passCapability(&m->bound_thread_cond);
	  continue;
	}
      }
      else
      {
	if(mainThread != NULL)
        // The thread we want to run is unbound, but this OS thread is
        // bound to a main thread, so it cannot run t itself.
	{
	  IF_DEBUG(scheduler,
	    sched_belch("### this OS thread cannot run thread %d", t->id));
	  // no, the current native thread is bound to a different
	  // Haskell thread, so pass it to any worker thread
	  PUSH_ON_RUN_QUEUE(t);
	  passCapabilityToWorker();
	  continue; 
	}
      }
    }
#endif

    cap->r.rCurrentTSO = t;

    /* context switches are now initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	 && (run_queue_hd != END_TSO_QUEUE
	     || blocked_queue_hd != END_TSO_QUEUE
	     || sleeping_queue != END_TSO_QUEUE)))
	context_switch = 1;

run_thread:

    RELEASE_LOCK(&sched_mutex);

    IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
			      (long)t->id, whatNext_strs[t->what_next]));

#ifdef PROFILING
    startHeapProfTimer();
#endif

    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    /* Run the current thread 
     */
    prev_what_next = t->what_next;

    errno = t->saved_errno;
    in_haskell = rtsTrue;

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;

    case ThreadRunGHC:
	ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	break;

    case ThreadInterpret:
	ret = interpretBCO(cap);
	break;

    default:
      barf("schedule: invalid what_next field");
    }

    in_haskell = rtsFalse;

    // The TSO might have moved, so find the new location:
    t = cap->r.rCurrentTSO;

    // And save the current errno in this thread.
    t->saved_errno = errno;

    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */

    /* Costs for the scheduler are assigned to CCS_SYSTEM */
#ifdef PROFILING
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif

    ACQUIRE_LOCK(&sched_mutex);

#ifdef RTS_SUPPORTS_THREADS
    IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
    IF_DEBUG(scheduler,debugBelch("sched: "););
#endif
    
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a 
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t; 
    TimeOfLastYield = CURRENT_TIME;
#endif

    switch (ret) {
    case HeapOverflow:
#if defined(GRAN)
      IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_heapover++;
#elif defined(PAR)
      globalParStats.tot_heapover++;
#endif

      // did the task ask for a large block?
      if (cap->r.rHpAlloc > BLOCK_SIZE) {
	  // if so, get one and push it on the front of the nursery.
	  bdescr *bd;
	  nat blocks;

	  blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

	  IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
				   (long)t->id, whatNext_strs[t->what_next], blocks));

	  // don't do this if it would push us over the
	  // alloc_blocks_lim limit; we'll GC first.
	  if (alloc_blocks + blocks < alloc_blocks_lim) {

	      alloc_blocks += blocks;
	      bd = allocGroup( blocks );

	      // link the new group into the list
	      bd->link = cap->r.rCurrentNursery;
	      bd->u.back = cap->r.rCurrentNursery->u.back;
	      if (cap->r.rCurrentNursery->u.back != NULL) {
		  cap->r.rCurrentNursery->u.back->link = bd;
	      } else {
		  ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
			 g0s0->blocks == cap->r.rNursery);
		  cap->r.rNursery = g0s0->blocks = bd;
	      }
	      cap->r.rCurrentNursery->u.back = bd;

	      // initialise it as a nursery block.  We initialise the
	      // step, gen_no, and flags field of *every* sub-block in
	      // this large block, because this is easier than making
	      // sure that we always find the block head of a large
	      // block whenever we call Bdescr() (eg. evacuate() and
	      // isAlive() in the GC would both have to do this, at
	      // least).
	      { 
		  bdescr *x;
		  for (x = bd; x < bd + blocks; x++) {
		      x->step = g0s0;
		      x->gen_no = 0;
		      x->flags = 0;
		  }
	      }

	      // don't forget to update the block count in g0s0.
	      g0s0->n_blocks += blocks;
	      // This assert can be a killer if the app is doing lots
	      // of large block allocations.
	      ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);

	      // now update the nursery to point to the new block
	      cap->r.rCurrentNursery = bd;

	      // we might be unlucky and have another thread get on the
	      // run queue before us and steal the large block, but in that
	      // case the thread will just end up requesting another large
	      // block.
	      PUSH_ON_RUN_QUEUE(t);
	      break;
	  }
      }

      /* make all the running tasks block on a condition variable,
       * maybe set context_switch and wait till they all pile in,
       * then have them wait on a GC condition variable.
       */
      IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
			       (long)t->id, whatNext_strs[t->what_next]));
      threadPaused(t);
#if defined(GRAN)
      ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PAR)
      /* Currently we emit a DESCHEDULE event before GC in GUM.
         ToDo: either add separate event to distinguish SYSTEM time from rest
	       or just nuke this DESCHEDULE (and the following SCHEDULE) */
      if (0 && RtsFlags.ParFlags.ParStats.Full) {
	DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
			 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
	emitSchedule = rtsTrue;
      }
#endif

      ready_to_gc = rtsTrue;
      PUSH_ON_RUN_QUEUE(t);
      /* actual GC is done at the end of the while loop */
      break;

    case StackOverflow:
#if defined(GRAN)
      IF_DEBUG(gran, 
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_stackover++;
#elif defined(PAR)
      // IF_DEBUG(par, 
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_stackover++;
#endif
      IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
			       (long)t->id, whatNext_strs[t->what_next]));
      /* just adjust the stack for this thread, then pop it back
       * on the run queue.
       */
      threadPaused(t);
      { 
	/* enlarge the stack */
	StgTSO *new_t = threadStackOverflow(t);

	/* This TSO has moved, so update any pointers to it from the
	 * main thread stack.  It better not be on any other queues...
	 * (it shouldn't be).
	 */
	if (t->main != NULL) {
	    t->main->tso = new_t;
	}
	PUSH_ON_RUN_QUEUE(new_t);
      }
      break;

    case ThreadYielding:
      // Reset the context switch flag.  We don't do this just before
      // running the thread, because that would mean we would lose ticks
      // during GC, which can lead to unfair scheduling (a thread hogs
      // the CPU because the tick always arrives during GC).  This way
      // penalises threads that do a lot of allocation, but that seems
      // better than the alternative.
      context_switch = 0;

#if defined(GRAN)
      IF_DEBUG(gran, 
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_yields++;
#elif defined(PAR)
      // IF_DEBUG(par, 
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_yields++;
#endif
      /* put the thread back on the run queue.  Then, if we're ready to
       * GC, check whether this is the last task to stop.  If so, wake
       * up the GC thread.  getThread will block during a GC until the
       * GC is finished.
       */
      IF_DEBUG(scheduler,
               if (t->what_next != prev_what_next) {
		   debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
			 (long)t->id, whatNext_strs[t->what_next]);
               } else {
                   debugBelch("--<< thread %ld (%s) stopped, yielding\n",
			 (long)t->id, whatNext_strs[t->what_next]);
               }
               );

      IF_DEBUG(sanity,
	       //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
	       checkTSO(t));
      ASSERT(t->link == END_TSO_QUEUE);

      // Shortcut if we're just switching evaluators: don't bother
      // doing stack squeezing (which can be expensive), just run the
      // thread.
      if (t->what_next != prev_what_next) {
	  goto run_thread;
      }

      threadPaused(t);

#if defined(GRAN)
      ASSERT(!is_on_queue(t,CurrentProc));

      IF_DEBUG(sanity,
	       //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
	       checkThreadQsSanity(rtsTrue));
#endif

#if defined(PAR)
      if (RtsFlags.ParFlags.doFairScheduling) { 
	/* this does round-robin scheduling; good for concurrency */
	APPEND_TO_RUN_QUEUE(t);
      } else {
	/* this does unfair scheduling; good for parallelism */
	PUSH_ON_RUN_QUEUE(t);
      }
#else
      // this does round-robin scheduling; good for concurrency
      APPEND_TO_RUN_QUEUE(t);
#endif

#if defined(GRAN)
      /* add a ContinueThread event to actually process the thread */
      new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
		ContinueThread,
		t, (StgClosure*)NULL, (rtsSpark*)NULL);
      IF_GRAN_DEBUG(bq, 
	       debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");