Schedule.c 119 KB
Newer Older
1
/* ---------------------------------------------------------------------------
2
 * $Id: Schedule.c,v 1.181 2003/12/05 09:50:39 stolz Exp $
3
 *
4
 * (c) The GHC Team, 1998-2000
5
6
7
 *
 * Scheduler
 *
8
9
10
11
12
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name     CPP flag  What's it for
 * --------------------------------------
sof's avatar
sof committed
13
14
15
16
17
 * mp   GUM      PAR          Parallel execution on a distributed memory machine
 * s    SMP      SMP          Parallel execution on a shared memory machine
 * mg   GranSim  GRAN         Simulation of parallel execution
 * md   GUM/GdH  DIST         Distributed execution (based on GUM)
 *
18
19
20
21
 * --------------------------------------------------------------------------*/

//@node Main scheduling code, , ,
//@section Main scheduling code
22

23
24
/* 
 * Version with scheduler monitor support for SMPs (WAY=s):
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

   This design provides a high-level API to create and schedule threads etc.
   as documented in the SMP design document.

   It uses a monitor design controlled by a single mutex to exercise control
   over accesses to shared data structures, and builds on the Posix threads
   library.

   The majority of state is shared.  In order to keep essential per-task state,
   there is a Capability structure, which contains all the information
   needed to run a thread: its STG registers, a pointer to its TSO, a
   nursery etc.  During STG execution, a pointer to the capability is
   kept in a register (BaseReg).

   In a non-SMP build, there is one global capability, namely MainRegTable.

   SDM & KH, 10/99
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which are running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event driven, i.e. it iterates
   over the events in the global event queue.  -- HWL
60
61
*/

62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//@menu
//* Includes::			
//* Variables and Data structures::  
//* Main scheduling loop::	
//* Suspend and Resume::	
//* Run queue code::		
//* Garbage Collection Routines::  
//* Blocking Queue Routines::	
//* Exception Handling Routines::  
//* Debugging Routines::	
//* Index::			
//@end menu

//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
//@subsection Includes

78
#include "PosixSource.h"
79
80
81
82
83
84
85
86
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Storage.h"
#include "StgRun.h"
#include "StgStartup.h"
#include "Hooks.h"
sof's avatar
sof committed
87
#define COMPILING_SCHEDULER
88
89
90
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Storage.h"
91
#include "Interpreter.h"
92
#include "Exception.h"
93
94
95
#include "Printer.h"
#include "Signals.h"
#include "Sanity.h"
96
#include "Stats.h"
sof's avatar
sof committed
97
#include "Timer.h"
98
#include "Prelude.h"
99
#include "ThreadLabels.h"
100
101
102
103
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
104
105
106
107
108
109
110
111
112
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
113
#include "Sparks.h"
sof's avatar
sof committed
114
115
#include "Capability.h"
#include "OSThreads.h"
sof's avatar
sof committed
116
#include  "Task.h"
117

118
119
120
121
122
123
124
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

125
126
#include <string.h>
#include <stdlib.h>
127
#include <stdarg.h>
128

129
130
131
132
#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif

133
134
135
136
137
138
139
140
141
142
143
144
#ifdef THREADED_RTS
#define USED_IN_THREADED_RTS
#else
#define USED_IN_THREADED_RTS STG_UNUSED
#endif

#ifdef RTS_SUPPORTS_THREADS
#define USED_WHEN_RTS_SUPPORTS_THREADS
#else
#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
#endif

145
146
147
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures

148
149
150
/* Main thread queue.
 * Locks required: sched_mutex.
 */
151
StgMainThread *main_threads = NULL;
152
153
154
155

/* Thread queues.
 * Locks required: sched_mutex.
 */
156
157
158
#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
159
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
160
161

/* 
sof's avatar
sof committed
162
   In GranSim we have a runnable and a blocked queue for each processor.
163
164
165
166
167
168
169
170
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
171
172
173
174
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */
175
176
177

#else /* !GRAN */

178
179
180
181
182
StgTSO *run_queue_hd = NULL;
StgTSO *run_queue_tl = NULL;
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
183

184
185
#endif

186
187
188
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */
189
StgTSO *all_threads = NULL;
190

sof's avatar
sof committed
191
192
193
/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
194
195
196
 */
static StgTSO *suspended_ccalling_threads;

197
198
static StgTSO *threadStackOverflow(StgTSO *tso);

199
200
201
202
203
/* KH: The following two flags are shared memory locations.  There is no need
       to lock them, since they are only unset at the end of a scheduler
       operation.
*/

204
/* flag set by signal handler to precipitate a context switch */
205
//@cindex context_switch
206
nat context_switch = 0;
207

208
/* if this flag is set as well, give up execution */
209
//@cindex interrupted
210
rtsBool interrupted = rtsFalse;
211

212
/* Next thread ID to allocate.
sof's avatar
sof committed
213
 * Locks required: thread_id_mutex
214
 */
215
//@cindex next_thread_id
216
static StgThreadID next_thread_id = 1;
217
218
219
220
221
222

/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */
223
 
224
225
226
227
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the closure to enter)
228
229
 *  + 1			      (stg_ap_v_ret)
 *  + 1			      (spare slot req'd by stg_ap_v_ret)
230
231
232
233
234
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.  
 */

235
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
236

sof's avatar
sof committed
237

238
#if defined(GRAN)
239
StgTSO *CurrentTSO;
240
241
#endif

242
243
244
245
246
247
/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

248
static rtsBool ready_to_gc;
sof's avatar
sof committed
249
250
251
252
253
254
255

/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
static rtsBool shutting_down_scheduler = rtsFalse;
256
257
258

void            addToBlockedQueue ( StgTSO *tso );

259
static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
260
       void     interruptStgRts   ( void );
261

262
263
static void     detectBlackHoles  ( void );

264
265
266
#ifdef DEBUG
static void sched_belch(char *s, ...);
#endif
267

sof's avatar
sof committed
268
269
270
271
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 *       with these synchronisation objects.
 */
sof's avatar
sof committed
272
273
Mutex     sched_mutex       = INIT_MUTEX_VAR;
Mutex     term_mutex        = INIT_MUTEX_VAR;
sof's avatar
sof committed
274

sof's avatar
sof committed
275
276
277
278
279
280
281
/*
 * A heavyweight solution to the problem of protecting
 * the thread_id from concurrent update.
 */
Mutex     thread_id_mutex   = INIT_MUTEX_VAR;


sof's avatar
sof committed
282
283
# if defined(SMP)
static Condition gc_pending_cond = INIT_COND_VAR;
284
nat await_death;
sof's avatar
sof committed
285
# endif
286

sof's avatar
sof committed
287
#endif /* RTS_SUPPORTS_THREADS */
sof's avatar
sof committed
288

289
290
291
#if defined(PAR)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
292
rtsBool emitSchedule = rtsTrue;
293
294
#endif

295
#if DEBUG
296
static char *whatNext_strs[] = {
297
  "ThreadRunGHC",
298
  "ThreadInterpret",
299
  "ThreadKilled",
300
  "ThreadRelocated",
301
302
303
304
  "ThreadComplete"
};
#endif

sof's avatar
sof committed
305
#if defined(PAR)
306
307
308
309
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

310
311
312
313
314
315
/*
 * The thread state for the main thread.
// ToDo: check whether not needed any more
StgTSO   *MainTSO;
 */

316
317
318
#if defined(RTS_SUPPORTS_THREADS)
static rtsBool startingWorkerThread = rtsFalse;

sof's avatar
sof committed
319
320
321
322
static void taskStart(void);
static void
taskStart(void)
{
323
324
325
326
327
328
329
330
  Capability *cap;
  
  ACQUIRE_LOCK(&sched_mutex);
  startingWorkerThread = rtsFalse;
  waitForWorkCapability(&sched_mutex, &cap, NULL);
  RELEASE_LOCK(&sched_mutex);
  
  schedule(NULL,cap);
sof's avatar
sof committed
331
332
}

333
void
334
startSchedulerTaskIfNecessary(void)
335
{
336
337
338
339
340
341
342
343
344
345
346
347
  if(run_queue_hd != END_TSO_QUEUE
    || blocked_queue_hd != END_TSO_QUEUE
    || sleeping_queue != END_TSO_QUEUE)
  {
    if(!startingWorkerThread)
    { // we don't want to start another worker thread
      // just because the last one hasn't yet reached the
      // "waiting for capability" state
      startingWorkerThread = rtsTrue;
      startTask(taskStart);
    }
  }
348
349
}
#endif
sof's avatar
sof committed
350

351
352
353
354
//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop

/* ---------------------------------------------------------------------------
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when
    
      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

388
389
   ------------------------------------------------------------------------ */
//@cindex schedule
390
static void
391
392
schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
          Capability *initialCapability )
393
394
{
  StgTSO *t;
395
  Capability *cap = initialCapability;
396
  StgThreadReturnCode ret;
397
398
399
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
400
  StgSparkPool *pool;
401
402
403
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
404
405
406
407
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
408
#endif
409
  rtsBool was_interrupted = rtsFalse;
410
  StgTSOWhatNext prev_what_next;
411
412
  
  ACQUIRE_LOCK(&sched_mutex);
sof's avatar
sof committed
413
414
 
#if defined(RTS_SUPPORTS_THREADS)
415
416
417
418
419
420
421
422
423
424
  /* in the threaded case, the capability is either passed in via the initialCapability
     parameter, or initialized inside the scheduler loop */

  IF_DEBUG(scheduler,
    fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
	    osThreadId(), osThreadId()));
  IF_DEBUG(scheduler,
    fprintf(stderr,"### main thread: %p\n",mainThread));
  IF_DEBUG(scheduler,
    fprintf(stderr,"### initial cap: %p\n",initialCapability));
sof's avatar
sof committed
425
426
427
#else
  /* simply initialise it in the non-threaded case */
  grabCapability(&cap);
sof's avatar
sof committed
428
#endif
429

430
#if defined(GRAN)
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	    ContinueThread, 
	    CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  IF_DEBUG(gran,
	   fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
	   G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }      

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO = event->tso;

453
#elif defined(PAR)
454

455
456
  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message         */ 
457
#else
458

459
  while (1) {
460

461
#endif
462

463
464
    IF_DEBUG(scheduler, printAllThreads());

sof's avatar
sof committed
465
466
467
#if defined(RTS_SUPPORTS_THREADS)
    /* Check to see whether there are any worker threads
       waiting to deposit external call results. If so,
468
469
470
471
472
473
474
475
476
477
478
479
480
       yield our capability... if we have a capability, that is. */
    if(cap)
      yieldToReturningWorker(&sched_mutex, &cap,
	  mainThread ? &mainThread->bound_thread_cond : NULL);

    /* If we do not currently hold a capability, we wait for one */
    if(!cap)
    {
      waitForWorkCapability(&sched_mutex, &cap,
	  mainThread ? &mainThread->bound_thread_cond : NULL);
      IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
				      osThreadId()));
    }
sof's avatar
sof committed
481
482
#endif

483
484
485
486
487
    /* If we're interrupted (the user pressed ^C, or some other
     * termination condition occurred), kill all the currently running
     * threads.
     */
    if (interrupted) {
488
      IF_DEBUG(scheduler, sched_belch("interrupted"));
489
490
      interrupted = rtsFalse;
      was_interrupted = rtsTrue;
491
492
493
494
495
496
497
498
499
500
#if defined(RTS_SUPPORTS_THREADS)
      // In the threaded RTS, deadlock detection doesn't work,
      // so just exit right away.
      prog_belch("interrupted");
      releaseCapability(cap);
      RELEASE_LOCK(&sched_mutex);
      shutdownHaskellAndExit(EXIT_SUCCESS);
#else
      deleteAllThreads();
#endif
501
502
503
504
505
506
507
    }

    /* Go through the list of main threads and wake up any
     * clients whose computations have finished.  ToDo: this
     * should be done more efficiently without a linear scan
     * of the main threads list, somehow...
     */
sof's avatar
sof committed
508
#if defined(RTS_SUPPORTS_THREADS)
509
    { 
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
	StgMainThread *m, **prev;
	prev = &main_threads;
	for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
	  if (m->tso->what_next == ThreadComplete
	      || m->tso->what_next == ThreadKilled)
	  {
	    if(m == mainThread)
	    {
              if(m->tso->what_next == ThreadComplete)
              {
                if (m->ret)
                {
                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                  *(m->ret) = (StgClosure *)m->tso->sp[1]; 
                }
                m->stat = Success;
              }
              else
              {
                if (m->ret)
                {
                  *(m->ret) = NULL;
                }
                if (was_interrupted)
                {
                  m->stat = Interrupted;
                }
                else
                {
                  m->stat = Killed;
                }
              }
              *prev = m->link;
	    
544
#ifdef DEBUG
545
	      removeThreadLabel((StgWord)m->tso->id);
546
#endif
547
548
              releaseCapability(cap);
              RELEASE_LOCK(&sched_mutex);
549
550
551
552
553
554
555
556
557
558
559
              return;
            }
            else
            {
                // The current OS thread can not handle the fact that the Haskell
                // thread "m" has ended. 
                // "m" is bound; the scheduler loop in it's bound OS thread has
                // to return, so let's pass our capability directly to that thread.
              passCapability(&sched_mutex, cap, &m->bound_thread_cond);
              cap = NULL;
            }
560
          }
561
562
	}
    }
563
564
565
566
    
    if(!cap)	// If we gave our capability away,
      continue;	// go to the top to get it back
      
sof's avatar
sof committed
567
#else /* not threaded */
568

569
570
571
572
# if defined(PAR)
    /* in GUM do this only on the Main PE */
    if (IAmMainThread)
# endif
573
574
575
576
    /* If our main thread has finished or been killed, return.
     */
    {
      StgMainThread *m = main_threads;
577
578
      if (m->tso->what_next == ThreadComplete
	  || m->tso->what_next == ThreadKilled) {
579
#ifdef DEBUG
580
	removeThreadLabel((StgWord)m->tso->id);
581
#endif
582
	main_threads = main_threads->link;
583
	if (m->tso->what_next == ThreadComplete) {
584
585
586
587
588
	    // We finished successfully, fill in the return value
	    // NOTE: return val is tso->sp[1] (see StgStartup.hc)
	    if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
	    m->stat = Success;
	    return;
589
	} else {
590
	  if (m->ret) { *(m->ret) = NULL; };
591
	  if (was_interrupted) {
592
593
594
595
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
596
597
598
599
600
601
	  return;
	}
      }
    }
#endif

602
603
604
    /* Top up the run queue from our spark pool.  We try to make the
     * number of threads in the run queue equal to the number of
     * free capabilities.
sof's avatar
sof committed
605
606
607
     *
     * Disable spark support in SMP for now, non-essential & requires
     * a little bit of work to make it compile cleanly. -- sof 1/02.
608
     */
sof's avatar
sof committed
609
#if 0 /* defined(SMP) */
610
    {
sof's avatar
sof committed
611
      nat n = getFreeCapabilities();
612
613
614
615
616
617
618
619
620
621
      StgTSO *tso = run_queue_hd;

      /* Count the run queue */
      while (n > 0 && tso != END_TSO_QUEUE) {
	tso = tso->link;
	n--;
      }

      for (; n > 0; n--) {
	StgClosure *spark;
622
	spark = findSpark(rtsFalse);
623
624
625
	if (spark == NULL) {
	  break; /* no more sparks in the pool */
	} else {
626
627
628
	  /* I'd prefer this to be done in activateSpark -- HWL */
	  /* tricky - it needs to hold the scheduler lock and
	   * not try to re-acquire it -- SDM */
629
	  createSparkThread(spark);	  
630
	  IF_DEBUG(scheduler,
631
		   sched_belch("==^^ turning spark of closure %p into a thread",
632
633
634
635
636
637
			       (StgClosure *)spark));
	}
      }
      /* We need to wake up the other tasks if we just created some
       * work for them.
       */
sof's avatar
sof committed
638
      if (getFreeCapabilities() - n > 1) {
sof's avatar
sof committed
639
   	  signalCondition( &thread_ready_cond );
640
641
      }
    }
642
#endif // SMP
643

644
    /* check for signals each time around the scheduler */
sof's avatar
sof committed
645
#if defined(RTS_USER_SIGNALS)
646
    if (signals_pending()) {
sof's avatar
sof committed
647
      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
648
      startSignalHandlers();
sof's avatar
sof committed
649
      ACQUIRE_LOCK(&sched_mutex);
650
651
652
    }
#endif

653
654
655
    /* Check whether any waiting threads need to be woken up.  If the
     * run queue is empty, and there are no other tasks running, we
     * can wait indefinitely for something to happen.
656
     */
657
658
659
660
661
662
    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
		|| EMPTY_RUN_QUEUE()
#endif
        )
    {
sof's avatar
sof committed
663
      awaitEvent( EMPTY_RUN_QUEUE()
sof's avatar
sof committed
664
665
#if defined(SMP)
	&& allFreeCapabilities()
666
667
#endif
	);
668
    }
669
670
671
    /* we can be interrupted while waiting for I/O... */
    if (interrupted) continue;

672
673
674
675
676
677
678
679
680
681
    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
682
     */
683
#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
684
    if (   EMPTY_THREAD_QUEUES()
sof's avatar
sof committed
685
#if defined(RTS_SUPPORTS_THREADS)
sof's avatar
sof committed
686
	&& EMPTY_QUEUE(suspended_ccalling_threads)
sof's avatar
sof committed
687
688
689
#endif
#ifdef SMP
	&& allFreeCapabilities()
690
691
#endif
	)
692
    {
693
	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
sof's avatar
sof committed
694
695
696
697
#if defined(THREADED_RTS)
	/* and SMP mode ..? */
	releaseCapability(cap);
#endif
698
699
700
701
	// Garbage collection can release some new threads due to
	// either (a) finalizers or (b) threads resurrected because
	// they are about to be send BlockedOnDeadMVar.  Any threads
	// thus released will be immediately runnable.
702
	GarbageCollect(GetRoots,rtsTrue);
703
704
705
706
707
708
709
710
711

	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }

	IF_DEBUG(scheduler, 
		 sched_belch("still deadlocked, checking for black holes..."));
	detectBlackHoles();

	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }

sof's avatar
sof committed
712
#if defined(RTS_USER_SIGNALS)
713
714
715
716
	/* If we have user-installed signal handlers, then wait
	 * for signals to arrive rather then bombing out with a
	 * deadlock.
	 */
sof's avatar
sof committed
717
718
719
720
721
722
723
724
725
726
727
#if defined(RTS_SUPPORTS_THREADS)
	if ( 0 ) { /* hmm..what to do? Simply stop waiting for
		      a signal with no runnable threads (or I/O
		      suspended ones) leads nowhere quick.
		      For now, simply shut down when we reach this
		      condition.
		      
		      ToDo: define precisely under what conditions
		      the Scheduler should shut down in an MT setting.
		   */
#else
728
	if ( anyUserHandlers() ) {
sof's avatar
sof committed
729
#endif
730
731
732
733
734
735
736
737
738
	    IF_DEBUG(scheduler, 
		     sched_belch("still deadlocked, waiting for signals..."));

	    awaitUserSignals();

	    // we might be interrupted...
	    if (interrupted) { continue; }

	    if (signals_pending()) {
sof's avatar
sof committed
739
		RELEASE_LOCK(&sched_mutex);
740
		startSignalHandlers();
sof's avatar
sof committed
741
		ACQUIRE_LOCK(&sched_mutex);
742
743
744
745
746
747
748
749
750
751
752
753
754
	    }
	    ASSERT(!EMPTY_RUN_QUEUE());
	    goto not_deadlocked;
	}
#endif

	/* Probably a real deadlock.  Send the current main thread the
	 * Deadlock exception (or in the SMP build, send *all* main
	 * threads the deadlock exception, since none of them can make
	 * progress).
	 */
	{
	    StgMainThread *m;
sof's avatar
sof committed
755
#if defined(RTS_SUPPORTS_THREADS)
756
	    for (m = main_threads; m != NULL; m = m->link) {
757
758
		switch (m->tso->why_blocked) {
		case BlockedOnBlackHole:
sof's avatar
sof committed
759
		    raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
760
761
762
		    break;
		case BlockedOnException:
		case BlockedOnMVar:
sof's avatar
sof committed
763
		    raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
764
765
766
767
		    break;
		default:
		    barf("deadlock: main thread blocked in a strange way");
		}
768
	    }
769
770
771
772
#else
	    m = main_threads;
	    switch (m->tso->why_blocked) {
	    case BlockedOnBlackHole:
sof's avatar
sof committed
773
		raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
774
775
776
		break;
	    case BlockedOnException:
	    case BlockedOnMVar:
sof's avatar
sof committed
777
		raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
778
779
780
		break;
	    default:
		barf("deadlock: main thread blocked in a strange way");
sof's avatar
sof committed
781
	    }
sof's avatar
sof committed
782
#endif
783
	}
784
785
786
787
788

#if defined(RTS_SUPPORTS_THREADS)
	/* ToDo: revisit conditions (and mechanism) for shutting
	   down a multi-threaded world  */
	IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
sof's avatar
sof committed
789
790
791
	RELEASE_LOCK(&sched_mutex);
	shutdownHaskell();
	return;
792
#endif
793
    }
