Commit 6d7576ef authored by sof's avatar sof
Browse files

[project @ 2002-02-15 07:50:36 by sof]

Tighten up the Scheduler synchronisation story some more:

- moved thread_ready_cond + the counter rts_n_waiting_tasks
  to Capability.c, leaving only sched_mutex as a synchro
  variable in Scheduler (the less stuff that inhabits
  Schedule.c, the better, methinks.)
- upon entry to the Scheduler, a worker thread will now call
  Capability.yieldToReturningWorker() to check whether it
  needs to give up its capability.
- Worker threads that are either idle or lack a capability,
  will now call Capability.waitForWorkCapability() and block.
parent 22da500c
......@@ -17,9 +17,10 @@
* --------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */
#if !defined(SMP)
Capability MainCapability; /* for non-SMP, we have one global capability */
......@@ -42,12 +43,40 @@ Condition returning_worker_cond = INIT_COND_VAR;
* the task(s) that enter the Scheduler will check to see whether
* there are one or more worker threads blocked waiting on
* returning_worker_cond.
*/
static nat rts_n_waiting_workers = 0;
/* thread_ready_cond: when signalled, a thread has become runnable for a
* task to execute.
*
* In the non-SMP case, it also implies that the thread that is woken up has
* exclusive access to the RTS and all its data structures (that are not
* locked by the Scheduler's mutex).
*
* thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
*
* Locks needed: sched_mutex
*/
nat rts_n_waiting_workers = 0;
Condition thread_ready_cond = INIT_COND_VAR;
#if 0
/* For documentation purposes only */
#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
#endif
/*
* To be able to make an informed decision about whether or not
* to create a new task when making an external call, keep track of
* the number of tasks currently blocked waiting on thread_ready_cond.
* (if > 0 => no need for a new task, just unblock an existing one).
*
* waitForWork() takes care of keeping it up-to-date; Task.startTask()
* uses its current value.
*/
nat rts_n_waiting_tasks = 0;
#endif
/* -----------------------------------------------------------------------------
Initialisation
-------------------------------------------------------------------------- */
static
void
initCapability( Capability *cap )
......@@ -76,6 +105,7 @@ initCapabilities()
{
#if defined(RTS_SUPPORTS_THREADS)
initCondition(&returning_worker_cond);
initCondition(&thread_ready_cond);
#endif
#if defined(SMP)
......@@ -88,13 +118,15 @@ initCapabilities()
return;
}
/* Free capability list.
* Locks required: sched_mutex.
*/
#if defined(SMP)
/* Free capability list. */
static Capability *free_capabilities; /* Available capabilities for running threads */
#endif
/* -----------------------------------------------------------------------------
Acquiring capabilities
-------------------------------------------------------------------------- */
/*
* Function: grabCapability(Capability**)
*
......@@ -102,12 +134,9 @@ static Capability *free_capabilities; /* Available capabilities for running thre
* remove one from the free capabilities list (which
* may just have one entry). In threaded builds, worker
* threads are prevented from doing so willy-nilly
* through the use of the sched_mutex lock along with
* condition variables thread_ready_cond and
* via the condition variables thread_ready_cond and
* returning_worker_cond.
*
* Pre-condition: sched_mutex is held (in threaded builds only).
*
*/
void grabCapability(Capability** cap)
{
......@@ -124,10 +153,10 @@ void grabCapability(Capability** cap)
/*
* Function: releaseCapability(Capability*)
*
* Purpose: Letting go of a capability.
* Purpose: Letting go of a capability. Causes a
* 'returning worker' thread or a 'waiting worker'
* to wake up, in that order.
*
* Pre-condition: sched_mutex is assumed held by current thread.
* Post-condition:
*/
void releaseCapability(Capability* cap
#if !defined(SMP)
......@@ -176,7 +205,7 @@ void releaseCapability(Capability* cap
* value of rts_n_waiting_workers. If > 0, the worker thread
* will yield its capability to let a returning worker thread
* proceed with returning its result -- this is done via
* yieldCapability().
* yieldToReturningWorker().
* - the worker thread that yielded its capability then tries
* to re-grab a capability and re-enter the Scheduler.
*/
......@@ -190,57 +219,91 @@ void releaseCapability(Capability* cap
* result of the ext. call back to the Haskell thread that
* made it.
*
* Pre-condition: sched_mutex isn't held.
* Post-condition: sched_mutex is held and a capability has
* Pre-condition: pMutex isn't held.
* Post-condition: pMutex is held and a capability has
* been assigned to the worker thread.
*/
/* NOTE(review): this span is rendered from a diff view without +/-
 * markers, so the old version (hard-wired to sched_mutex) and the new
 * version (caller-supplied pMutex) of grabReturnCapability appear
 * interleaved line-by-line below.  Of each duplicated pair
 * (signature, fprintf, ACQUIRE_LOCK, waitCondition) only one line
 * belongs to either version — confirm against the committed file.
 *
 * Purpose (per surrounding commentary): a worker returning from an
 * external call bumps rts_n_waiting_workers, then blocks on
 * returning_worker_cond until a capability is free, and grabs it.
 */
void
grabReturnCapability(Capability** pCap)
grabReturnCapability(Mutex* pMutex, Capability** pCap)
{
IF_DEBUG(scheduler,
fprintf(stderr,"worker (%ld): returning, waiting for sched. lock.\n", osThreadId()));
ACQUIRE_LOCK(&sched_mutex);
fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId()));
ACQUIRE_LOCK(pMutex);
rts_n_waiting_workers++;  /* counted so Scheduler entry knows to yield */
IF_DEBUG(scheduler,
fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n",
osThreadId(), rts_n_waiting_workers));
while ( noCapabilities() ) {  /* loop guards against spurious wakeups */
waitCondition(&returning_worker_cond, &sched_mutex);
waitCondition(&returning_worker_cond, pMutex);
}
grabCapability(pCap);
return;
}
/* -----------------------------------------------------------------------------
Yielding/waiting for capabilities
-------------------------------------------------------------------------- */
/*
* Function: yieldCapability(Capability**)
* Function: yieldToReturningWorker(Mutex*,Capability*)
*
* Purpose: when, upon entry to the Scheduler, an OS worker thread
* spots that one or more threads are blocked waiting for
* permission to return back their result, it gives up
* its Capability.
*
* Pre-condition: sched_mutex is held and the thread possesses
* Pre-condition: pMutex is assumed held and the thread possesses
* a Capability.
* Post-condition: sched_mutex isn't held and the Capability has
* Post-condition: pMutex isn't held and the Capability has
* been given back.
*/
/* NOTE(review): diff view without +/- markers — the old
 * yieldCapability (sched_mutex-hardwired) and the new
 * yieldToReturningWorker (pMutex parameter) are interleaved below,
 * including two variants of the same explanatory comment (L189/L190
 * region) and both the old early 'return;' and the new re-acquire of
 * the mutex.  Confirm against the committed file.
 *
 * Purpose: if one or more returning workers are blocked waiting to
 * deposit external-call results and no capability is free, give up
 * this thread's capability so a returning worker can proceed.
 */
void
yieldCapability(Capability* cap)
yieldToReturningWorker(Mutex* pMutex, Capability* cap)
{
if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%ld): giving up RTS token\n", osThreadId()));
releaseCapability(cap);
RELEASE_LOCK(&sched_mutex);
RELEASE_LOCK(pMutex);
yieldThread();
/* At this point, sched_mutex has been given up & we've
/* At this point, pMutex has been given up & we've
* forced a thread context switch. Guaranteed to be
* enough for the signalled worker thread to race
* ahead?
* ahead of us?
*/
return;
/* Re-grab the mutex */
ACQUIRE_LOCK(pMutex);
}
return;
}
/*
* Function: waitForWorkCapability(Mutex*, Capability**, rtsBool)
*
* Purpose: wait for a Capability to become available. In
* the process of doing so, updates the number
* of tasks currently blocked waiting for a capability/more
* work. That counter is used when deciding whether or
* not to create a new worker thread when an external
* call is made.
*
* Pre-condition: pMutex is held.
*/
/* Block the calling task until a Capability is free and — when
 * 'runnable' is set — the run queue is non-empty, then grab a
 * Capability into *pCap.
 *
 * rts_n_waiting_tasks is bumped for the duration of each wait so
 * other code (per the comment at the counter's definition, its value
 * feeds Task.startTask()) can tell whether an idle task can simply be
 * woken instead of a new one being created.
 *
 * Pre-condition: pMutex is held.
 */
void
waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable)
{
  for (;;) {
    /* proceed only when a capability exists and, if required, work too */
    if ( !noCapabilities() && !(runnable && EMPTY_RUN_QUEUE()) ) {
      break;
    }
    rts_n_waiting_tasks++;
    waitCondition(&thread_ready_cond, pMutex);
    rts_n_waiting_tasks--;
  }
  grabCapability(pCap);
}
#endif /* RTS_SUPPORTS_THREADS */
#if defined(SMP)
......@@ -251,9 +314,6 @@ yieldCapability(Capability* cap)
* holding 'n' Capabilities. Only for SMP, since
* it is the only build that supports multiple
* capabilities within the RTS.
*
* Pre-condition: sched_mutex is held.
*
*/
static void
initCapabilities_(nat n)
......
......@@ -37,8 +37,9 @@ extern void releaseCapability(Capability* cap);
extern nat rts_n_free_capabilities;
extern nat rts_n_waiting_workers;
extern void grabReturnCapability(Capability** pCap);
extern void yieldCapability(Capability* cap);
extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
extern void yieldToReturningWorker(Mutex* pMutex, Capability* cap);
extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable);
static inline nat getFreeCapabilities (void)
{
......
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.123 2002/02/14 07:52:05 sof Exp $
* $Id: Schedule.c,v 1.124 2002/02/15 07:50:36 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -266,38 +266,6 @@ static void sched_belch(char *s, ...);
Mutex sched_mutex = INIT_MUTEX_VAR;
Mutex term_mutex = INIT_MUTEX_VAR;
/* thread_ready_cond: when signalled, a thread has become runnable for a
* task to execute.
*
* In the non-SMP case, it also implies that the thread that is woken up has
* exclusive access to the RTS and all its data structures (that are not
* under sched_mutex's control).
*
* thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
*
*/
Condition thread_ready_cond = INIT_COND_VAR;
#if 0
/* For documentation purposes only */
#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
#endif
/*
* To be able to make an informed decision about whether or not
* to create a new task when making an external call, keep track of
* the number of tasks currently blocked waiting on thread_ready_cond.
* (if > 0 => no need for a new task, just unblock an existing one).
*
* waitForWork() takes care of keeping it up-to-date; Task.startTask()
* uses its current value.
*/
nat rts_n_waiting_tasks = 0;
static void waitForWork(void);
# if defined(SMP)
static Condition gc_pending_cond = INIT_COND_VAR;
nat await_death;
......@@ -410,36 +378,19 @@ schedule( void )
# endif
#endif
rtsBool was_interrupted = rtsFalse;
#if defined(RTS_SUPPORTS_THREADS)
schedule_start:
#endif
#if defined(RTS_SUPPORTS_THREADS)
ACQUIRE_LOCK(&sched_mutex);
#endif
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: consider SMP support */
if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
/* (At least) one native thread is waiting to
* deposit the result of an external call. So,
* be nice and hand over our capability.
*/
yieldCapability(cap);
/* Lost our sched_mutex lock, try to re-enter the scheduler. */
goto schedule_start;
}
#endif
/* Check to see whether there are any worker threads
waiting to deposit external call results. If so,
yield our capability */
yieldToReturningWorker(&sched_mutex, cap);
#if defined(RTS_SUPPORTS_THREADS)
while ( noCapabilities() ) {
waitForWork();
}
waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
#endif
#if defined(GRAN)
/* set up first event to get things going */
/* ToDo: assign costs for system setup and init MainTSO ! */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
......@@ -723,16 +674,19 @@ schedule_start:
if ( EMPTY_RUN_QUEUE() ) {
/* Give up our capability */
releaseCapability(cap);
while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
waitForWork();
IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
#if 0
while ( EMPTY_RUN_QUEUE() ) {
waitForWorkCapability(&sched_mutex, &cap);
IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
}
#endif
}
#endif
#if defined(GRAN)
if (RtsFlags.GranFlags.Light)
GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
......@@ -978,7 +932,7 @@ schedule_start:
belch("--=^ %d threads, %d sparks on [%#x]",
run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
#if 1
# if 1
if (0 && RtsFlags.ParFlags.ParStats.Full &&
t && LastTSO && t->id != LastTSO->id &&
LastTSO->why_blocked == NotBlocked &&
......@@ -1003,7 +957,7 @@ schedule_start:
emitSchedule = rtsFalse;
}
#endif
# endif
#else /* !GRAN && !PAR */
/* grab a thread from the run queue */
......@@ -1377,12 +1331,11 @@ schedule_start:
}
#endif
if (ready_to_gc
#ifdef SMP
if (ready_to_gc && allFreeCapabilities() )
#else
if (ready_to_gc)
&& allFreeCapabilities()
#endif
{
) {
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
......@@ -1512,7 +1465,7 @@ suspendThread( StgRegTable *reg )
for one (i.e., if there's only one Concurrent Haskell thread alive,
there's no need to create a new task).
*/
IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
startTask(taskStart);
#endif
......@@ -1528,8 +1481,8 @@ resumeThread( StgInt tok )
Capability *cap;
#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result.. */
grabReturnCapability(&cap);
/* Wait for permission to re-enter the RTS with the result. */
grabReturnCapability(&sched_mutex, &cap);
#else
grabCapability(&cap);
#endif
......@@ -1558,18 +1511,6 @@ resumeThread( StgInt tok )
}
#if defined(RTS_SUPPORTS_THREADS)
/* Block the current task on thread_ready_cond until signalled that a
 * thread has become runnable.
 *
 * rts_n_waiting_tasks is incremented for the duration of the wait so
 * that (per the counter's own comment block) Task.startTask() can
 * decide to wake an existing task rather than create a new one.  The
 * increment/decrement must bracket the wait exactly, which is why the
 * counter updates live here and not in the caller.
 *
 * Pre-condition: sched_mutex is held; waitCondition releases it while
 * blocked and re-acquires it before returning.
 */
static void
waitForWork()
{
rts_n_waiting_tasks++;
waitCondition(&thread_ready_cond, &sched_mutex);
rts_n_waiting_tasks--;
return;
}
#endif
/* ---------------------------------------------------------------------------
* Static functions
* ------------------------------------------------------------------------ */
......@@ -1870,10 +1811,13 @@ activateSpark (rtsSpark spark)
* on this thread's stack before the scheduler is invoked.
* ------------------------------------------------------------------------ */
static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
void
scheduleThread_(StgTSO *tso
#if defined(THREADED_RTS)
, rtsBool createTask
#if !defined(THREADED_RTS)
STG_UNUSED
#endif
)
{
......@@ -1903,11 +1847,12 @@ scheduleThread_(StgTSO *tso
/* NOTE(review): the #if/#else below deliberately spans a function
 * boundary.  In THREADED_RTS builds, scheduleThread's body ends at
 * the first '}' and scheduleExtThread is then defined; in other
 * builds the #else branch completes scheduleThread's body and
 * scheduleExtThread does not exist at all.  This span comes from a
 * diff view, so confirm the exact layout against the committed file
 * before relying on it.
 */
void scheduleThread(StgTSO* tso)
{
#if defined(THREADED_RTS)
return scheduleThread_(tso, rtsFalse);
}

/* Threaded builds only: schedule a thread created on behalf of an
 * external (foreign) call; passes rtsTrue for createTask. */
void scheduleExtThread(StgTSO* tso)
{
return scheduleThread_(tso, rtsTrue);
#else
return scheduleThread_(tso);
#endif
}
/* ---------------------------------------------------------------------------
......@@ -3688,7 +3633,6 @@ sched_belch(char *s, ...)
//@subsection Index
//@index
//* MainRegTable:: @cindex\s-+MainRegTable
//* StgMainThread:: @cindex\s-+StgMainThread
//* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue
//* blocked_queue_hd:: @cindex\s-+blocked_queue_hd
......@@ -3706,5 +3650,4 @@ sched_belch(char *s, ...)
//* schedule:: @cindex\s-+schedule
//* take_off_run_queue:: @cindex\s-+take_off_run_queue
//* term_mutex:: @cindex\s-+term_mutex
//* thread_ready_cond:: @cindex\s-+thread_ready_cond
//@end index
/* -----------------------------------------------------------------------------
* $Id: Schedule.h,v 1.28 2002/02/13 08:48:07 sof Exp $
* $Id: Schedule.h,v 1.29 2002/02/15 07:50:37 sof Exp $
*
* (c) The GHC Team 1998-1999
*
......@@ -164,11 +164,6 @@ extern SchedulerStatus waitThread_(StgTSO *tso,
, rtsBool blockWaiting
#endif
);
extern void scheduleThread_(StgTSO *tso
#if defined(THREADED_RTS)
, rtsBool createTask
#endif
);
extern SchedulerStatus rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment