Schedule.c 110 KB
Newer Older
1
/* ---------------------------------------------------------------------------
sof's avatar
sof committed
2
 * $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $
3
 *
4
 * (c) The GHC Team, 1998-2000
5 6 7
 *
 * Scheduler
 *
8 9 10 11 12
 * Different GHC ways use this scheduler quite differently (see comments below)
 * Here is the global picture:
 *
 * WAY  Name     CPP flag  What's it for
 * --------------------------------------
sof's avatar
sof committed
13 14 15 16 17
 * mp   GUM      PAR          Parallel execution on a distributed memory machine
 * s    SMP      SMP          Parallel execution on a shared memory machine
 * mg   GranSim  GRAN         Simulation of parallel execution
 * md   GUM/GdH  DIST         Distributed execution (based on GUM)
 *
18 19 20 21
 * --------------------------------------------------------------------------*/

//@node Main scheduling code, , ,
//@section Main scheduling code
22

23 24
/* 
 * Version with scheduler monitor support for SMPs (WAY=s):
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41

   This design provides a high-level API to create and schedule threads etc.
   as documented in the SMP design document.

   It uses a monitor design controlled by a single mutex to exercise control
   over accesses to shared data structures, and builds on the Posix threads
   library.

   The majority of state is shared.  In order to keep essential per-task state,
   there is a Capability structure, which contains all the information
   needed to run a thread: its STG registers, a pointer to its TSO, a
   nursery etc.  During STG execution, a pointer to the capability is
   kept in a register (BaseReg).

   In a non-SMP build, there is one global capability, namely MainRegTable.

   SDM & KH, 10/99
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59

 * Version with support for distributed memory parallelism aka GUM (WAY=mp):

   The main scheduling loop in GUM iterates until a finish message is received.
   In that case a global flag @receivedFinish@ is set and this instance of
   the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
   for the handling of incoming messages, such as PP_FINISH.
   Note that in the parallel case we have a system manager that coordinates
   different PEs, each of which are running one instance of the RTS.
   See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
   From this routine processes executing ghc/rts/Main.c are spawned. -- HWL

 * Version with support for simulating parallel execution aka GranSim (WAY=mg):

   The main scheduling code in GranSim is quite different from that in std
   (concurrent) Haskell: while concurrent Haskell just iterates over the
   threads in the runnable queue, GranSim is event driven, i.e. it iterates
   over the events in the global event queue.  -- HWL
60 61
*/

62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
//@menu
//* Includes::			
//* Variables and Data structures::  
//* Main scheduling loop::	
//* Suspend and Resume::	
//* Run queue code::		
//* Garbage Collection Routines::  
//* Blocking Queue Routines::	
//* Exception Handling Routines::  
//* Debugging Routines::	
//* Index::			
//@end menu

//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
//@subsection Includes

78
#include "PosixSource.h"
79 80 81 82 83 84 85 86 87 88 89
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Storage.h"
#include "StgRun.h"
#include "StgStartup.h"
#include "Hooks.h"
#include "Schedule.h"
#include "StgMiscClosures.h"
#include "Storage.h"
90
#include "Interpreter.h"
91
#include "Exception.h"
92 93 94 95
#include "Printer.h"
#include "Main.h"
#include "Signals.h"
#include "Sanity.h"
96
#include "Stats.h"
andy's avatar
andy committed
97
#include "Itimer.h"
98
#include "Prelude.h"
99 100 101 102
#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
#endif
103 104 105 106 107 108 109 110 111
#if defined(GRAN) || defined(PAR)
# include "GranSimRts.h"
# include "GranSim.h"
# include "ParallelRts.h"
# include "Parallel.h"
# include "ParallelDebug.h"
# include "FetchMe.h"
# include "HLC.h"
#endif
112
#include "Sparks.h"
sof's avatar
sof committed
113 114
#include "Capability.h"
#include "OSThreads.h"
sof's avatar
sof committed
115
#include  "Task.h"
116 117

#include <stdarg.h>
118

119 120 121
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures

122 123 124 125
/* Main threads:
 *
 * These are the threads which clients have requested that we run.  
 *
sof's avatar
sof committed
126
 * In a 'threaded' build, we might have several concurrent clients all
127 128 129 130 131 132 133 134 135
 * waiting for results, and each one will wait on a condition variable
 * until the result is available.
 *
 * In non-SMP, clients are strictly nested: the first client calls
 * into the RTS, which might call out again to C with a _ccall_GC, and
 * eventually re-enter the RTS.
 *
 * Main threads information is kept in a linked list:
 */
136
//@cindex StgMainThread
137 138 139 140
typedef struct StgMainThread_ {
  StgTSO *         tso;     /* the thread this client asked us to run */
  SchedulerStatus  stat;    /* outcome: Success, Killed, Interrupted, ... */
  StgClosure **    ret;     /* where to deposit the result closure, or NULL
                             * if the client doesn't want the result */
#if defined(RTS_SUPPORTS_THREADS)
  Condition        wakeup;  /* broadcast when stat/ret have been filled in */
#endif
  struct StgMainThread_ *link;  /* next entry in the main_threads list */
} StgMainThread;

/* Main thread queue.
 * Locks required: sched_mutex.
 */
static StgMainThread *main_threads;

/* Thread queues.
 * Locks required: sched_mutex.
 */
155 156 157
#if defined(GRAN)

StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158
/* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159 160

/* 
sof's avatar
sof committed
161
   In GranSim we have a runnable and a blocked queue for each processor.
162 163 164 165 166 167 168 169
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
   -- HWL
*/
StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
StgTSO *ccalling_threadss[MAX_PROC];
170 171 172 173
/* We use the same global list of threads (all_threads) in GranSim as in
   the std RTS (i.e. we are cheating). However, we don't use this list in
   the GranSim specific code at the moment (so we are only potentially
   cheating).  */
174 175 176

#else /* !GRAN */

177 178
StgTSO *run_queue_hd, *run_queue_tl;
StgTSO *blocked_queue_hd, *blocked_queue_tl;
179
StgTSO *sleeping_queue;		/* perhaps replace with a hash table? */
180

181 182
#endif

183 184 185 186 187
/* Linked list of all threads.
 * Used for detecting garbage collected threads.
 */
StgTSO *all_threads;

sof's avatar
sof committed
188 189 190
/* When a thread performs a safe C call (_ccall_GC, using old
 * terminology), it gets put on the suspended_ccalling_threads
 * list. Used by the garbage collector.
191 192 193
 */
static StgTSO *suspended_ccalling_threads;

194 195
static StgTSO *threadStackOverflow(StgTSO *tso);

196 197 198 199 200
/* KH: The following two flags are shared memory locations.  There is no need
       to lock them, since they are only unset at the end of a scheduler
       operation.
*/

201
/* flag set by signal handler to precipitate a context switch */
202
//@cindex context_switch
203
nat context_switch;
204

205
/* if this flag is set as well, give up execution */
206
//@cindex interrupted
207
rtsBool interrupted;
208

209 210 211
/* Next thread ID to allocate.
 * Locks required: sched_mutex
 */
212
//@cindex next_thread_id
213 214 215 216 217 218 219
StgThreadID next_thread_id = 1;

/*
 * Pointers to the state of the current thread.
 * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
 * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
 */
220
 
221 222 223 224 225 226 227 228 229 230 231 232
/* The smallest stack size that makes any sense is:
 *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
 *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
 *  + 1                       (the realworld token for an IO thread)
 *  + 1                       (the closure to enter)
 *
 * A thread with this stack will bomb immediately with a stack
 * overflow, which will increase its stack size.  
 */

#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)

sof's avatar
sof committed
233

234
#if defined(GRAN)
235
StgTSO *CurrentTSO;
236 237
#endif

238 239 240 241 242 243
/*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
 *  exists - earlier gccs apparently didn't.
 *  -= chak
 */
StgTSO dummy_tso;

244 245 246 247 248
rtsBool ready_to_gc;

void            addToBlockedQueue ( StgTSO *tso );

static void     schedule          ( void );
249
       void     interruptStgRts   ( void );
250 251 252
#if defined(GRAN)
static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
#else
253
static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254
#endif
255

256 257
static void     detectBlackHoles  ( void );

258 259 260
#ifdef DEBUG
static void sched_belch(char *s, ...);
#endif
261

sof's avatar
sof committed
262 263 264 265
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
 *       with these synchronisation objects.
 */
sof's avatar
sof committed
266 267
Mutex     sched_mutex       = INIT_MUTEX_VAR;
Mutex     term_mutex        = INIT_MUTEX_VAR;
sof's avatar
sof committed
268 269


sof's avatar
sof committed
270 271 272 273 274 275
/*
 * When a native thread has completed executing an external
 * call, it needs to communicate the result back to the
 * (Haskell) thread that made the call. Do this as follows:
 *
 *  - in resumeThread(), the thread increments the counter
sof's avatar
sof committed
276 277 278 279 280 281 282 283
 *    rts_n_returning_workers, and then blocks waiting on the
 *    condition returning_worker_cond.
 *  - upon entry to the scheduler, a worker/task checks 
 *    rts_n_returning_workers. If it is > 0, worker threads
 *    are waiting to return, so it gives up its capability
 *    to let a worker deposit its result.
 *  - the worker thread that gave up its capability then tries
 *    to re-grab a capability and re-enter the Scheduler.
sof's avatar
sof committed
284 285 286
 */


sof's avatar
sof committed
287 288 289 290
/* thread_ready_cond: when signalled, a thread has become runnable for a
 * task to execute.
 *
 * In the non-SMP case, it also implies that the thread that is woken up has
sof's avatar
sof committed
291 292
 * exclusive access to the RTS and all its data structures (that are not
 * under sched_mutex's control).
sof's avatar
sof committed
293 294 295
 *
 * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
 *
sof's avatar
sof committed
296 297
 */
Condition thread_ready_cond = INIT_COND_VAR;
sof's avatar
sof committed
298 299 300 301
#if 0
/* For documentation purposes only */
#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
#endif
302

sof's avatar
sof committed
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
/*
 * To be able to make an informed decision about whether or not 
 * to create a new task when making an external call, keep track of
 * the number of tasks currently blocked waiting on thread_ready_cond.
 * (if > 0 => no need for a new task, just unblock an existing one).
 */
nat rts_n_waiting_tasks = 0;

/* returning_worker_cond: when a worker thread returns from executing an
 * external call, it needs to wait for an RTS Capability before passing
 * on the result of the call to the Haskell thread that made it.
 * 
 * returning_worker_cond is signalled in Capability.releaseCapability().
 *
 */
Condition returning_worker_cond = INIT_COND_VAR;

/*
 * To avoid starvation of threads blocked on worker_thread_cond,
 * the task(s) that enter the Scheduler will check to see whether
 * there are one or more worker threads blocked waiting on
 * returning_worker_cond.
 *
 * Locks needed: sched_mutex
 */
nat rts_n_waiting_workers = 0;


# if defined(SMP)
static Condition gc_pending_cond = INIT_COND_VAR;
333
nat await_death;
sof's avatar
sof committed
334
# endif
335

sof's avatar
sof committed
336
#endif /* RTS_SUPPORTS_THREADS */
sof's avatar
sof committed
337

338 339 340
#if defined(PAR)
StgTSO *LastTSO;
rtsTime TimeOfLastYield;
341
rtsBool emitSchedule = rtsTrue;
342 343
#endif

344 345 346 347
#if DEBUG
/* Printable names for a TSO's what_next field (debug output only). */
char *whatNext_strs[] = {
  "ThreadEnterGHC",
  "ThreadRunGHC",
  "ThreadEnterInterp",
  "ThreadKilled",
  "ThreadComplete"
};

/* Printable names for StgThreadReturnCode values (debug output only). */
char *threadReturnCode_strs[] = {
  "HeapOverflow",			/* might also be StackOverflow */
  "StackOverflow",
  "ThreadYielding",
  "ThreadBlocked",
  "ThreadFinished"
};
#endif

sof's avatar
sof committed
362
#if defined(PAR)
363 364 365 366
StgTSO * createSparkThread(rtsSpark spark);
StgTSO * activateSpark (rtsSpark spark);  
#endif

367 368 369 370 371 372
/*
 * The thread state for the main thread.
// ToDo: check whether not needed any more
StgTSO   *MainTSO;
 */

sof's avatar
sof committed
373 374 375 376 377 378 379 380 381 382 383 384
#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
/* Entry point for a newly started task: each task simply drops
 * straight into the main scheduler loop.
 */
static void
taskStart(void)
{
  schedule();
}
#endif




385 386 387 388
//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop

/* ---------------------------------------------------------------------------
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   Locking notes:  we acquire the scheduler lock once at the beginning
   of the scheduler loop, and release it when
    
      * running a thread, or
      * waiting for work, or
      * waiting for a GC to complete.

407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
   GRAN version:
     In a GranSim setup this loop iterates over the global event queue.
     This revolves around the global event queue, which determines what 
     to do next. Therefore, it's more complicated than either the 
     concurrent or the parallel (GUM) setup.

   GUM version:
     GUM iterates over incoming messages.
     It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
     and sends out a fish whenever it has nothing to do; in-between
     doing the actual reductions (shared code below) it processes the
     incoming messages and deals with delayed operations 
     (see PendingFetches).
     This is not the ugliest code you could imagine, but it's bloody close.

422 423
   ------------------------------------------------------------------------ */
//@cindex schedule
424 425 426 427 428 429
static void
schedule( void )
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
430 431 432
#if defined(GRAN)
  rtsEvent *event;
#elif defined(PAR)
433
  StgSparkPool *pool;
434 435 436
  rtsSpark spark;
  StgTSO *tso;
  GlobalTaskId pe;
437 438 439 440
  rtsBool receivedFinish = rtsFalse;
# if defined(DEBUG)
  nat tp_size, sp_size; // stats only
# endif
441
#endif
442
  rtsBool was_interrupted = rtsFalse;
sof's avatar
sof committed
443 444 445 446

#if defined(RTS_SUPPORTS_THREADS)
schedule_start:
#endif
447
  
sof's avatar
sof committed
448
#if defined(RTS_SUPPORTS_THREADS)
449
  ACQUIRE_LOCK(&sched_mutex);
sof's avatar
sof committed
450 451 452
#endif
 
#if defined(RTS_SUPPORTS_THREADS)
sof's avatar
sof committed
453
  /* ToDo: consider SMP support */
sof's avatar
sof committed
454
  if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
sof's avatar
sof committed
455 456
    /* (At least) one native thread is waiting to
     * deposit the result of an external call. So,
sof's avatar
sof committed
457
     * be nice and hand over our capability.
sof's avatar
sof committed
458
     */
sof's avatar
sof committed
459 460
    IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers));
    releaseCapability(cap);
sof's avatar
sof committed
461
    RELEASE_LOCK(&sched_mutex);
sof's avatar
sof committed
462

sof's avatar
sof committed
463
    yieldThread();
sof's avatar
sof committed
464 465 466 467 468 469 470 471 472
    goto schedule_start;
  }
#endif

#if defined(RTS_SUPPORTS_THREADS)
  while ( noCapabilities() ) {
    rts_n_waiting_tasks++;
    waitCondition(&thread_ready_cond, &sched_mutex);
    rts_n_waiting_tasks--;
sof's avatar
sof committed
473 474
  }
#endif
475

476
#if defined(GRAN)
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499

  /* set up first event to get things going */
  /* ToDo: assign costs for system setup and init MainTSO ! */
  new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
	    ContinueThread, 
	    CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);

  IF_DEBUG(gran,
	   fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
	   G_TSO(CurrentTSO, 5));

  if (RtsFlags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    CurrentTSO->gran.clock = CurrentTime[CurrentProc];
  }      

  event = get_next_event();

  while (event!=(rtsEvent*)NULL) {
    /* Choose the processor with the next event */
    CurrentProc = event->proc;
    CurrentTSO = event->tso;

500
#elif defined(PAR)
501

502 503
  while (!receivedFinish) {    /* set by processMessages */
                               /* when receiving PP_FINISH message         */ 
504
#else
505

506
  while (1) {
507

508
#endif
509

510 511
    IF_DEBUG(scheduler, printAllThreads());

512 513 514 515 516
    /* If we're interrupted (the user pressed ^C, or some other
     * termination condition occurred), kill all the currently running
     * threads.
     */
    if (interrupted) {
517
      IF_DEBUG(scheduler, sched_belch("interrupted"));
518
      deleteAllThreads();
519 520
      interrupted = rtsFalse;
      was_interrupted = rtsTrue;
521 522 523 524 525 526 527
    }

    /* Go through the list of main threads and wake up any
     * clients whose computations have finished.  ToDo: this
     * should be done more efficiently without a linear scan
     * of the main threads list, somehow...
     */
sof's avatar
sof committed
528
#if defined(RTS_SUPPORTS_THREADS)
529 530 531 532
    { 
      StgMainThread *m, **prev;
      prev = &main_threads;
      for (m = main_threads; m != NULL; m = m->link) {
533
	switch (m->tso->what_next) {
534
	case ThreadComplete:
535 536 537 538 539
	  if (m->ret) {
	    *(m->ret) = (StgClosure *)m->tso->sp[0];
	  }
	  *prev = m->link;
	  m->stat = Success;
sof's avatar
sof committed
540
	  broadcastCondition(&m->wakeup);
541 542
	  break;
	case ThreadKilled:
543
	  if (m->ret) *(m->ret) = NULL;
544
	  *prev = m->link;
545
	  if (was_interrupted) {
546 547 548 549
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
sof's avatar
sof committed
550
	  broadcastCondition(&m->wakeup);
551 552 553
	  break;
	default:
	  break;
554 555 556
	}
      }
    }
557

sof's avatar
sof committed
558
#else /* not threaded */
559

560 561 562 563
# if defined(PAR)
    /* in GUM do this only on the Main PE */
    if (IAmMainThread)
# endif
564 565 566 567
    /* If our main thread has finished or been killed, return.
     */
    {
      StgMainThread *m = main_threads;
568 569
      if (m->tso->what_next == ThreadComplete
	  || m->tso->what_next == ThreadKilled) {
570
	main_threads = main_threads->link;
571
	if (m->tso->what_next == ThreadComplete) {
572 573 574 575 576
	  /* we finished successfully, fill in the return value */
	  if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
	  m->stat = Success;
	  return;
	} else {
577
	  if (m->ret) { *(m->ret) = NULL; };
578
	  if (was_interrupted) {
579 580 581 582
	    m->stat = Interrupted;
	  } else {
	    m->stat = Killed;
	  }
583 584 585 586 587 588
	  return;
	}
      }
    }
#endif

589 590 591
    /* Top up the run queue from our spark pool.  We try to make the
     * number of threads in the run queue equal to the number of
     * free capabilities.
sof's avatar
sof committed
592 593 594
     *
     * Disable spark support in SMP for now, non-essential & requires
     * a little bit of work to make it compile cleanly. -- sof 1/02.
595
     */
sof's avatar
sof committed
596
#if 0 /* defined(SMP) */
597
    {
sof's avatar
sof committed
598
      nat n = getFreeCapabilities();
599 600 601 602 603 604 605 606 607 608
      StgTSO *tso = run_queue_hd;

      /* Count the run queue */
      while (n > 0 && tso != END_TSO_QUEUE) {
	tso = tso->link;
	n--;
      }

      for (; n > 0; n--) {
	StgClosure *spark;
609
	spark = findSpark(rtsFalse);
610 611 612
	if (spark == NULL) {
	  break; /* no more sparks in the pool */
	} else {
613 614 615
	  /* I'd prefer this to be done in activateSpark -- HWL */
	  /* tricky - it needs to hold the scheduler lock and
	   * not try to re-acquire it -- SDM */
616
	  createSparkThread(spark);	  
617
	  IF_DEBUG(scheduler,
618
		   sched_belch("==^^ turning spark of closure %p into a thread",
619 620 621 622 623 624
			       (StgClosure *)spark));
	}
      }
      /* We need to wake up the other tasks if we just created some
       * work for them.
       */
sof's avatar
sof committed
625
      if (getFreeCapabilities() - n > 1) {
sof's avatar
sof committed
626
   	  signalCondition( &thread_ready_cond );
627 628
      }
    }
629
#endif // SMP
630

631 632 633 634 635 636 637
    /* check for signals each time around the scheduler */
#ifndef mingw32_TARGET_OS
    if (signals_pending()) {
      startSignalHandlers();
    }
#endif

638 639 640 641 642
    /* Check whether any waiting threads need to be woken up.  If the
     * run queue is empty, and there are no other tasks running, we
     * can wait indefinitely for something to happen.
     * ToDo: what if another client comes along & requests another
     * main thread?
643
     */
sof's avatar
sof committed
644 645
    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
      awaitEvent( EMPTY_RUN_QUEUE()
sof's avatar
sof committed
646 647
#if defined(SMP)
	&& allFreeCapabilities()
648 649
#endif
	);
650
    }
651 652 653
    /* we can be interrupted while waiting for I/O... */
    if (interrupted) continue;

654 655 656 657 658 659 660 661 662 663
    /* 
     * Detect deadlock: when we have no threads to run, there are no
     * threads waiting on I/O or sleeping, and all the other tasks are
     * waiting for work, we must have a deadlock of some description.
     *
     * We first try to find threads blocked on themselves (ie. black
     * holes), and generate NonTermination exceptions where necessary.
     *
     * If no threads are black holed, we have a deadlock situation, so
     * inform all the main threads.
664
     */
665
#ifndef PAR
sof's avatar
sof committed
666 667
    if (   EMPTY_RUN_QUEUE()
	&& EMPTY_QUEUE(blocked_queue_hd)
sof's avatar
sof committed
668
	&& EMPTY_QUEUE(sleeping_queue)
sof's avatar
sof committed
669
#if defined(RTS_SUPPORTS_THREADS)
sof's avatar
sof committed
670
	&& EMPTY_QUEUE(suspended_ccalling_threads)
sof's avatar
sof committed
671 672 673
#endif
#ifdef SMP
	&& allFreeCapabilities()
674 675
#endif
	)
676
    {
677
	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
sof's avatar
sof committed
678 679 680 681
#if defined(THREADED_RTS)
	/* and SMP mode ..? */
	releaseCapability(cap);
#endif
sof's avatar
sof committed
682
	RELEASE_LOCK(&sched_mutex);
683
	GarbageCollect(GetRoots,rtsTrue);
sof's avatar
sof committed
684
	ACQUIRE_LOCK(&sched_mutex);
sof's avatar
sof committed
685 686 687
	if (   EMPTY_QUEUE(blocked_queue_hd)
	    && EMPTY_RUN_QUEUE()
	    && EMPTY_QUEUE(sleeping_queue) ) {
688

689 690
	    IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
	    detectBlackHoles();
691

sof's avatar
sof committed
692 693 694 695 696
	    /* No black holes, so probably a real deadlock.  Send the
	     * current main thread the Deadlock exception (or in the SMP
	     * build, send *all* main threads the deadlock exception,
	     * since none of them can make progress).
	     */
sof's avatar
sof committed
697
	    if ( EMPTY_RUN_QUEUE() ) {
698
		StgMainThread *m;
sof's avatar
sof committed
699
#if defined(RTS_SUPPORTS_THREADS)
700 701 702 703 704 705 706 707 708 709 710 711
		for (m = main_threads; m != NULL; m = m->link) {
		    switch (m->tso->why_blocked) {
		    case BlockedOnBlackHole:
			raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
			break;
		    case BlockedOnException:
		    case BlockedOnMVar:
			raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
			break;
		    default:
			barf("deadlock: main thread blocked in a strange way");
		    }
712 713
		}
#else
714 715 716 717 718 719 720 721 722 723 724 725
		m = main_threads;
		switch (m->tso->why_blocked) {
		case BlockedOnBlackHole:
		    raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
		    break;
		case BlockedOnException:
		case BlockedOnMVar:
		    raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
		    break;
		default:
		    barf("deadlock: main thread blocked in a strange way");
		}
726
#endif
727
	    }
sof's avatar
sof committed
728
#if defined(RTS_SUPPORTS_THREADS)
sof's avatar
sof committed
729 730
	    /* ToDo: revisit conditions (and mechanism) for shutting
	       down a multi-threaded world  */
sof's avatar
sof committed
731
	    if ( EMPTY_RUN_QUEUE() ) {
sof's avatar
sof committed
732
	      IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
sof's avatar
sof committed
733 734 735
	      shutdownHaskellAndExit(0);
	    
	    }
sof's avatar
sof committed
736
#endif
sof's avatar
sof committed
737
	    ASSERT( !EMPTY_RUN_QUEUE() );
738
	}
739
    }
740 741
#elif defined(PAR)
    /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
742 743
#endif

sof's avatar
sof committed
744
#if defined(SMP)
745 746 747 748
    /* If there's a GC pending, don't do anything until it has
     * completed.
     */
    if (ready_to_gc) {
749
      IF_DEBUG(scheduler,sched_belch("waiting for GC"));
sof's avatar
sof committed
750
      waitCondition( &gc_pending_cond, &sched_mutex );
751
    }
sof's avatar
sof committed
752 753
#endif    

sof's avatar
sof committed
754
#if defined(RTS_SUPPORTS_THREADS)
755 756
    /* block until we've got a thread on the run queue and a free
     * capability.
sof's avatar
sof committed
757
     *
758
     */
sof's avatar
sof committed
759 760 761 762 763 764 765 766 767 768
    if ( EMPTY_RUN_QUEUE() ) {
      /* Give up our capability */
      releaseCapability(cap);
      while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
	IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
	rts_n_waiting_tasks++;
	waitCondition( &thread_ready_cond, &sched_mutex );
	rts_n_waiting_tasks--;
	IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
      }
769 770
    }
#endif
771 772

#if defined(GRAN)
773 774 775 776 777 778 779 780 781 782 783 784 785

    if (RtsFlags.GranFlags.Light)
      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc

    /* adjust time based on time-stamp */
    if (event->time > CurrentTime[CurrentProc] &&
        event->evttype != ContinueThread)
      CurrentTime[CurrentProc] = event->time;
    
    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
    if (!RtsFlags.GranFlags.Light)
      handleIdlePEs();

786
    IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899

    /* main event dispatcher in GranSim */
    switch (event->evttype) {
      /* Should just be continuing execution */
    case ContinueThread:
      IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
      /* ToDo: check assertion
      ASSERT(run_queue_hd != (StgTSO*)NULL &&
	     run_queue_hd != END_TSO_QUEUE);
      */
      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
      if (!RtsFlags.GranFlags.DoAsyncFetch &&
	  procStatus[CurrentProc]==Fetching) {
	belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for completed threads */
      if (CurrentTSO->what_next == ThreadComplete) {
	belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }	
      /* Ignore ContinueThreads for threads that are being migrated */
      if (PROCS(CurrentTSO)==Nowhere) { 
	belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	goto next_thread;
      }
      /* The thread should be at the beginning of the run queue */
      if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
	belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
	      CurrentTSO->id, CurrentTSO, CurrentProc);
	break; // run the thread anyway
      }
      /*
      new_event(proc, proc, CurrentTime[proc],
		FindWork,
		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
      goto next_thread; 
      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
      break; // now actually run the thread; DaH Qu'vam yImuHbej 

    case FetchNode:
      do_the_fetchnode(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case GlobalBlock:
      do_the_globalblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FetchReply:
      do_the_fetchreply(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case UnblockThread:   /* Move from the blocked queue to the tail of */
      do_the_unblock(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case ResumeThread:  /* Move from the blocked queue to the tail of */
      /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
      event->tso->gran.blocktime += 
	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case StartThread:
      do_the_startthread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveThread:
      do_the_movethread(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case MoveSpark:
      do_the_movespark(event);
      goto next_thread;             /* handle next event in event queue  */
      
    case FindWork:
      do_the_findwork(event);
      goto next_thread;             /* handle next event in event queue  */
      
    default:
      barf("Illegal event type %u\n", event->evttype);
    }  /* switch */
    
    /* This point was scheduler_loop in the old RTS */

    IF_DEBUG(gran, belch("GRAN: after main switch"));

    TimeOfLastEvent = CurrentTime[CurrentProc];
    TimeOfNextEvent = get_time_of_next_event();
    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
    // CurrentTSO = ThreadQueueHd;

    IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
			 TimeOfNextEvent));

    if (RtsFlags.GranFlags.Light) 
      GranSimLight_leave_system(event, &ActiveTSO); 

    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;

    IF_DEBUG(gran, 
	     belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));

    /* in a GranSim setup the TSO stays on the run queue */
    t = CurrentTSO;
    /* Take a thread from the run queue. */
    t = POP_RUN_QUEUE(); // take_off_run_queue(t);

    IF_DEBUG(gran, 
	     fprintf(stderr, "GRAN: About to run current thread, which is\n");
	     G_TSO(t,5));

    context_switch = 0; // turned on via GranYield, checking events and time slice

    IF_DEBUG(gran, 
	     DumpGranEvent(GR_SCHEDULE, t));

    procStatus[CurrentProc] = Busy;

#elif defined(PAR)
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }

    /* ToDo: phps merge with spark activation above */
    /* check whether we have local work and send requests if we have none */
    if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
      /* :-[  no local threads => look out for local sparks */
      /* the spark pool for the current PE */
      pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
	  pool->hd < pool->tl) {
	/* 
	 * ToDo: add GC code check that we really have enough heap afterwards!!
	 * Old comment:
	 * If we're here (no runnable threads) and we have pending
	 * sparks, we must have a space problem.  Get enough space
	 * to turn one of those pending sparks into a
	 * thread... 
	 */

	spark = findSpark(rtsFalse);                /* get a spark */
	if (spark != (rtsSpark) NULL) {
	  tso = activateSpark(spark);       /* turn the spark into a thread */
	  IF_PAR_DEBUG(schedule,
		       belch("==== schedule: Created TSO %d (%p); %d threads active",
			     tso->id, tso, advisory_thread_count));

	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
	    belch("==^^ failed to activate spark");
	    goto next_thread;
	  }               /* otherwise fall through & pick-up new tso */
	} else {
	  IF_PAR_DEBUG(verbose,
		       belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
			     spark_queue_len(pool)));
	  goto next_thread;
	}
      }

      /* If we still have no work we need to send a FISH to get a spark
	 from another PE 
      */
      if (EMPTY_RUN_QUEUE()) {
      /* =8-[  no local sparks => look for work on other PEs */
	/*
	 * We really have absolutely no work.  Send out a fish
	 * (there may be some out there already), and wait for
	 * something to arrive.  We clearly can't run any threads
	 * until a SCHEDULE or RESUME arrives, and so that's what
	 * we're hoping to see.  (Of course, we still have to
	 * respond to other types of messages.)
	 */
	TIME now = msTime() /*CURRENT_TIME*/;
	IF_PAR_DEBUG(verbose, 
		     belch("--  now=%ld", now));
	IF_PAR_DEBUG(verbose,
		     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
			 (last_fish_arrived_at!=0 &&
			  last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
		       belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
			     last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
			     last_fish_arrived_at,
			     RtsFlags.ParFlags.fishDelay, now);
		     });
	
	if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
	    (last_fish_arrived_at==0 ||
	     (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
	  /* outstandingFishes is set in sendFish, processFish;
	     avoid flooding system with fishes via delay */
	  pe = choosePE();
	  sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
		   NEW_FISH_HUNGER);

	  // Global statistics: count no. of fishes
	  if (RtsFlags.ParFlags.ParStats.Global &&
	      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	    globalParStats.tot_fish_mess++;
	  }
	}
      
	receivedFinish = processMessages();
	goto next_thread;
      }
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
      receivedFinish = processMessages();
    }

    /* Now we are sure that we have some work available */
    ASSERT(run_queue_hd != END_TSO_QUEUE);

    /* Take a thread from the run queue, if we have work */
    t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
    IF_DEBUG(sanity,checkTSO(t));

    /* ToDo: write something to the log-file
    if (RTSflags.ParFlags.granSimStats && !sameThread)
        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);

    CurrentTSO = t;
    */
    /* the spark pool for the current PE */
    pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable

    IF_DEBUG(scheduler, 
	     belch("--=^ %d threads, %d sparks on [%#x]", 
		   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));

#if 1
    if (0 && RtsFlags.ParFlags.ParStats.Full && 
	t && LastTSO && t->id != LastTSO->id && 
	LastTSO->why_blocked == NotBlocked && 
	LastTSO->what_next != ThreadComplete) {
      // if previously scheduled TSO not blocked we have to record the context switch
      DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
			   GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
    }

    if (RtsFlags.ParFlags.ParStats.Full && 
	(emitSchedule /* forced emit */ ||
        (t && LastTSO && t->id != LastTSO->id))) {
      /* 
	 we are running a different TSO, so write a schedule event to log file
	 NB: If we use fair scheduling we also have to write  a deschedule 
	     event for LastTSO; with unfair scheduling we know that the
	     previous tso has blocked whenever we switch to another tso, so
	     we don't need it in GUM for now
      */
      DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
		       GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
      emitSchedule = rtsFalse;
    }
     
#endif
#else /* !GRAN && !PAR */
  
    /* grab a thread from the run queue */
    ASSERT(run_queue_hd != END_TSO_QUEUE);
    t = POP_RUN_QUEUE();
    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#endif
    
    grabCapability(&cap);
    cap->r.rCurrentTSO = t;
    
    /* context switches are now initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (
#ifdef PROFILING
	RtsFlags.ProfFlags.profileInterval == 0 ||
#endif
	(RtsFlags.ConcFlags.ctxtSwitchTicks == 0
	 && (run_queue_hd != END_TSO_QUEUE
	     || blocked_queue_hd != END_TSO_QUEUE
	     || sleeping_queue != END_TSO_QUEUE)))
	context_switch = 1;
    else
	context_switch = 0;

    RELEASE_LOCK(&sched_mutex);

    IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
			      t->id, t, whatNext_strs[t->what_next]));

#ifdef PROFILING
    startHeapProfTimer();
#endif

    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    /* Run the current thread 
     */
    switch (cap->r.rCurrentTSO->what_next) {
    case ThreadKilled:
    case ThreadComplete:
	/* Thread already finished, return to scheduler. */
	ret = ThreadFinished;
	break;
    case ThreadEnterGHC:
	ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
	break;
    case ThreadRunGHC:
	ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
	break;
    case ThreadEnterInterp:
	ret = interpretBCO(cap);
	break;
    default:
      barf("schedule: invalid what_next field");
    }
    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
    
    /* Costs for the scheduler are assigned to CCS_SYSTEM */
#ifdef PROFILING
    stopHeapProfTimer();
    CCCS = CCS_SYSTEM;
#endif
    
    ACQUIRE_LOCK(&sched_mutex);

#ifdef SMP
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
    IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););