794
795
  not_deadlocked:

796
797
#elif defined(RTS_SUPPORTS_THREADS)
    /* ToDo: add deadlock detection in threaded RTS */
798
799
#elif defined(PAR)
    /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
800
801
#endif

sof's avatar
sof committed
802
#if defined(SMP)
803
804
805
806
    /* If there's a GC pending, don't do anything until it has
     * completed.
     */
    if (ready_to_gc) {
807
      IF_DEBUG(scheduler,sched_belch("waiting for GC"));
sof's avatar
sof committed
808
      waitCondition( &gc_pending_cond, &sched_mutex );
809
    }
sof's avatar
sof committed
810
811
#endif    

sof's avatar
sof committed
812
#if defined(RTS_SUPPORTS_THREADS)
813
#if defined(SMP)
814
815
    /* block until we've got a thread on the run queue and a free
     * capability.
sof's avatar
sof committed
816
     *
817
     */
sof's avatar
sof committed
818
819
820
    if ( EMPTY_RUN_QUEUE() ) {
      /* Give up our capability */
      releaseCapability(cap);
sof's avatar
sof committed
821
822
823
824
825
826
827
828

      /* If we're in the process of shutting down (& running the
       * a batch of finalisers), don't wait around.
       */
      if ( shutting_down_scheduler ) {
	RELEASE_LOCK(&sched_mutex);
	return;
      }
sof's avatar
sof committed
829
830
831
      IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
      waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
      IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
832
    }
833
834
835
836
837
#else
    if ( EMPTY_RUN_QUEUE() ) {
      continue; // nothing to do
    }
#endif
838
#endif
839
840

#if defined(GRAN)
841
842
843
844
845
846
847
848
849
850
851
852
    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

853
    IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, belch("GRAN: after main switch"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
	     belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
sof's avatar
sof committed
963
    POP_RUN_QUEUE(t); // take_off_run_queue(t);
964
965
966

    IF_DEBUG(gran, 
	     fprintf(stderr, "GRAN: About to run current thread, which is\n");
967
	     G_TSO(t,5));
968
969
970
971
972
973
974
975

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

976
#elif defined(PAR)
977
978
979
980
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }

981
    /* ToDo: phps merge with spark activation above */
982
    /* check whether we have local work and send requests if we have none */
983
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
984
      /* :-[  no local threads => look out for local sparks */
985
986
      /* the spark pool for the current PE */
      pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
987
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
988
	  pool->hd < pool->tl) {
989
990
991
992
993
994
995
996
	/* 
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 * Old comment:
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread... 
	 */
997
998

	spark = findSpark(rtsFalse);                /* get a spark */
999
1000
	if (spark != (rtsSpark) NULL) {
	  tso = activateSpark(spark);       /* turn the spark into a thread */
For faster browsing, not all history is shown. View entire blame