/* ---------------------------------------------------------------------------
 * $Id: Schedule.c,v 1.101 2001/10/23 10:54:14 simonmar Exp $
 *
 * (c) The GHC Team, 1998-2000
 *
 * Scheduler
 *
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name     CPP flag  What's it for
 * --------------------------------------
 * mp   GUM      PAR       Parallel execution on a distributed memory machine
 * s    SMP      SMP       Parallel execution on a shared memory machine
 * mg   GranSim  GRAN      Simulation of parallel execution
 * md   GUM/GdH  DIST      Distributed execution (based on GUM)
 * --------------------------------------------------------------------------*/

//@node Main scheduling code, , ,
//@section Main scheduling code

/* 
 * Version with scheduler monitor support for SMPs (WAY=s):

   This design provides a high-level API to create and schedule threads etc.
   as documented in the SMP design document.

   It uses a monitor design controlled by a single mutex to exercise control
   over accesses to shared data structures, and builds on the Posix threads
   library.

   The majority of state is shared.  In order to keep essential per-task state,
   there is a Capability structure, which contains all the information
   needed to run a thread: its STG registers, a pointer to its TSO, a
   nursery etc.  During STG execution, a pointer to the capability is
   kept in a register (BaseReg).

   In a non-SMP build, there is one global capability, namely MainRegTable.

   SDM & KH, 10/99

 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which are running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event driven, i.e. it iterates
   over the events in the global event queue.  -- HWL
*/
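
/* Illustrative sketch (not compiled): the core of the monitor pattern
 * described above, as used by schedule() below.  Capability, rCurrentTSO,
 * StgRun and sched_mutex are real names from this file;
 * pick_runnable_thread() and grab_capability() are placeholders standing in
 * for POP_RUN_QUEUE() and the free_capabilities / MainRegTable handling
 * further down.
 */
#if 0
static void scheduler_monitor_sketch( void )
{
  ACQUIRE_LOCK(&sched_mutex);                 /* enter the monitor            */
  for (;;) {
    StgTSO     *t   = pick_runnable_thread(); /* placeholder                  */
    Capability *cap = grab_capability();      /* placeholder                  */
    cap->rCurrentTSO = t;
    RELEASE_LOCK(&sched_mutex);               /* run Haskell outside the lock */
    StgRun((StgFunPtr) stg_returnToStackTop, cap);
    ACQUIRE_LOCK(&sched_mutex);               /* back into the monitor to
                                                 decide what to do next       */
  }
}
#endif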

//@menu
//* Includes::			
//* Variables and Data structures::  
//* Main scheduling loop::	
//* Suspend and Resume::	
//* Run queue code::		
//* Garbage Collection Routines::  
//* Blocking Queue Routines::	
//* Exception Handling Routines::  
//* Debugging Routines::	
//* Index::			
//@end menu

//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
//@subsection Includes

#include "PosixSource.h"
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Storage.h"
#include "StgRun.h"
#include "StgStartup.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Storage.h"
#include "Interpreter.h"
#include "Exception.h"
#include "Printer.h"
#include "Main.h"
#include "Signals.h"
#include "Sanity.h"
#include "Stats.h"
#include "Itimer.h"
#include "Prelude.h"
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
#include "Sparks.h"

#include <stdarg.h>

//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures

/* Main threads:
 *
 * These are the threads which clients have requested that we run.  
 *
 * In an SMP build, we might have several concurrent clients all
 * waiting for results, and each one will wait on a condition variable
 * until the result is available.
 *
 * In non-SMP, clients are strictly nested: the first client calls
 * into the RTS, which might call out again to C with a _ccall_GC, and
 * eventually re-enter the RTS.
 *
 * Main threads information is kept in a linked list:
 */
//@cindex StgMainThread
typedef struct StgMainThread_ {
  StgTSO *         tso;
  SchedulerStatus  stat;
  StgClosure **    ret;
#ifdef SMP
  pthread_cond_t wakeup;
#endif
  struct StgMainThread_ *link;
} StgMainThread;

/* Main thread queue.
 * Locks required: sched_mutex.
 */
static StgMainThread *main_threads;
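
/* Illustrative sketch (not compiled): how an RTS entry point registers a
 * main thread and waits for its result.  This mirrors the behaviour relied
 * on by schedule() below (which fills in m->stat and broadcasts m->wakeup);
 * the function name is a placeholder, and NoStatus is assumed to be the
 * "not finished yet" member of SchedulerStatus.
 */
#if 0
static SchedulerStatus
wait_for_main_thread_sketch( StgTSO *tso, StgClosure **ret )
{
  StgMainThread m;

  ACQUIRE_LOCK(&sched_mutex);
  m.tso  = tso;
  m.ret  = ret;
  m.stat = NoStatus;                    /* assumed initial status            */
#ifdef SMP
  pthread_cond_init(&m.wakeup, NULL);
#endif
  m.link = main_threads;                /* cons onto the main thread list    */
  main_threads = &m;

#ifdef SMP
  /* SMP: sleep until schedule() broadcasts on m.wakeup */
  do {
    pthread_cond_wait(&m.wakeup, &sched_mutex);
  } while (m.stat == NoStatus);
#else
  /* non-SMP: schedule() returns once this main thread has finished */
  schedule();
#endif
  RELEASE_LOCK(&sched_mutex);
  return m.stat;
}
#endif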

/* Thread queues.
 * Locks required: sched_mutex.
 */
#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */

/* 
   In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */

#else /* !GRAN */

StgTSO *run_queue_hd, *run_queue_tl;
StgTSO *blocked_queue_hd, *blocked_queue_tl;
StgTSO *sleeping_queue;		/* perhaps replace with a hash table? */

#endif

/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */
StgTSO *all_threads;

/* Threads suspended in _ccall_GC.
 */
static StgTSO *suspended_ccalling_threads;

static void GetRoots(evac_fn);
static StgTSO *threadStackOverflow(StgTSO *tso);

/* KH: The following two flags are shared memory locations.  There is no need
       to lock them, since they are only unset at the end of a scheduler
       operation.
*/

/* flag set by signal handler to precipitate a context switch */
//@cindex context_switch
nat context_switch;

/* if this flag is set as well, give up execution */
//@cindex interrupted
rtsBool interrupted;

/* Next thread ID to allocate.
 * Locks required: sched_mutex
 */
//@cindex next_thread_id
StgThreadID next_thread_id = 1;

/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the realworld token for an IO thread)
 *  + 1                       (the closure to enter)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.  
 */

#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)

/* Free capability list.
 * Locks required: sched_mutex.
 */
#ifdef SMP
//@cindex free_capabilities
//@cindex n_free_capabilities
Capability *free_capabilities; /* Available capabilities for running threads */
nat n_free_capabilities;       /* total number of available capabilities */
#else
//@cindex MainRegTable
Capability MainRegTable;       /* for non-SMP, we have one global capability */
#endif

#if defined(GRAN)
StgTSO *CurrentTSO;
#endif

/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

rtsBool ready_to_gc;

/* All our current task ids, saved in case we need to kill them later.
 */
#ifdef SMP
//@cindex task_ids
task_info *task_ids;
#endif

void            addToBlockedQueue ( StgTSO *tso );

static void     schedule          ( void );
       void     interruptStgRts   ( void );

#if defined(GRAN)
static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
#else
static StgTSO * createThread_     ( nat size, rtsBool have_lock );
#endif

static void     detectBlackHoles  ( void );

#ifdef DEBUG
static void sched_belch(char *s, ...);
#endif

#ifdef SMP
//@cindex sched_mutex
//@cindex term_mutex
//@cindex thread_ready_cond
//@cindex gc_pending_cond

pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;

nat await_death;
#endif

#if defined(PAR)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
rtsBool emitSchedule = rtsTrue;
#endif

#if DEBUG
char *whatNext_strs[] = {
  "ThreadEnterGHC",
  "ThreadRunGHC",
  "ThreadEnterInterp",
  "ThreadKilled",
  "ThreadComplete"
};

char *threadReturnCode_strs[] = {
  "HeapOverflow",			/* might also be StackOverflow */
  "StackOverflow",
  "ThreadYielding",
  "ThreadBlocked",
  "ThreadFinished"
};
#endif

#ifdef PAR
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

/*
 * The thread state for the main thread.
// ToDo: check whether not needed any more
StgTSO   *MainTSO;
 */

//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when
    
      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

   ------------------------------------------------------------------------ */
//@cindex schedule
static void
schedule( void )
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
  StgSparkPool *pool;
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
#endif
  rtsBool was_interrupted = rtsFalse;
  
  ACQUIRE_LOCK(&sched_mutex);

#if defined(GRAN)

  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	    ContinueThread, 
	    CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  IF_DEBUG(gran,
	   fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
	   G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }      

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO = event->tso;

#elif defined(PAR)

  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message         */ 
#else

  while (1) {

#endif

    IF_DEBUG(scheduler, printAllThreads());

    /* If we're interrupted (the user pressed ^C, or some other
     * termination condition occurred), kill all the currently running
     * threads.
     */
    if (interrupted) {
      IF_DEBUG(scheduler, sched_belch("interrupted"));
      deleteAllThreads();
      interrupted = rtsFalse;
      was_interrupted = rtsTrue;
    }

    /* Go through the list of main threads and wake up any
     * clients whose computations have finished.  ToDo: this
     * should be done more efficiently without a linear scan
     * of the main threads list, somehow...
     */
#ifdef SMP
    { 
      StgMainThread *m, **prev;
      prev = &main_threads;
      for (m = main_threads; m != NULL; m = m->link) {
	switch (m->tso->what_next) {
	case ThreadComplete:
	  if (m->ret) {
	    *(m->ret) = (StgClosure *)m->tso->sp[0];
	  }
	  *prev = m->link;
	  m->stat = Success;
	  pthread_cond_broadcast(&m->wakeup);
	  break;
	case ThreadKilled:
	  *prev = m->link;
	  if (was_interrupted) {
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
	  pthread_cond_broadcast(&m->wakeup);
	  break;
	default:
	  break;
	}
      }
    }

#else
# if defined(PAR)
    /* in GUM do this only on the Main PE */
    if (IAmMainThread)
# endif
    /* If our main thread has finished or been killed, return.
     */
    {
      StgMainThread *m = main_threads;
      if (m->tso->what_next == ThreadComplete
	  || m->tso->what_next == ThreadKilled) {
	main_threads = main_threads->link;
	if (m->tso->what_next == ThreadComplete) {
	  /* we finished successfully, fill in the return value */
	  if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
	  m->stat = Success;
	  return;
	} else {
	  if (was_interrupted) {
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
	  return;
	}
      }
    }
#endif

    /* Top up the run queue from our spark pool.  We try to make the
     * number of threads in the run queue equal to the number of
     * free capabilities.
     */
#if defined(SMP)
    {
      nat n = n_free_capabilities;
      StgTSO *tso = run_queue_hd;

      /* Count the run queue */
      while (n > 0 && tso != END_TSO_QUEUE) {
	tso = tso->link;
	n--;
      }

      for (; n > 0; n--) {
	StgClosure *spark;
	spark = findSpark(rtsFalse);
	if (spark == NULL) {
	  break; /* no more sparks in the pool */
	} else {
	  /* I'd prefer this to be done in activateSpark -- HWL */
	  /* tricky - it needs to hold the scheduler lock and
	   * not try to re-acquire it -- SDM */
	  createSparkThread(spark);
	  IF_DEBUG(scheduler,
		   sched_belch("==^^ turning spark of closure %p into a thread",
			       (StgClosure *)spark));
	}
      }
      /* We need to wake up the other tasks if we just created some
       * work for them.
       */
      if (n_free_capabilities - n > 1) {
	  pthread_cond_signal(&thread_ready_cond);
      }
    }
#endif /* SMP */

    /* Check whether any waiting threads need to be woken up.  If the
     * run queue is empty, and there are no other tasks running, we
     * can wait indefinitely for something to happen.
     * ToDo: what if another client comes along & requests another
     * main thread?
     */
    if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
      awaitEvent(
	   (run_queue_hd == END_TSO_QUEUE)
#ifdef SMP
	&& (n_free_capabilities == RtsFlags.ParFlags.nNodes)
#endif
	);
    }
    /* we can be interrupted while waiting for I/O... */
    if (interrupted) continue;

    /* check for signals each time around the scheduler */
#ifndef mingw32_TARGET_OS
    if (signals_pending()) {
      start_signal_handlers();
    }
#endif

    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
     */
#ifndef PAR
    if (blocked_queue_hd == END_TSO_QUEUE
	&& run_queue_hd == END_TSO_QUEUE
	&& sleeping_queue == END_TSO_QUEUE
#ifdef SMP
	&& (n_free_capabilities == RtsFlags.ParFlags.nNodes)
#endif
	)
    {
	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
	GarbageCollect(GetRoots,rtsTrue);
	if (blocked_queue_hd == END_TSO_QUEUE
	    && run_queue_hd == END_TSO_QUEUE
	    && sleeping_queue == END_TSO_QUEUE) {
	    IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
	    detectBlackHoles();
	    if (run_queue_hd == END_TSO_QUEUE) {
		StgMainThread *m = main_threads;
#ifdef SMP
		for (; m != NULL; m = m->link) {
		    deleteThread(m->tso);
		    m->ret = NULL;
		    m->stat = Deadlock;
		    pthread_cond_broadcast(&m->wakeup);
		}
		main_threads = NULL;
#else
		deleteThread(m->tso);
		m->ret = NULL;
		m->stat = Deadlock;
		main_threads = m->link;
		return;
#endif
	    }
	}
    }
#elif defined(PAR)
    /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
#endif

#ifdef SMP
    /* If there's a GC pending, don't do anything until it has
     * completed.
     */
    if (ready_to_gc) {
      IF_DEBUG(scheduler,sched_belch("waiting for GC"));
      pthread_cond_wait(&gc_pending_cond, &sched_mutex);
    }
    
    /* block until we've got a thread on the run queue and a free
     * capability.
     */
    while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
      IF_DEBUG(scheduler, sched_belch("waiting for work"));
      pthread_cond_wait(&thread_ready_cond, &sched_mutex);
      IF_DEBUG(scheduler, sched_belch("work now available"));
    }
#endif

#if defined(GRAN)

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

    IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, belch("GRAN: after main switch"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
	     belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    t = POP_RUN_QUEUE(); // take_off_run_queue(t);

    IF_DEBUG(gran, 
	     fprintf(stderr, "GRAN: About to run current thread, which is\n");
	     G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

#elif defined(PAR)
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }

    /* ToDo: perhaps merge with spark activation above */
    /* check whether we have local work and send requests if we have none */
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
      /* :-[  no local threads => look out for local sparks */
      /* the spark pool for the current PE */
      pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
	  pool->hd < pool->tl) {
	/* 
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 * Old comment:
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread... 
	 */

	spark = findSpark(rtsFalse);                /* get a spark */
	if (spark != (rtsSpark) NULL) {
	  tso = activateSpark(spark);       /* turn the spark into a thread */
	  IF_PAR_DEBUG(schedule,
		       belch("==== schedule: Created TSO %d (%p); %d threads active",
			     tso->id, tso, advisory_thread_count));

	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
	    belch("==^^ failed to activate spark");
	    goto next_thread;
	  }               /* otherwise fall through & pick-up new tso */
	} else {
	  IF_PAR_DEBUG(verbose,
		       belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
			     spark_queue_len(pool)));
	  goto next_thread;
	}
      }

      /* If we still have no work we need to send a FISH to get a spark
	 from another PE 
      */
      if (EMPTY_RUN_QUEUE()) {
      /* =8-[  no local sparks => look for work on other PEs */
	/*
	 * We really have absolutely no work.  Send out a fish
	 * (there may be some out there already), and wait for
	 * something to arrive.  We clearly can't run any threads
	 * until a SCHEDULE or RESUME arrives, and so that's what
	 * we're hoping to see.  (Of course, we still have to
	 * respond to other types of messages.)
	 */
	TIME now = msTime() /*CURRENT_TIME*/;
	IF_PAR_DEBUG(verbose, 
		     belch("--  now=%ld", now));
	IF_PAR_DEBUG(verbose,
		     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
			 (last_fish_arrived_at!=0 &&
			  last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
		       belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
			     last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
			     last_fish_arrived_at,
			     RtsFlags.ParFlags.fishDelay, now);
		     });
	
	if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
	    (last_fish_arrived_at==0 ||
	     (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
	  /* outstandingFishes is set in sendFish, processFish;
	     avoid flooding system with fishes via delay */
	  pe = choosePE();
	  sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
		   NEW_FISH_HUNGER);

	  // Global statistics: count no. of fishes
	  if (RtsFlags.ParFlags.ParStats.Global &&
	      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	    globalParStats.tot_fish_mess++;
	  }
	}
      
	receivedFinish = processMessages();
	goto next_thread;
      }
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
      receivedFinish = processMessages();
    }

    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
    IF_DEBUG(sanity,checkTSO(t));

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    CurrentTSO = t;
    */
    /* the spark pool for the current PE */
    pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable

    IF_DEBUG(scheduler, 
	     belch("--=^ %d threads, %d sparks on [%#x]", 
		   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

#if 1
    if (0 && RtsFlags.ParFlags.ParStats.Full && 
	t && LastTSO && t->id != LastTSO->id && 
	LastTSO->why_blocked == NotBlocked && 
	LastTSO->what_next != ThreadComplete) {
      // if previously scheduled TSO not blocked we have to record the context switch
      DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
			   GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
    }

    if (RtsFlags.ParFlags.ParStats.Full && 
	(emitSchedule /* forced emit */ ||
        (t && LastTSO && t->id != LastTSO->id))) {
      /* 
	 we are running a different TSO, so write a schedule event to log file
	 NB: If we use fair scheduling we also have to write  a deschedule 
	     event for LastTSO; with unfair scheduling we know that the
	     previous tso has blocked whenever we switch to another tso, so
	     we don't need it in GUM for now
      */
      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }
     
#endif
#else /* !GRAN && !PAR */
  
    /* grab a thread from the run queue
     */
    ASSERT(run_queue_hd != END_TSO_QUEUE);
    t = POP_RUN_QUEUE();
    IF_DEBUG(sanity,checkTSO(t));

#endif
    
    /* grab a capability
     */
#ifdef SMP
    cap = free_capabilities;
    free_capabilities = cap->link;
    n_free_capabilities--;
#else
    cap = &MainRegTable;
#endif

    cap->rCurrentTSO = t;
    
    /* context switches are now initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	&& (run_queue_hd != END_TSO_QUEUE
	    || blocked_queue_hd != END_TSO_QUEUE
	    || sleeping_queue != END_TSO_QUEUE))
	context_switch = 1;
    else
	context_switch = 0;

    RELEASE_LOCK(&sched_mutex);

    IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
			      t->id, t, whatNext_strs[t->what_next]));

    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    /* Run the current thread 
     */
    switch (cap->rCurrentTSO->what_next) {
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
    case ThreadEnterGHC:
	ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
	break;
    case ThreadRunGHC:
	ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
	break;
    case ThreadEnterInterp:
	ret = interpretBCO(cap);
	break;
    default:
      barf("schedule: invalid what_next field");
    }
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    
    /* Costs for the scheduler are assigned to CCS_SYSTEM */
#ifdef PROFILING
    CCCS = CCS_SYSTEM;
#endif
    
    ACQUIRE_LOCK(&sched_mutex);

#ifdef SMP
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
#elif !defined(GRAN) && !defined(PAR)
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
    t = cap->rCurrentTSO;
    
#if defined(PAR)
    /* HACK 675: if the last thread didn't yield, make sure to print a 
       SCHEDULE event to the log file when StgRunning the next thread, even
       if it is the same one as before */
    LastTSO = t; 
    TimeOfLastYield = CURRENT_TIME;
#endif

    switch (ret) {
    case HeapOverflow:
#if defined(GRAN)
      IF_DEBUG(gran, 
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_heapover++;
#elif defined(PAR)
      // IF_DEBUG(par, 
      //DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_heapover++;
#endif
      /* make all the running tasks block on a condition variable,
       * maybe set context_switch and wait till they all pile in,
       * then have them wait on a GC condition variable.
       */
      IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
			       t->id, t, whatNext_strs[t->what_next]));
      threadPaused(t);
#if defined(GRAN)
      ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PAR)
      /* Currently we emit a DESCHEDULE event before GC in GUM.
         ToDo: either add separate event to distinguish SYSTEM time from rest
	       or just nuke this DESCHEDULE (and the following SCHEDULE) */
      if (0 && RtsFlags.ParFlags.ParStats.Full) {
	DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
			 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
	emitSchedule = rtsTrue;
      }
#endif
      
      ready_to_gc = rtsTrue;
      context_switch = 1;		/* stop other threads ASAP */
      PUSH_ON_RUN_QUEUE(t);
      /* actual GC is done at the end of the while loop */
      break;
      
    case StackOverflow:
#if defined(GRAN)
      IF_DEBUG(gran, 
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_stackover++;
#elif defined(PAR)
      // IF_DEBUG(par, 
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_stackover++;
#endif
      IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
			       t->id, t, whatNext_strs[t->what_next]));
      /* just adjust the stack for this thread, then pop it back
       * on the run queue.
       */
      threadPaused(t);
      { 
	StgMainThread *m;
	/* enlarge the stack */
	StgTSO *new_t = threadStackOverflow(t);
	
	/* This TSO has moved, so update any pointers to it from the
	 * main thread stack.  It better not be on any other queues...
	 * (it shouldn't be).
	 */
	for (m = main_threads; m != NULL; m = m->link) {
	  if (m->tso == t) {
	    m->tso = new_t;
	  }
	}
	threadPaused(new_t);
	PUSH_ON_RUN_QUEUE(new_t);
      }
      break;

    case ThreadYielding:
#if defined(GRAN)
      IF_DEBUG(gran, 
	       DumpGranEvent(GR_DESCHEDULE, t));
      globalGranStats.tot_yields++;
#elif defined(PAR)
      // IF_DEBUG(par, 
      // DumpGranEvent(GR_DESCHEDULE, t);
      globalParStats.tot_yields++;
#endif
      /* put the thread back on the run queue.  Then, if we're ready to
       * GC, check whether this is the last task to stop.  If so, wake
       * up the GC thread.  getThread will block during a GC until the
       * GC is finished.
       */
      IF_DEBUG(scheduler,
               if (t->what_next == ThreadEnterInterp) {
		   /* ToDo: or maybe a timer expired when we were in Hugs?
		    * or maybe someone hit ctrl-C
                    */
                   belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
			 t->id, t, whatNext_strs[t->what_next]);
               } else {
                   belch("--<< thread %ld (%p; %s) stopped, yielding", 
			 t->id, t, whatNext_strs[t->what_next]);
               }
               );

      threadPaused(t);

      IF_DEBUG(sanity,
	       //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
	       checkTSO(t));
      ASSERT(t->link == END_TSO_QUEUE);
#if defined(GRAN)
      ASSERT(!is_on_queue(t,CurrentProc));

      IF_DEBUG(sanity,
	       //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
	       checkThreadQsSanity(rtsTrue));
#endif
#if defined(PAR)
      if (RtsFlags.ParFlags.doFairScheduling) { 
	/* this does round-robin scheduling; good for concurrency */
	APPEND_TO_RUN_QUEUE(t);
      } else {
	/* this does unfair scheduling; good for parallelism */
	PUSH_ON_RUN_QUEUE(t);
      }
#else
      /* this does round-robin scheduling; good for concurrency */
      APPEND_TO_RUN_QUEUE(t);
#endif

#if defined(GRAN)
      /* add a ContinueThread event to actually process the thread */
      new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
		ContinueThread,
		t, (StgClosure*)NULL, (rtsSpark*)NULL);
      IF_GRAN_DEBUG(bq, 
	       belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
	       G_EVENTQ(0);
	       G_CURR_THREADQ(0));
#endif /* GRAN */
      break;
      
    case ThreadBlocked:
#if defined(GRAN)
      IF_DEBUG(scheduler,
	       belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
			       t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
	       if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));

      // ??? needed; should emit block before
      IF_DEBUG(gran, 
	       DumpGranEvent(GR_DESCHEDULE, t)); 
      prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
      /*
	ngoq Dogh!
      ASSERT(procStatus[CurrentProc]==Busy || 
	      ((procStatus[CurrentProc]==Fetching) && 
	      (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
	  !(!RtsFlags.GranFlags.DoAsyncFetch &&
	    procStatus[CurrentProc]==Fetching)) 
	procStatus[CurrentProc] = Idle;
      */
#elif defined(PAR)
      IF_DEBUG(scheduler,
	       belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
		     t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
      IF_PAR_DEBUG(bq,

		   if (t->block_info.closure!=(StgClosure*)NULL) 
		     print_bq(t->block_info.closure));

      /* Send a fetch (if BlockedOnGA) and dump event to log file */
      blockThread(t);

      /* whatever we schedule next, we must log that schedule */
      emitSchedule = rtsTrue;

#else /* !GRAN */
      /* don't need to do anything.  Either the thread is blocked on
       * I/O, in which case we'll have called addToBlockedQueue
       * previously, or it's blocked on an MVar or Blackhole, in which
       * case it'll be on the relevant queue already.
       */
      IF_DEBUG(scheduler,
	       fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
	       printThreadBlockage(t);
	       fprintf(stderr, "\n"));

      /* Only for dumping event to log file 
	 ToDo: do I need this in GranSim, too?
      blockThread(t);
      */
#endif
      threadPaused(t);
      break;
      
    case ThreadFinished:
      /* Need to check whether this was a main thread, and if so, signal
       * the task that started it with the return value.  If we have no
       * more main threads, we probably need to stop all the tasks until
       * we get a new one.
       */
      /* We also end up here if the thread kills itself with an
       * uncaught exception, see Exception.hc.
       */
      IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
#if defined(GRAN)
      endThread(t, CurrentProc); // clean-up the thread
#elif defined(PAR)
      /* For now all are advisory -- HWL */
      //if(t->priority==AdvisoryPriority) ??
      advisory_thread_count--;
      
# ifdef DIST
      if(t->dist.priority==RevalPriority)
	FinishReval(t);
# endif
      
      if (RtsFlags.ParFlags.ParStats.Full &&
	  !RtsFlags.ParFlags.ParStats.Suppressed) 
	DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
#endif
      break;
      
    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }
    
#ifdef SMP
    cap->link = free_capabilities;
    free_capabilities = cap;
    n_free_capabilities++;
#endif

#ifdef SMP
    if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
#else
    if (ready_to_gc) 
#endif
      {
      /* everybody back, start the GC.
       * Could do it in this thread, or signal a condition var
       * to do it in another thread.  Either way, we need to
       * broadcast on gc_pending_cond afterward.
       */
#ifdef SMP
      IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
      GarbageCollect(GetRoots,rtsFalse);
      ready_to_gc = rtsFalse;
#ifdef SMP
      pthread_cond_broadcast(&gc_pending_cond);
#endif

#if defined(GRAN)
      /* add a ContinueThread event to continue execution of current thread */
      new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
		ContinueThread,
		t, (StgClosure*)NULL, (rtsSpark*)NULL);
      IF_GRAN_DEBUG(bq, 
	       fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
	       G_EVENTQ(0);
	       G_CURR_THREADQ(0));
#endif /* GRAN */
    }

#if defined(GRAN)
  next_thread:
    IF_GRAN_DEBUG(unused,
		  print_eventq(EventHd));

    event = get_next_event();

#elif defined(PAR)
  next_thread:
    /* ToDo: wait for next message to arrive rather than busy wait */

#else /* GRAN */
  /* not any more
  next_thread:
    t = take_off_run_queue(END_TSO_QUEUE);
  */
#endif /* GRAN */
  } /* end of while(1) */

  IF_PAR_DEBUG(verbose,
	       belch("== Leaving schedule() after having received Finish"));
}

/* ---------------------------------------------------------------------------
 * deleteAllThreads():  kill all the live threads.
 *
 * This is used when we catch a user interrupt (^C), before performing
 * any necessary cleanups and running finalizers.
 * ------------------------------------------------------------------------- */
   
void deleteAllThreads ( void )
{
  StgTSO* t;
  IF_DEBUG(scheduler,sched_belch("deleting all threads"));
  for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
      deleteThread(t);
  }
  for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
      deleteThread(t);
  }
  for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
      deleteThread(t);
  }
  run_queue_hd = run_queue_tl = END_TSO_QUEUE;
  blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
  sleeping_queue = END_TSO_QUEUE;
}

/* startThread and  insertThread are now in GranSim.c -- HWL */

//@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
//@subsection Suspend and Resume

/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 * 
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
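
/* Illustrative sketch (not compiled): how a "safe" C call (_ccall_GC) uses
 * suspendThread()/resumeThread() below.  do_blocking_c_call() is a
 * placeholder for the foreign function being called.
 */
#if 0
  {
    StgInt      tok;
    Capability *cap2;

    tok  = suspendThread(cap);     /* give up the capability               */
    do_blocking_c_call();          /* placeholder: the actual C function   */
    cap2 = resumeThread(tok);      /* get a capability back, with our TSO  */
  }
#endif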
   
StgInt
suspendThread( Capability *cap )
{
  nat tok;

  ACQUIRE_LOCK(&sched_mutex);

  IF_DEBUG(scheduler,
	   sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));

  threadPaused(cap->rCurrentTSO);
  cap->rCurrentTSO->link = suspended_ccalling_threads;
  suspended_ccalling_threads = cap->rCurrentTSO;

  /* Use the thread ID as the token; it should be unique */
  tok = cap->rCurrentTSO->id;

#ifdef SMP
  cap->link = free_capabilities;
  free_capabilities = cap;
  n_free_capabilities++;
#endif

  RELEASE_LOCK(&sched_mutex);
  return tok; 
}

Capability *
resumeThread( StgInt tok )
{
  StgTSO *tso, **prev;
  Capability *cap;

  ACQUIRE_LOCK(&sched_mutex);

  prev = &suspended_ccalling_threads;
  for (tso = suspended_ccalling_threads; 
       tso != END_TSO_QUEUE; 
       prev = &tso->link, tso = tso->link) {
    if (tso->id == (StgThreadID)tok) {
      *prev = tso->link;
      break;
    }
  }
  if (tso == END_TSO_QUEUE) {
    barf("resumeThread: thread not found");
  }
  tso->link = END_TSO_QUEUE;

#ifdef SMP
  while (free_capabilities == NULL) {
    IF_DEBUG(scheduler, sched_belch("waiting to resume"));
    pthread_cond_wait(&thread_ready_cond, &sched_mutex);
    IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
  }
  cap = free_capabilities;
  free_capabilities = cap->link;
  n_free_capabilities--;
#else  
  cap = &MainRegTable;
#endif

  cap->rCurrentTSO = tso;

  RELEASE_LOCK(&sched_mutex);
  return cap;
}


/* ---------------------------------------------------------------------------
 * Static functions
 * ------------------------------------------------------------------------ */
static void unblockThread(StgTSO *tso);

/* ---------------------------------------------------------------------------
 * Comparing Thread ids.
 *
 * This is used from STG land in the implementation of the
 * instances of Eq/Ord for ThreadIds.
 * ------------------------------------------------------------------------ */

int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
{ 
  StgThreadID id1 = tso1->id; 
  StgThreadID id2 = tso2->id;
 
  if (id1 < id2) return (-1);
  if (id1 > id2) return 1;
  return 0;
}

/* ---------------------------------------------------------------------------
   Create a new thread.

   The new thread starts with the given stack size.  Before the
   scheduler can run, however, this thread needs to have a closure
   (and possibly some arguments) pushed on its stack.  See
   pushClosure() in Schedule.h.

   createGenThread() and createIOThread() (in SchedAPI.h) are
   convenient packaged versions of this function.

   currently pri (priority) is only used in a GRAN setup -- HWL
   ------------------------------------------------------------------------ */
//@cindex createThread
#if defined(GRAN)