Commit efa41d9d authored by sof's avatar sof
Browse files

[project @ 2002-02-14 07:52:05 by sof]

Restructured / tidied a bit:

* Capability.grabReturnCapability() is now called by resumeThread().
  It takes care of waiting on the (Capability.c-local) condition
  variable, 'returning_worker_cond' (moved here from Schedule.c)

* If a worker notices upon entry to the Scheduler that there are
  worker threads waiting to deposit results of external calls,
  it gives up its capability by calling Capability.yieldCapability().

* Added Scheduler.waitForWork(), which takes care of blocking
  on 'thread_ready_cond' (+ 'rts_n_waiting_tasks' book-keeping).

Note: changes haven't been fully tested, due to HEAD instability.
parent 05abfa38
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 2001
* (c) The GHC Team, 2002
*
* Capabilities
*
* The notion of a capability is used when operating in multi-threaded
* environments (which the SMP and Threads builds of the RTS do), to
* hold all the state an OS thread/task needs to run Haskell code:
* A Capability represent the token required to execute STG code,
* and all the state an OS thread/task needs to run Haskell code:
* its STG registers, a pointer to its TSO, a nursery etc. During
* STG execution, a pointer to the capabilitity is kept in a
* STG execution, a pointer to the capabilitity is kept in a
* register (BaseReg).
*
* Only in an SMP build will there be multiple capabilities, the threaded
* RTS and other non-threaded builds, there is one global capability,
* namely MainRegTable.
*
*
* --------------------------------------------------------------------------*/
#include "PosixSource.h"
......@@ -29,6 +27,27 @@ Capability MainCapability; /* for non-SMP, we have one global capability */
nat rts_n_free_capabilities;
#if defined(RTS_SUPPORTS_THREADS)
/* returning_worker_cond: when a worker thread returns from executing an
* external call, it needs to wait for an RTS Capability before passing
* on the result of the call to the Haskell thread that made it.
*
* returning_worker_cond is signalled in Capability.releaseCapability().
*
*/
Condition returning_worker_cond = INIT_COND_VAR;
/*
* To avoid starvation of threads blocked on worker_thread_cond,
* the task(s) that enter the Scheduler will check to see whether
* there are one or more worker threads blocked waiting on
* returning_worker_cond.
*
* Locks needed: sched_mutex
*/
nat rts_n_waiting_workers = 0;
#endif
static
void
initCapability( Capability *cap )
......@@ -48,6 +67,10 @@ static void initCapabilities_(nat n);
void
initCapabilities()
{
#if defined(RTS_SUPPORTS_THREADS)
initCondition(returning_worker_cond);
#endif
#if defined(SMP)
initCapabilities_(RtsFlags.ParFlags.nNodes);
#else
......@@ -78,9 +101,12 @@ void grabCapability(Capability** cap)
}
/*
* Letting go of a capability
* Function: releaseCapability(Capability*)
*
* Purpose: Letting go of a capability.
*
* Locks required: sched_mutex
* Pre-condition: sched_mutex is assumed held by current thread.
* Post-condition:
*/
void releaseCapability(Capability* cap
#if !defined(SMP)
......@@ -100,9 +126,11 @@ void releaseCapability(Capability* cap
/* Check to see whether a worker thread can be given
the go-ahead to return the result of an external call..*/
if (rts_n_waiting_workers > 0) {
/* The worker is responsible for grabbing the capability and
* decrementing the rts_n_returning_workers count
/* Decrement the counter here to avoid livelock where the
* thread that is yielding its capability will repeatedly
* signal returning_worker_cond.
*/
rts_n_waiting_workers--;
signalCondition(&returning_worker_cond);
} else if ( !EMPTY_RUN_QUEUE() ) {
/* Signal that work is available */
......@@ -112,8 +140,100 @@ void releaseCapability(Capability* cap
return;
}
#if defined(RTS_SUPPORTS_THREADS)
/*
* When a native thread has completed the execution of an external
* call, it needs to communicate the result back. This is done
* as follows:
*
* - in resumeThread(), the thread calls grabReturnCapability().
* - If no capabilities are readily available, grabReturnCapability()
* increments a counter rts_n_waiting_workers, and blocks
* waiting for the condition returning_worker_cond to become
* signalled.
* - upon entry to the Scheduler, a worker thread checks the
* value of rts_n_waiting_workers. If > 0, the worker thread
* will yield its capability to let a returning worker thread
* proceed with returning its result -- this is done via
* yieldCapability().
* - the worker thread that yielded its capability then tries
* to re-grab a capability and re-enter the Scheduler.
*/
/*
* Function: grabReturnCapability(Capability**)
*
* Purpose: when an OS thread returns from an external call,
* it calls grabReturningCapability() (via Schedule.resumeThread())
* to wait for permissions to enter the RTS & communicate the
* result of the ext. call back to the Haskell thread that
* made it.
*
* Pre-condition: sched_mutex isn't held.
* Post-condition: sched_mutex is held and a capability has
* been assigned to the worker thread.
*/
void
grabReturnCapability(Capability** pCap)
{
IF_DEBUG(scheduler,
sched_belch("thread %d: returning, waiting for sched. lock.\n", osThreadId()));
ACQUIRE_LOCK(&sched_mutex);
rts_n_waiting_workers++;
IF_DEBUG(scheduler,
sched_belch("worker (%d,%d): returning; workers waiting: %d\n",
tok, osThreadId(), rts_n_waiting_workers));
while ( noCapabilities() ) {
waitCondition(&returning_worker_cond, &sched_mutex);
}
grabCapability(pCap);
return;
}
/*
* Function: yieldCapability(Capability**)
*
* Purpose: when, upon entry to the Scheduler, an OS worker thread
* spots that one or more threads are blocked waiting for
* permission to return back their result, it gives up
* its Capability.
*
* Pre-condition: sched_mutex is held and the thread possesses
* a Capability.
* Post-condition: sched_mutex isn't held and the Capability has
* been given back.
*/
void
yieldCapability(Capability* cap)
{
IF_DEBUG(scheduler,
sched_belch("worker thread (%d): giving up RTS token\n", osThreadId()));
releaseCapability(cap);
RELEASE_LOCK(&sched_mutex);
yieldThread();
/* At this point, sched_mutex has been given up & we've
* forced a thread context switch. Guaranteed to be
* enough for the signalled worker thread to race
* ahead?
*/
return;
}
#endif /* RTS_SUPPORTS_THREADS */
#if defined(SMP)
/* Allocate 'n' capabilities */
/*
* Function: initCapabilities_(nat)
*
* Purpose: upon startup, allocate and fill in table
* holding 'n' Capabilities. Only for SMP, since
* it is the only build that supports multiple
* capabilities within the RTS.
*
* Pre-condition: sched_mutex is held.
*
*/
static void
initCapabilities_(nat n)
{
......
......@@ -28,14 +28,17 @@
extern Capability MainCapability;
#endif
extern void initCapabilities(void);
extern void grabCapability(Capability** cap);
extern void grabCapability(Capability** pCap);
extern void releaseCapability(Capability* cap);
#if defined(RTS_SUPPORTS_THREADS)
/* total number of available capabilities */
extern nat rts_n_free_capabilities;
extern nat rts_n_waiting_workers;
extern void grabReturnCapability(Capability** pCap);
extern void yieldCapability(Capability* cap);
static inline nat getFreeCapabilities (void)
{
......
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $
* $Id: Schedule.c,v 1.123 2002/02/14 07:52:05 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -267,21 +267,6 @@ Mutex sched_mutex = INIT_MUTEX_VAR;
Mutex term_mutex = INIT_MUTEX_VAR;
/*
* When a native thread has completed executing an external
* call, it needs to communicate the result back to the
* (Haskell) thread that made the call. Do this as follows:
*
* - in resumeThread(), the thread increments the counter
* rts_n_returning_workers, and then blocks waiting on the
* condition returning_worker_cond.
* - upon entry to the scheduler, a worker/task checks
* rts_n_returning_workers. If it is > 0, worker threads
* are waiting to return, so it gives up its capability
* to let a worker deposit its result.
* - the worker thread that gave up its capability then tries
* to re-grab a capability and re-enter the Scheduler.
*/
/* thread_ready_cond: when signalled, a thread has become runnable for a
......@@ -305,28 +290,13 @@ Condition thread_ready_cond = INIT_COND_VAR;
* to create a new task when making an external call, keep track of
* the number of tasks currently blocked waiting on thread_ready_cond.
* (if > 0 => no need for a new task, just unblock an existing one).
*/
nat rts_n_waiting_tasks = 0;
/* returning_worker_cond: when a worker thread returns from executing an
* external call, it needs to wait for an RTS Capability before passing
* on the result of the call to the Haskell thread that made it.
*
* returning_worker_cond is signalled in Capability.releaseCapability().
*
*/
Condition returning_worker_cond = INIT_COND_VAR;
/*
* To avoid starvation of threads blocked on worker_thread_cond,
* the task(s) that enter the Scheduler will check to see whether
* there are one or more worker threads blocked waiting on
* returning_worker_cond.
*
* Locks needed: sched_mutex
* waitForWork() takes care of keeping it up-to-date; Task.startTask()
* uses its current value.
*/
nat rts_n_waiting_workers = 0;
nat rts_n_waiting_tasks = 0;
static void waitForWork(void);
# if defined(SMP)
static Condition gc_pending_cond = INIT_COND_VAR;
......@@ -456,20 +426,15 @@ schedule_start:
* deposit the result of an external call. So,
* be nice and hand over our capability.
*/
IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers));
releaseCapability(cap);
RELEASE_LOCK(&sched_mutex);
yieldThread();
yieldCapability(cap);
/* Lost our sched_mutex lock, try to re-enter the scheduler. */
goto schedule_start;
}
#endif
#if defined(RTS_SUPPORTS_THREADS)
while ( noCapabilities() ) {
rts_n_waiting_tasks++;
waitCondition(&thread_ready_cond, &sched_mutex);
rts_n_waiting_tasks--;
waitForWork();
}
#endif
......@@ -731,7 +696,6 @@ schedule_start:
if ( EMPTY_RUN_QUEUE() ) {
IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
shutdownHaskellAndExit(0);
}
#endif
ASSERT( !EMPTY_RUN_QUEUE() );
......@@ -761,9 +725,7 @@ schedule_start:
releaseCapability(cap);
while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
rts_n_waiting_tasks++;
waitCondition( &thread_ready_cond, &sched_mutex );
rts_n_waiting_tasks--;
waitForWork();
IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
}
}
......@@ -1566,21 +1528,10 @@ resumeThread( StgInt tok )
Capability *cap;
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler, sched_belch("worker %d: returning, waiting for sched. lock.\n", tok));
ACQUIRE_LOCK(&sched_mutex);
rts_n_waiting_workers++;
IF_DEBUG(scheduler, sched_belch("worker %d: returning; workers waiting: %d.\n", tok, rts_n_waiting_workers));
/*
* Wait for the go ahead
*/
IF_DEBUG(scheduler, sched_belch("worker %d: waiting for capability %d...\n", tok, rts_n_free_capabilities));
while ( noCapabilities() ) {
waitCondition(&returning_worker_cond, &sched_mutex);
}
rts_n_waiting_workers--;
IF_DEBUG(scheduler, sched_belch("worker %d: acquired capability...\n", tok));
/* Wait for permission to re-enter the RTS with the result.. */
grabReturnCapability(&cap);
#else
grabCapability(&cap);
#endif
/* Remove the thread off of the suspended list */
......@@ -1597,32 +1548,28 @@ resumeThread( StgInt tok )
barf("resumeThread: thread not found");
}
tso->link = END_TSO_QUEUE;
#if defined(RTS_SUPPORTS_THREADS)
/* Is it clever to block here with the TSO off the list,
* but not hooked up to a capability?
*/
while ( noCapabilities() ) {
IF_DEBUG(scheduler, sched_belch("waiting to resume"));
rts_n_waiting_tasks++;
waitCondition(&thread_ready_cond, &sched_mutex);
rts_n_waiting_tasks--;
IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
}
#endif
grabCapability(&cap);
RELEASE_LOCK(&sched_mutex);
/* Reset blocking status */
tso->why_blocked = NotBlocked;
cap->r.rCurrentTSO = tso;
RELEASE_LOCK(&sched_mutex);
cap->r.rCurrentTSO = tso;
return &cap->r;
}
#if defined(RTS_SUPPORTS_THREADS)
static void
waitForWork()
{
rts_n_waiting_tasks++;
waitCondition(&thread_ready_cond, &sched_mutex);
rts_n_waiting_tasks--;
return;
}
#endif
/* ---------------------------------------------------------------------------
* Static functions
* ------------------------------------------------------------------------ */
......@@ -2024,7 +1971,6 @@ initScheduler(void)
initMutex(&term_mutex);
initCondition(&thread_ready_cond);
initCondition(&returning_worker_cond);
#endif
#if defined(SMP)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment