Commit 85aa72b9 authored by wolfgang's avatar wolfgang

[project @ 2003-09-21 22:20:51 by wolfgang]

Bound Threads
=============

Introduce a way to use foreign libraries that rely on thread local state
from multiple threads (mainly affects the threaded RTS).

See the file threads.tex in CVS at haskell-report/ffi/threads.tex
(not entirely finished yet) for a definition of this extension. A less formal
description is also found in the documentation of Control.Concurrent.

The changes mostly affect the THREADED_RTS (./configure --enable-threaded-rts),
except for saving & restoring errno on a per-TSO basis, which is also necessary
for the non-threaded RTS (a bugfix).

Detailed list of changes
------------------------

- errno is saved in the TSO object and restored when necessary:
ghc/includes/TSO.h, ghc/rts/Interpreter.c, ghc/rts/Schedule.c

- rts_mainLazyIO is no longer needed, main is no special case anymore
ghc/includes/RtsAPI.h, ghc/rts/RtsAPI.c, ghc/rts/Main.c, ghc/rts/Weak.c

- passCapability: a new function that releases the capability and "passes"
  it to a specific OS thread:
ghc/rts/Capability.h ghc/rts/Capability.c

- waitThread(), scheduleWaitThread() and schedule() get an optional
  Capability *initialCapability passed as an argument:
ghc/includes/SchedAPI.h, ghc/rts/Schedule.c, ghc/rts/RtsAPI.c

- Bound Thread scheduling (that's what this is all about):
ghc/rts/Schedule.h, ghc/rts/Schedule.c

- new Primop isCurrentThreadBound#:
ghc/compiler/prelude/primops.txt.pp, ghc/includes/PrimOps.h, ghc/rts/PrimOps.hc,
ghc/rts/Schedule.h, ghc/rts/Schedule.c

- a simple function, rtsSupportsBoundThreads, that returns true if THREADED_RTS
  is defined:
ghc/rts/Schedule.h, ghc/rts/Schedule.c

- a new implementation of forkProcess (the old implementation stays in place
  for the non-threaded case). Partially broken; works for the standard
  fork-and-exec case, but not for much else. A proper forkProcess is
  really next to impossible to implement:
ghc/rts/Schedule.c

- Library support for bound threads:
    Control.Concurrent.
      rtsSupportsBoundThreads, isCurrentThreadBound, forkOS,
      runInBoundThread, runInUnboundThread
libraries/base/Control/Concurrent.hs, libraries/base/Makefile,
libraries/base/include/HsBase.h, libraries/base/cbits/forkOS.c (new file)
parent 6f0dcafb
-----------------------------------------------------------------------
-- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $
-- $Id: primops.txt.pp,v 1.29 2003/09/21 22:20:51 wolfgang Exp $
--
-- Primitive Operations
--
......@@ -1494,6 +1494,11 @@ primop LabelThreadOp "labelThread#" GenPrimOp
with
has_side_effects = True
out_of_line = True
primop IsCurrentThreadBoundOp "isCurrentThreadBound#" GenPrimOp
State# RealWorld -> (# State# RealWorld, Int# #)
with
out_of_line = True
------------------------------------------------------------------------
section "Weak pointers"
......
/* -----------------------------------------------------------------------------
* $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $
* $Id: PrimOps.h,v 1.104 2003/09/21 22:20:52 wolfgang Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -281,6 +281,7 @@ EXTFUN_RTS(blockAsyncExceptionszh_fast);
EXTFUN_RTS(unblockAsyncExceptionszh_fast);
EXTFUN_RTS(myThreadIdzh_fast);
EXTFUN_RTS(labelThreadzh_fast);
EXTFUN_RTS(isCurrentThreadBoundzh_fast);
extern int cmp_thread(StgPtr tso1, StgPtr tso2);
extern int rts_getThreadId(StgPtr tso);
......@@ -417,4 +418,5 @@ EXTFUN_RTS(mkApUpd0zh_fast);
-------------------------------------------------------------------------- */
#define ForeignObj_CLOSURE_DATA(c) (((StgForeignObj *)c)->data)
#endif /* PRIMOPS_H */
/* ----------------------------------------------------------------------------
* $Id: RtsAPI.h,v 1.35 2003/08/22 22:38:02 sof Exp $
* $Id: RtsAPI.h,v 1.36 2003/09/21 22:20:52 wolfgang Exp $
*
* (c) The GHC Team, 1998-1999
*
......@@ -113,17 +113,14 @@ rts_eval_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
SchedulerStatus
rts_evalIO ( HaskellObj p, /*out*/HaskellObj *ret );
#if defined(COMPILING_RTS_MAIN)
/* Used by the RTS' main() only */
SchedulerStatus
rts_mainLazyIO ( HaskellObj p, /*out*/HaskellObj *ret );
#endif
SchedulerStatus
rts_evalStableIO ( HsStablePtr s, /*out*/HsStablePtr *ret );
SchedulerStatus
rts_evalLazyIO ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
rts_evalLazyIO ( HaskellObj p, /*out*/HaskellObj *ret );
SchedulerStatus
rts_evalLazyIO_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret );
void
rts_checkSchedStatus ( char* site, SchedulerStatus rc);
......
/* -----------------------------------------------------------------------------
* $Id: SchedAPI.h,v 1.17 2002/12/27 12:33:21 panne Exp $
* $Id: SchedAPI.h,v 1.18 2003/09/21 22:20:53 wolfgang Exp $
*
* (c) The GHC Team 1998-2002
*
......@@ -16,7 +16,8 @@
#define NO_PRI 0
#endif
extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret);
extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret,
Capability *initialCapability);
/*
* Creating threads
......@@ -30,7 +31,8 @@ extern StgTSO *createThread(nat stack_size);
extern void taskStart(void);
#endif
extern void scheduleThread(StgTSO *tso);
extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret);
extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret,
Capability *initialCapability);
static inline void pushClosure (StgTSO *tso, StgWord c) {
tso->sp--;
......
/* -----------------------------------------------------------------------------
* $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $
* $Id: TSO.h,v 1.32 2003/09/21 22:20:53 wolfgang Exp $
*
* (c) The GHC Team, 1998-1999
*
......@@ -194,7 +194,8 @@ typedef struct StgTSO_ {
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
StgTSOTickyInfo ticky;
StgTSOProfInfo prof;
StgTSOParInfo par;
......
......@@ -149,6 +149,9 @@ void grabCapability(Capability** cap)
free_capabilities = (*cap)->link;
rts_n_free_capabilities--;
#endif
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): got capability\n",
osThreadId()));
}
/*
......@@ -196,6 +199,9 @@ void releaseCapability(Capability* cap
signalCondition(&thread_ready_cond);
}
#endif
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): released capability\n",
osThreadId()));
return;
}
......@@ -236,9 +242,9 @@ void
grabReturnCapability(Mutex* pMutex, Capability** pCap)
{
IF_DEBUG(scheduler,
fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId()));
fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId()));
IF_DEBUG(scheduler,
fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n",
fprintf(stderr,"worker (%p): returning; workers waiting: %d\n",
osThreadId(), rts_n_waiting_workers));
if ( noCapabilities() ) {
rts_n_waiting_workers++;
......@@ -265,27 +271,30 @@ grabReturnCapability(Mutex* pMutex, Capability** pCap)
-------------------------------------------------------------------------- */
/*
* Function: yieldToReturningWorker(Mutex*,Capability*)
* Function: yieldToReturningWorker(Mutex*,Capability*,Condition*)
*
* Purpose: when, upon entry to the Scheduler, an OS worker thread
* spots that one or more threads are blocked waiting for
* permission to return back their result, it gives up
* its Capability.
* its Capability.
* Immediately afterwards, it tries to reaquire the Capabilty
* using waitForWorkCapability.
*
*
* Pre-condition: pMutex is assumed held and the thread possesses
* a Capability.
* Post-condition: pMutex is held and the Capability has
* been given back.
* Post-condition: pMutex is held and the thread possesses
* a Capability.
*/
void
yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
{
if ( rts_n_waiting_workers > 0 ) {
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId()));
releaseCapability(*pCap);
/* And wait for work */
waitForWorkCapability(pMutex, pCap, rtsFalse);
waitForWorkCapability(pMutex, pCap, pThreadCond);
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n",
osThreadId()));
......@@ -295,7 +304,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
/*
* Function: waitForWorkCapability(Mutex*, Capability**, rtsBool)
* Function: waitForWorkCapability(Mutex*, Capability**, Condition*)
*
* Purpose: wait for a Capability to become available. In
* the process of doing so, updates the number
......@@ -303,22 +312,74 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap)
* work. That counter is used when deciding whether or
* not to create a new worker thread when an external
* call is made.
* If pThreadCond is not NULL, a capability can be specifically
* passed to this thread using passCapability.
*
* Pre-condition: pMutex is held.
* Post-condition: pMutex is held and *pCap is held by the current thread
*/
static Condition *passTarget = NULL;
void
waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable)
waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
{
while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) {
rts_n_waiting_tasks++;
waitCondition(&thread_ready_cond, pMutex);
rts_n_waiting_tasks--;
#ifdef SMP
#error SMP version not implemented
#endif
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
osThreadId(),pThreadCond));
while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
|| (!pThreadCond && passTarget)) {
if(pThreadCond)
{
waitCondition(pThreadCond, pMutex);
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): get passed capability\n",
osThreadId()));
}
else
{
rts_n_waiting_tasks++;
waitCondition(&thread_ready_cond, pMutex);
rts_n_waiting_tasks--;
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): get normal capability\n",
osThreadId()));
}
}
passTarget = NULL;
grabCapability(pCap);
return;
}
/*
* Function: passCapability(Mutex*, Capability*, Condition*)
*
* Purpose: Let go of the capability and make sure the thread associated
* with the Condition pTargetThreadCond gets it next.
*
* Pre-condition: pMutex is held and cap is held by the current thread
* Post-condition: pMutex is held; cap will be grabbed by the "target"
* thread when pMutex is released.
*/
void
passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
{
#ifdef SMP
#error SMP version not implemented
#endif
rts_n_free_capabilities = 1;
signalCondition(pTargetThreadCond);
passTarget = pTargetThreadCond;
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): passCapability\n",
osThreadId()));
}
#endif /* RTS_SUPPORTS_THREADS */
#if defined(SMP)
......
......@@ -39,9 +39,9 @@ extern nat rts_n_free_capabilities;
extern nat rts_n_waiting_workers;
extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap);
extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable);
extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond);
static inline rtsBool needToYieldToReturningWorker(void)
{
......
......@@ -26,6 +26,11 @@
#include "Disassembler.h"
#include "Interpreter.h"
#include <string.h> /* for memcpy */
#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif
/* --------------------------------------------------------------------------
* The bytecode interpreter
......@@ -1172,6 +1177,9 @@ run_BCO:
memcpy(arguments, Sp, sizeof(W_) * stk_offset);
#endif
// Restore the Haskell thread's current value of errno
errno = cap->r.rCurrentTSO->saved_errno;
// There are a bunch of non-ptr words on the stack (the
// ccall args, the ccall fun address and space for the
// result), which we need to cover with an info table
......@@ -1208,6 +1216,9 @@ run_BCO:
LOAD_STACK_POINTERS;
Sp += ret_dyn_size;
// Save the Haskell thread's current value of errno
cap->r.rCurrentTSO->saved_errno = errno;
#ifdef RTS_SUPPORTS_THREADS
// Threaded RTS:
// Copy the "arguments", which might include a return value,
......
/* -----------------------------------------------------------------------------
* $Id: Linker.c,v 1.129 2003/09/11 15:12:25 wolfgang Exp $
* $Id: Linker.c,v 1.130 2003/09/21 22:20:54 wolfgang Exp $
*
* (c) The GHC Team, 2000-2003
*
......@@ -381,6 +381,7 @@ typedef struct _RtsSymbolVal {
SymX(int2Integerzh_fast) \
SymX(integer2Intzh_fast) \
SymX(integer2Wordzh_fast) \
SymX(isCurrentThreadBoundzh_fast) \
SymX(isDoubleDenormalized) \
SymX(isDoubleInfinite) \
SymX(isDoubleNaN) \
......@@ -422,6 +423,7 @@ typedef struct _RtsSymbolVal {
SymX(rts_eval) \
SymX(rts_evalIO) \
SymX(rts_evalLazyIO) \
SymX(rts_evalStableIO) \
SymX(rts_eval_) \
SymX(rts_getBool) \
SymX(rts_getChar) \
......@@ -455,6 +457,7 @@ typedef struct _RtsSymbolVal {
SymX(rts_mkWord64) \
SymX(rts_mkWord8) \
SymX(rts_unlock) \
SymX(rtsSupportsBoundThreads) \
SymX(run_queue_hd) \
SymX(setProgArgv) \
SymX(startupHaskell) \
......
/* -----------------------------------------------------------------------------
* $Id: Main.c,v 1.39 2003/07/10 08:02:29 simonpj Exp $
* $Id: Main.c,v 1.40 2003/09/21 22:20:55 wolfgang Exp $
*
* (c) The GHC Team 1998-2000
*
......@@ -105,7 +105,9 @@ int main(int argc, char *argv[])
# else /* !PAR && !GRAN */
/* ToDo: want to start with a larger stack size */
status = rts_mainLazyIO((HaskellObj)mainIO_closure, NULL);
rts_lock();
status = rts_evalLazyIO((HaskellObj)mainIO_closure, NULL);
rts_unlock();
# endif /* !PAR && !GRAN */
......
......@@ -110,11 +110,17 @@ ifeq "$(way)" "mp"
SRC_HC_OPTS += -I$$PVM_ROOT/include
endif
# Currently, you only get 'threads support' in the normal
# way.
# You get 'threads support' in the normal
# and profiling ways.
ifeq "$(GhcRtsThreaded)" "YES"
ifeq "$(way)" ""
SRC_CC_OPTS += -DTHREADED_RTS
SRC_HC_OPTS += -optc-DTHREADED_RTS
PACKAGE_CPP_OPTS += -DTHREADED_RTS
endif
ifeq "$(way)" "p"
SRC_CC_OPTS += -DTHREADED_RTS
SRC_HC_OPTS += -optc-DTHREADED_RTS
PACKAGE_CPP_OPTS += -DTHREADED_RTS
endif
endif
......
/* -----------------------------------------------------------------------------
* $Id: PrimOps.hc,v 1.112 2003/09/12 16:32:13 sof Exp $
* $Id: PrimOps.hc,v 1.113 2003/09/21 22:20:55 wolfgang Exp $
*
* (c) The GHC Team, 1998-2002
*
......@@ -1095,6 +1095,15 @@ FN_(labelThreadzh_fast)
FE_
}
FN_(isCurrentThreadBoundzh_fast)
{
/* no args */
I_ r;
FB_
r = (I_)(RET_STGCALL1(StgBool, isThreadBound, CurrentTSO));
RET_N(r);
FE_
}
/* -----------------------------------------------------------------------------
* MVar primitives
......@@ -1736,3 +1745,4 @@ FN_(asyncDoProczh_fast)
FE_
}
#endif
/* ----------------------------------------------------------------------------
* $Id: RtsAPI.c,v 1.45 2003/08/28 16:33:42 simonmar Exp $
* $Id: RtsAPI.c,v 1.46 2003/09/21 22:20:56 wolfgang Exp $
*
* (c) The GHC Team, 1998-2001
*
......@@ -21,6 +21,8 @@
#include <stdlib.h>
static Capability *rtsApiCapability = NULL;
/* ----------------------------------------------------------------------------
Building Haskell objects from C datatypes.
------------------------------------------------------------------------- */
......@@ -385,7 +387,7 @@ rts_eval (HaskellObj p, /*out*/HaskellObj *ret)
StgTSO *tso;
tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret);
return scheduleWaitThread(tso,ret,rtsApiCapability);
}
SchedulerStatus
......@@ -394,7 +396,7 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
StgTSO *tso;
tso = createGenThread(stack_size, p);
return scheduleWaitThread(tso,ret);
return scheduleWaitThread(tso,ret,rtsApiCapability);
}
/*
......@@ -407,22 +409,7 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret)
StgTSO* tso;
tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret);
}
/*
* Identical to rts_evalLazyIO(), but won't create a new task/OS thread
* to evaluate the Haskell thread. Used by main() only. Hack.
*/
SchedulerStatus
rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret)
{
StgTSO* tso;
tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
scheduleThread(tso);
return waitThread(tso, ret);
return scheduleWaitThread(tso,ret,rtsApiCapability);
}
/*
......@@ -440,9 +427,9 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
p = (StgClosure *)deRefStablePtr(s);
tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p);
stat = scheduleWaitThread(tso,&r);
stat = scheduleWaitThread(tso,&r,rtsApiCapability);
if (stat == Success) {
if (stat == Success && ret != NULL) {
ASSERT(r != NULL);
*ret = getStablePtr((StgPtr)r);
}
......@@ -454,12 +441,21 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret)
* Like rts_evalIO(), but doesn't force the action's result.
*/
SchedulerStatus
rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
rts_evalLazyIO (HaskellObj p, /*out*/HaskellObj *ret)
{
StgTSO *tso;
tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret,rtsApiCapability);
}
SchedulerStatus
rts_evalLazyIO_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret)
{
StgTSO *tso;
tso = createIOThread(stack_size, p);
return scheduleWaitThread(tso,ret);
return scheduleWaitThread(tso,ret,rtsApiCapability);
}
/* Convenience function for decoding the returned status. */
......@@ -486,18 +482,13 @@ void
rts_lock()
{
#ifdef RTS_SUPPORTS_THREADS
Capability *cap;
ACQUIRE_LOCK(&sched_mutex);
// we request to get the capability immediately, in order to
// a) stop other threads from using allocate()
// b) wake the current worker thread from awaitEvent()
// (so that a thread started by rts_eval* will start immediately)
grabReturnCapability(&sched_mutex,&cap);
// now that we have the capability, we don't need it anymore
// (other threads will continue to run as soon as we release the sched_mutex)
releaseCapability(cap);
grabReturnCapability(&sched_mutex,&rtsApiCapability);
// In the RTS hasn't been entered yet,
// start a RTS task.
......@@ -511,6 +502,7 @@ void
rts_unlock()
{
#ifdef RTS_SUPPORTS_THREADS
rtsApiCapability = NULL;
RELEASE_LOCK(&sched_mutex);
#endif
}
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $
* $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -126,6 +126,10 @@
#include <stdlib.h>
#include <stdarg.h>
#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif
//@node Variables and Data structures, Prototypes, Includes, Main scheduling code
//@subsection Variables and Data structures
......@@ -134,15 +138,6 @@
*/
StgMainThread *main_threads = NULL;
#ifdef THREADED_RTS
// Pointer to the thread that executes main
// When this thread is finished, the program terminates
// by calling shutdownHaskellAndExit.
// It would be better to add a call to shutdownHaskellAndExit
// to the Main.main wrapper and to remove this hack.
StgMainThread *main_main_thread = NULL;
#endif
/* Thread queues.
* Locks required: sched_mutex.
*/
......@@ -249,7 +244,7 @@ static rtsBool shutting_down_scheduler = rtsFalse;
void addToBlockedQueue ( StgTSO *tso );
static void schedule ( void );
static void schedule ( StgMainThread *mainThread, Capability *initialCapability );
void interruptStgRts ( void );
static void detectBlackHoles ( void );
......@@ -311,7 +306,7 @@ static void taskStart(void);
static void
taskStart(void)
{
schedule();
schedule(NULL,NULL);
}
#endif
......@@ -363,10 +358,10 @@ startSchedulerTask(void)
------------------------------------------------------------------------ */
//@cindex schedule
static void
schedule( void )
schedule( StgMainThread *mainThread, Capability *initialCapability )
{
StgTSO *t;
Capability *cap;
Capability *cap = initialCapability;
StgThreadReturnCode ret;
#if defined(GRAN)
rtsEvent *event;
......@@ -386,8 +381,16 @@ schedule( void )
ACQUIRE_LOCK(&sched_mutex);
#if defined(RTS_SUPPORTS_THREADS)
waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
/* in the threaded case, the capability is either passed in via the initialCapability
parameter, or initialized inside the scheduler loop */
IF_DEBUG(scheduler,
fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
osThreadId(), osThreadId()));
IF_DEBUG(scheduler,
fprintf(stderr,"### main thread: %p\n",mainThread));
IF_DEBUG(scheduler,
fprintf(stderr,"### initial cap: %p\n",initialCapability));
#else
/* simply initialise it in the non-threaded case */
grabCapability(&cap);
......@@ -431,8 +434,19 @@ schedule( void )
#if defined(RTS_SUPPORTS_THREADS)
/* Check to see whether there are any worker threads
waiting to deposit external call results. If so,
yield our capability */
yieldToReturningWorker(&sched_mutex, &cap);
yield our capability... if we have a capability, that is. */
if(cap)
yieldToReturningWorker(&sched_mutex, &cap,
mainThread ? &mainThread->bound_thread_cond : NULL);
/* If we do not currently hold a capability, we wait for one */
if(!cap)
{
waitForWorkCapability(&sched_mutex, &cap,
mainThread ? &mainThread->bound_thread_cond : NULL);
IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
osThreadId()));
}
#endif
/* If we're interrupted (the user pressed ^C, or some other
......@@ -463,55 +477,63 @@ schedule( void )
*/
#if defined(RTS_SUPPORTS_THREADS)
{
StgMainThread *m, **prev;
prev = &main_threads;
for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {