Commit e289780e authored by sof's avatar sof
Browse files

[project @ 2002-02-13 08:48:06 by sof]

Revised implementation of multi-threaded callouts (and callins):

- unified synchronisation story for threaded and SMP builds,
  following up on SimonM's suggestion. The following synchro
  variables are now used inside the Scheduler:

    + thread_ready_cond - condition variable that is signalled
      when a H. thread has become runnable (via the THREAD_RUNNABLE()
      macro) and there are available capabilities. Waited on:
         + upon schedule() entry (iff no caps. available).
	 + when a thread inside of the Scheduler spots that there
	   are no runnable threads to service, but one or more
	   external calls are in progress.
	 + in resumeThread(), waiting for a capability to become
	   available.

      Prior to waiting on thread_ready_cond, a counter rts_n_waiting_tasks
      is incremented, so that we can keep track of the number of
      readily available worker threads (need this in order to make
      an informed decision on whether or not to create a new thread
      when an external call is made).


    + returning_worker_cond - condition variable that is waited
      on by an OS thread that has finished executing an external
      call & now wants to feed its result back to the H thread
      that made the call. Before doing so, the counter
      rts_n_returning_workers is incremented.

      Upon entry to the Scheduler, this counter is checked for &
      if it is non-zero, the thread gives up its capability and
      signals returning_worker_cond before trying to re-grab a
      capability. (releaseCapability() takes care of this).

    + sched_mutex - protect Scheduler data structures.
    + gc_pending_cond - SMP-only condition variable for signalling
      completion of GCs.

- initial implementation of call-ins, i.e., multiple OS threads
  may concurrently call into the RTS without interfering with
  each other. Implementation uses cheesy locking protocol to
  ensure that only one OS thread at a time can construct a
  function application -- stop-gap measure until the RtsAPI
  is revised (as discussed last month) *and* a designated
  block is used for allocating these applications.

- In the implementation of call-ins, the OS thread blocks
  waiting for an RTS worker thread to complete the evaluation
  of the function application. Since main() also uses the
  RtsAPI, provide a separate entry point for it (rts_mainEvalIO()),
  which avoids creating a separate thread to evaluate Main.main --
  that can be done directly by the thread exec'ing main().
  [Maybe there's a tidier way of doing this, a bit ugly the
  way it is now..]


There are a couple of dark corners that need to be looked at,
such as conditions for shutting down (and how) + consider what
ought to happen when async I/O is thrown into the mix (I know
what will happen, but that's maybe not what we want).

Other than that, things are in a generally happy state & I hope
to declare myself done before the week is up.
parent 3470e75b
......@@ -19,6 +19,7 @@
* --------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "Capability.h"
......@@ -51,6 +52,7 @@ initCapabilities()
initCapabilities_(RtsFlags.ParFlags.nNodes);
#else
initCapability(&MainCapability);
rts_n_free_capabilities = 1;
#endif
return;
......@@ -75,14 +77,38 @@ void grabCapability(Capability** cap)
#endif
}
void releaseCapability(Capability* cap)
/*
* Letting go of a capability
*
* Locks required: sched_mutex
*/
void releaseCapability(Capability* cap
#if !defined(SMP)
STG_UNUSED
#endif
)
{
#if defined(SMP)
cap->link = free_capabilities;
free_capabilities = cap;
rts_n_free_capabilities++;
#endif
#else
rts_n_free_capabilities = 1;
#endif
#if defined(RTS_SUPPORTS_THREADS)
/* Check to see whether a worker thread can be given
the go-ahead to return the result of an external call..*/
if (rts_n_waiting_workers > 0) {
/* The worker is responsible for grabbing the capability and
* decrementing the rts_n_returning_workers count
*/
signalCondition(&returning_worker_cond);
} else if ( !EMPTY_RUN_QUEUE() ) {
/* Signal that work is available */
signalCondition(&thread_ready_cond);
}
#endif
return;
}
......
......@@ -28,24 +28,26 @@
extern Capability MainCapability;
#endif
extern void initCapabilities(void);
extern void grabCapability(Capability** cap);
extern void releaseCapability(Capability* cap);
#if defined(RTS_SUPPORTS_THREADS)
extern nat rts_n_free_capabilities; /* total number of available capabilities */
/* total number of available capabilities */
extern nat rts_n_free_capabilities;
static inline nat getFreeCapabilities()
static inline nat getFreeCapabilities (void)
{
return rts_n_free_capabilities;
}
static inline rtsBool noCapabilities()
static inline rtsBool noCapabilities (void)
{
return (rts_n_free_capabilities == 0);
}
static inline rtsBool allFreeCapabilities()
static inline rtsBool allFreeCapabilities (void)
{
# if defined(SMP)
return (rts_n_free_capabilities == RtsFlags.ParFlags.nNodes);
......
/* -----------------------------------------------------------------------------
* $Id: Main.c,v 1.33 2002/02/05 15:42:04 simonpj Exp $
* $Id: Main.c,v 1.34 2002/02/13 08:48:06 sof Exp $
*
* (c) The GHC Team 1998-2000
*
......@@ -83,7 +83,7 @@ int main(int argc, char *argv[])
fprintf(stderr, "==== [%x] Main Thread Started ...\n", mytid));
/* ToDo: Dump event for the main thread */
status = rts_evalIO((HaskellObj)mainIO_closure, NULL);
status = rts_mainEvalIO((HaskellObj)mainIO_closure, NULL);
} else {
/* Just to show we're alive */
IF_PAR_DEBUG(verbose,
......@@ -98,12 +98,12 @@ int main(int argc, char *argv[])
# elif defined(GRAN)
/* ToDo: Dump event for the main thread */
status = rts_evalIO(mainIO_closure, NULL);
status = rts_mainEvalIO(mainIO_closure, NULL);
# else /* !PAR && !GRAN */
/* ToDo: want to start with a larger stack size */
status = rts_evalIO((HaskellObj)mainIO_closure, NULL);
status = rts_mainEvalIO((HaskellObj)mainIO_closure, NULL);
# endif /* !PAR && !GRAN */
......
/* ----------------------------------------------------------------------------
* $Id: RtsAPI.c,v 1.31 2002/01/22 13:54:22 simonmar Exp $
* $Id: RtsAPI.c,v 1.32 2002/02/13 08:48:06 sof Exp $
*
* (c) The GHC Team, 1998-2001
*
......@@ -15,6 +15,60 @@
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "Prelude.h"
#include "OSThreads.h"
#include "Schedule.h"
#if defined(THREADED_RTS)
#define SCHEDULE_MAIN_THREAD(tso) scheduleThread_(tso,rtsFalse)
#define WAIT_MAIN_THREAD(tso,ret) waitThread_(tso,ret,rtsFalse)
#else
#define SCHEDULE_MAIN_THREAD(tso) scheduleThread(tso)
#define WAIT_MAIN_THREAD(tso,ret) waitThread(tso,ret)
#endif
#if defined(RTS_SUPPORTS_THREADS)
/* Cheesy locking scheme while waiting for the
* RTS API to change.
*/
static Mutex alloc_mutex = INIT_MUTEX_VAR;
static Condition alloc_cond = INIT_COND_VAR;
#define INVALID_THREAD_ID ((OSThreadId)(-1))
/* Thread currently owning the allocator */
static OSThreadId c_id = INVALID_THREAD_ID;
/*
 * Thread-aware wrapper around allocate(): serialises RtsAPI heap
 * allocation across OS threads via the "cheesy" c_id ownership
 * scheme (see comment above alloc_mutex).
 *
 * Ownership is sticky: the first thread to call alloc() claims the
 * allocator and keeps it across subsequent alloc() calls until it
 * calls releaseAllocLock().  Other threads block on alloc_cond in
 * the meantime.
 *
 * NOTE(review): the waitCondition() branch assumes no spurious
 * wakeups and at most one waiter resumed per signal -- the woken
 * thread takes ownership without re-checking c_id.  Presumably
 * acceptable for this stop-gap protocol; confirm before reuse.
 */
static StgPtr alloc(nat n)
{
OSThreadId tid = osThreadId();
ACQUIRE_LOCK(&alloc_mutex);
if (tid == c_id) {
/* I've got the lock, just allocate() */
;
} else if (c_id == INVALID_THREAD_ID) {
/* Allocator is free -- claim ownership. */
c_id = tid;
} else {
/* Another thread owns the allocator; block until it calls
 * releaseAllocLock(), then take over ownership. */
waitCondition(&alloc_cond, &alloc_mutex);
c_id = tid;
}
RELEASE_LOCK(&alloc_mutex);
return allocate(n);
}
/*
 * Give up ownership of the RtsAPI allocator (counterpart to the
 * implicit claim made in alloc()), allowing one blocked OS thread
 * to proceed.  Called by the rts_eval* entry points once all
 * closures for the call have been constructed.
 */
static void releaseAllocLock(void)
{
ACQUIRE_LOCK(&alloc_mutex);
/* Reset the allocator owner */
c_id = INVALID_THREAD_ID;
RELEASE_LOCK(&alloc_mutex);
/* Free up an OS thread waiting to get in.
 * Signalling after releasing the mutex is deliberate: the woken
 * thread re-acquires alloc_mutex inside waitCondition() anyway. */
signalCondition(&alloc_cond);
}
#else
# define alloc(n) allocate(n)
# define releaseAllocLock() /* nothing */
#endif
/* ----------------------------------------------------------------------------
Building Haskell objects from C datatypes.
......@@ -22,7 +76,7 @@
HaskellObj
rts_mkChar (HsChar c)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, Czh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgChar)c;
return p;
......@@ -31,7 +85,7 @@ rts_mkChar (HsChar c)
HaskellObj
rts_mkInt (HsInt i)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, Izh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgInt)i;
return p;
......@@ -40,7 +94,7 @@ rts_mkInt (HsInt i)
HaskellObj
rts_mkInt8 (HsInt8 i)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, I8zh_con_info, CCS_SYSTEM);
/* Make sure we mask out the bits above the lowest 8 */
p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xff);
......@@ -50,7 +104,7 @@ rts_mkInt8 (HsInt8 i)
HaskellObj
rts_mkInt16 (HsInt16 i)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, I16zh_con_info, CCS_SYSTEM);
/* Make sure we mask out the relevant bits */
p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffff);
......@@ -60,7 +114,7 @@ rts_mkInt16 (HsInt16 i)
HaskellObj
rts_mkInt32 (HsInt32 i)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, I32zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffffffff);
return p;
......@@ -70,7 +124,7 @@ HaskellObj
rts_mkInt64 (HsInt64 i)
{
long long *tmp;
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
SET_HDR(p, I64zh_con_info, CCS_SYSTEM);
tmp = (long long*)&(p->payload[0]);
*tmp = (StgInt64)i;
......@@ -80,7 +134,7 @@ rts_mkInt64 (HsInt64 i)
HaskellObj
rts_mkWord (HsWord i)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, Wzh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)i;
return p;
......@@ -90,7 +144,7 @@ HaskellObj
rts_mkWord8 (HsWord8 w)
{
/* see rts_mkInt* comments */
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, W8zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)(w & 0xff);
return p;
......@@ -100,7 +154,7 @@ HaskellObj
rts_mkWord16 (HsWord16 w)
{
/* see rts_mkInt* comments */
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, W16zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)(w & 0xffff);
return p;
......@@ -110,7 +164,7 @@ HaskellObj
rts_mkWord32 (HsWord32 w)
{
/* see rts_mkInt* comments */
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, W32zh_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)(StgWord)(w & 0xffffffff);
return p;
......@@ -121,7 +175,7 @@ rts_mkWord64 (HsWord64 w)
{
unsigned long long *tmp;
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
/* see mk_Int8 comment */
SET_HDR(p, W64zh_con_info, CCS_SYSTEM);
tmp = (unsigned long long*)&(p->payload[0]);
......@@ -132,7 +186,7 @@ rts_mkWord64 (HsWord64 w)
HaskellObj
rts_mkFloat (HsFloat f)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
SET_HDR(p, Fzh_con_info, CCS_SYSTEM);
ASSIGN_FLT((P_)p->payload, (StgFloat)f);
return p;
......@@ -141,7 +195,7 @@ rts_mkFloat (HsFloat f)
HaskellObj
rts_mkDouble (HsDouble d)
{
StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,sizeofW(StgDouble)));
StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,sizeofW(StgDouble)));
SET_HDR(p, Dzh_con_info, CCS_SYSTEM);
ASSIGN_DBL((P_)p->payload, (StgDouble)d);
return p;
......@@ -150,7 +204,7 @@ rts_mkDouble (HsDouble d)
HaskellObj
rts_mkStablePtr (HsStablePtr s)
{
StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
SET_HDR(p, StablePtr_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)s;
return p;
......@@ -159,7 +213,7 @@ rts_mkStablePtr (HsStablePtr s)
HaskellObj
rts_mkPtr (HsPtr a)
{
StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
SET_HDR(p, Ptr_con_info, CCS_SYSTEM);
p->payload[0] = (StgClosure *)a;
return p;
......@@ -186,7 +240,7 @@ rts_mkString (char *s)
HaskellObj
rts_apply (HaskellObj f, HaskellObj arg)
{
StgAP_UPD *ap = (StgAP_UPD *)allocate(AP_sizeW(1));
StgAP_UPD *ap = (StgAP_UPD *)alloc(AP_sizeW(1));
SET_HDR(ap, &stg_AP_UPD_info, CCS_SYSTEM);
ap->n_args = 1;
ap->fun = f;
......@@ -400,6 +454,7 @@ rts_eval (HaskellObj p, /*out*/HaskellObj *ret)
StgTSO *tso;
tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p);
releaseAllocLock();
scheduleThread(tso);
return waitThread(tso, ret);
}
......@@ -410,6 +465,7 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
StgTSO *tso;
tso = createGenThread(stack_size, p);
releaseAllocLock();
scheduleThread(tso);
return waitThread(tso, ret);
}
......@@ -424,10 +480,26 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret)
StgTSO* tso;
tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
releaseAllocLock();
scheduleThread(tso);
return waitThread(tso, ret);
}
/*
* Identical to rts_evalIO(), but won't create a new task/OS thread
* to evaluate the Haskell thread. Used by main() only. Hack.
*/
/*
 * Evaluate an IO action on behalf of main() itself.
 *
 * Identical in effect to rts_evalIO(), except that in the threaded
 * RTS the SCHEDULE_MAIN_THREAD/WAIT_MAIN_THREAD macros expand to
 * scheduleThread_()/waitThread_() with rtsFalse, so no separate
 * task/OS thread is created -- the thread executing main() runs
 * the Scheduler directly.  (Self-described hack; see the call-in
 * notes in the commit message.)
 *
 * p   - closure of type (IO a) to be evaluated strictly.
 * ret - out-parameter receiving the result closure.
 * Returns the SchedulerStatus from waiting on the thread.
 */
SchedulerStatus
rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret)
{
StgTSO* tso;
tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
/* All allocation for this call is done; let other callers in
 * before we potentially block in the Scheduler. */
releaseAllocLock();
SCHEDULE_MAIN_THREAD(tso);
return WAIT_MAIN_THREAD(tso, ret);
}
/*
* rts_evalStableIO() is suitable for calling from Haskell. It
* evaluates a value of the form (StablePtr (IO a)), forcing the
......@@ -443,6 +515,7 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
p = (StgClosure *)deRefStablePtr(s);
tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
releaseAllocLock();
scheduleThread(tso);
stat = waitThread(tso, &r);
......@@ -463,6 +536,7 @@ rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
StgTSO *tso;
tso = createIOThread(stack_size, p);
releaseAllocLock();
scheduleThread(tso);
return waitThread(tso, ret);
}
......
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.121 2002/02/12 15:38:08 sof Exp $
* $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -158,7 +158,7 @@ StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
/*
In GranSim we have a runable and a blocked queue for each processor.
In GranSim we have a runnable and a blocked queue for each processor.
In order to minimise code changes new arrays run_queue_hds/tls
are created. run_queue_hd is then a short cut (macro) for
run_queue_hds[CurrentProc] (see GranSim.h).
......@@ -265,44 +265,31 @@ static void sched_belch(char *s, ...);
*/
Mutex sched_mutex = INIT_MUTEX_VAR;
Mutex term_mutex = INIT_MUTEX_VAR;
#if defined(THREADED_RTS)
/*
* The rts_mutex is the 'big lock' that the active native
* thread within the RTS holds while executing code.
* It is given up when the thread makes a transition out of
* the RTS (e.g., to perform an external C call), hopefully
* for another thread to take over its chores and enter
* the RTS.
*
*/
Mutex rts_mutex = INIT_MUTEX_VAR;
/*
* When a native thread has completed executing an external
* call, it needs to communicate the result back to the
* (Haskell) thread that made the call. Do this as follows:
*
* - in resumeThread(), the thread increments the counter
* threads_waiting, and then blocks on the 'big' RTS lock.
* - upon entry to the scheduler, the thread that's currently
* holding the RTS lock checks threads_waiting. If there
* are native threads waiting, it gives up its RTS lock
* and tries to re-grab the RTS lock [perhaps after having
* waited for a bit..?]
* - care must be taken to deal with the case where more than
* one external thread are waiting on the lock. [ToDo: more]
*
* rts_n_returning_workers, and then blocks waiting on the
* condition returning_worker_cond.
* - upon entry to the scheduler, a worker/task checks
* rts_n_returning_workers. If it is > 0, worker threads
* are waiting to return, so it gives up its capability
* to let a worker deposit its result.
* - the worker thread that gave up its capability then tries
* to re-grab a capability and re-enter the Scheduler.
*/
static nat threads_waiting = 0;
#endif
/* thread_ready_cond: when signalled, a thread has become runnable for a
* task to execute.
*
* In the non-SMP case, it also implies that the thread that is woken up has
* exclusive access to the RTS and all its DS (that are not under sched_mutex's
* control).
* exclusive access to the RTS and all its data structures (that are not
* under sched_mutex's control).
*
* thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
*
......@@ -313,12 +300,40 @@ Condition thread_ready_cond = INIT_COND_VAR;
#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
#endif
#if defined(SMP)
Condition gc_pending_cond = INIT_COND_VAR;
/*
* To be able to make an informed decision about whether or not
* to create a new task when making an external call, keep track of
* the number of tasks currently blocked waiting on thread_ready_cond.
* (if > 0 => no need for a new task, just unblock an existing one).
*/
nat rts_n_waiting_tasks = 0;
/* returning_worker_cond: when a worker thread returns from executing an
* external call, it needs to wait for an RTS Capability before passing
* on the result of the call to the Haskell thread that made it.
*
* returning_worker_cond is signalled in Capability.releaseCapability().
*
*/
Condition returning_worker_cond = INIT_COND_VAR;
/*
* To avoid starvation of threads blocked on worker_thread_cond,
* the task(s) that enter the Scheduler will check to see whether
* there are one or more worker threads blocked waiting on
* returning_worker_cond.
*
* Locks needed: sched_mutex
*/
nat rts_n_waiting_workers = 0;
# if defined(SMP)
static Condition gc_pending_cond = INIT_COND_VAR;
nat await_death;
#endif
# endif
#endif
#endif /* RTS_SUPPORTS_THREADS */
#if defined(PAR)
StgTSO *LastTSO;
......@@ -360,12 +375,6 @@ static void taskStart(void);
static void
taskStart(void)
{
/* threads start up using 'taskStart', so make them
them grab the RTS lock. */
#if defined(THREADED_RTS)
ACQUIRE_LOCK(&rts_mutex);
taskNotAvailable();
#endif
schedule();
}
#endif
......@@ -431,28 +440,36 @@ schedule( void )
# endif
#endif
rtsBool was_interrupted = rtsFalse;
#if defined(RTS_SUPPORTS_THREADS)
schedule_start:
#endif
#if defined(RTS_SUPPORTS_THREADS)
ACQUIRE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
#endif
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: consider SMP support */
if (threads_waiting > 0) {
if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
/* (At least) one native thread is waiting to
* deposit the result of an external call. So,
* give up our RTS executing privileges and let
* one of them continue.
*
* be nice and hand over our capability.
*/
taskAvailable();
IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers));
releaseCapability(cap);
RELEASE_LOCK(&sched_mutex);
IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
RELEASE_LOCK(&rts_mutex);
/* ToDo: come up with mechanism that guarantees that
* the main thread doesn't loop here.
*/
yieldThread();
/* ToDo: longjmp() */
taskStart();
goto schedule_start;
}
#endif
#if defined(RTS_SUPPORTS_THREADS)
while ( noCapabilities() ) {
rts_n_waiting_tasks++;
waitCondition(&thread_ready_cond, &sched_mutex);
rts_n_waiting_tasks--;
}
#endif
......@@ -646,21 +663,25 @@ schedule( void )
* inform all the main threads.
*/
#ifndef PAR
if ( EMPTY_QUEUE(blocked_queue_hd)
&& EMPTY_RUN_QUEUE()
if ( EMPTY_RUN_QUEUE()
&& EMPTY_QUEUE(blocked_queue_hd)
&& EMPTY_QUEUE(sleeping_queue)
#if defined(SMP)
&& allFreeCapabilities()
#elif defined(THREADED_RTS)
#if defined(RTS_SUPPORTS_THREADS)
&& EMPTY_QUEUE(suspended_ccalling_threads)
#endif
#ifdef SMP
&& allFreeCapabilities()
#endif
)
{
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
#if defined(THREADED_RTS)
/* and SMP mode ..? */
releaseCapability(cap);
#endif
RELEASE_LOCK(&sched_mutex);
GarbageCollect(GetRoots,rtsTrue);
ACQUIRE_LOCK(&sched_mutex);
IF_DEBUG(scheduler, sched_belch("GC done."));
if ( EMPTY_QUEUE(blocked_queue_hd)
&& EMPTY_RUN_QUEUE()
&& EMPTY_QUEUE(sleeping_queue) ) {
......@@ -705,8 +726,10 @@ schedule( void )
#endif
}
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: revisit conditions (and mechanism) for shutting
down a multi-threaded world */
if ( EMPTY_RUN_QUEUE() ) {
IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
shutdownHaskellAndExit(0);
}
......@@ -728,31 +751,22 @@ schedule( void )
}
#endif
#if defined(SMP)
#if defined(RTS_SUPPORTS_THREADS)
/* block until we've got a thread on the run queue and a free
* capability.
*
*/
while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
IF_DEBUG(scheduler, sched_belch("waiting for work"));
waitCondition( &thread_ready_cond, &sched_mutex );
IF_DEBUG(scheduler, sched_belch("work now available"));
if ( EMPTY_RUN_QUEUE() ) {
/* Give up our capability */
releaseCapability(cap);
while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
rts_n_waiting_tasks++;
waitCondition( &thread_ready_cond, &sched_mutex );
rts_n_waiting_tasks--;
IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
}
}
#elif defined(THREADED_RTS)
if ( EMPTY_RUN_QUEUErun_queue_hd == END_TSO_QUEUE ) {
/* no work available, wait for external calls to complete. */
IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
taskAvailable();
RELEASE_LOCK(&rts_mutex);
while ( EMPTY_RUN_QUEUE() ) {
waitCondition(&thread_ready_cond, &sched_mutex);
};
RELEASE_LOCK(&sched_mutex);