Commit 324e96d2 authored by wolfgang's avatar wolfgang
Browse files

[project @ 2003-10-01 10:49:07 by wolfgang]

Threaded RTS:
Don't start new worker threads earlier than necessary.
After this commit, a Haskell program that uses neither forkOS nor forkIO is
really single-threaded (rather than using two OS threads internally).

Some details:
Worker threads are now only created when a capability is released, and
only when
(there are no worker threads)
	&& (there are runnable Haskell threads ||
	    there are Haskell threads blocked on IO or threadDelay)
awaitEvent can now be called from bound thread scheduling loops
(so that we don't have to create a worker thread just to run awaitEvent)
parent aefc6956
......@@ -199,6 +199,8 @@ void releaseCapability(Capability* cap
#endif
/* Signal that a capability is available */
signalCondition(&thread_ready_cond);
startSchedulerTaskIfNecessary(); // if there is more work to be done,
// we'll need a new thread
}
#endif
#ifdef RTS_SUPPORTS_THREADS
......@@ -324,6 +326,7 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
*/
static Condition *passTarget = NULL;
static rtsBool passingCapability = rtsFalse;
void
waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
......@@ -334,8 +337,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
osThreadId(),pThreadCond));
while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
|| (!pThreadCond && passTarget)) {
while ( noCapabilities() || (passingCapability && passTarget != pThreadCond)) {
if(pThreadCond)
{
waitCondition(pThreadCond, pMutex);
......@@ -353,7 +355,7 @@ waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
osThreadId()));
}
}
passTarget = NULL;
passingCapability = rtsFalse;
grabCapability(pCap);
return;
}
......@@ -378,11 +380,40 @@ passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
rts_n_free_capabilities = 1;
signalCondition(pTargetThreadCond);
passTarget = pTargetThreadCond;
passingCapability = rtsTrue;
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): passCapability\n",
osThreadId()));
}
/*
* Function: passCapabilityToWorker(Mutex*, Capability*)
*
* Purpose: Let go of the capability and make sure that a
* "plain" worker thread (not a bound thread) gets it next.
*
* Pre-condition: pMutex is held and cap is held by the current thread
* Post-condition: pMutex is held; cap will be grabbed by the "target"
* thread when pMutex is released.
*/
void
passCapabilityToWorker(Mutex* pMutex, Capability* cap)
{
#ifdef SMP
#error SMP version not implemented
#endif
rts_n_free_capabilities = 1;
signalCondition(&thread_ready_cond);
startSchedulerTaskIfNecessary();
passTarget = NULL;
passingCapability = rtsTrue;
IF_DEBUG(scheduler,
fprintf(stderr,"worker thread (%p): passCapabilityToWorker\n",
osThreadId()));
}
#endif /* RTS_SUPPORTS_THREADS */
......
......@@ -42,6 +42,7 @@ extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
extern void yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition *pThreadCond);
extern void passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond);
extern void passCapabilityToWorker(Mutex* pMutex, Capability* cap);
static inline rtsBool needToYieldToReturningWorker(void)
{
......
/* ----------------------------------------------------------------------------
* $Id: RtsAPI.c,v 1.48 2003/10/01 10:36:49 wolfgang Exp $
* $Id: RtsAPI.c,v 1.49 2003/10/01 10:49:07 wolfgang Exp $
*
* (c) The GHC Team, 1998-2001
*
......@@ -501,12 +501,6 @@ rts_lock()
// b) wake the current worker thread from awaitEvent()
// (so that a thread started by rts_eval* will start immediately)
grabReturnCapability(&sched_mutex,&rtsApiCapability);
// In the RTS hasn't been entered yet,
// start a RTS task.
// If there is already a task available (waiting for the work capability),
// this will do nothing.
startSchedulerTask();
#endif
}
......
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.175 2003/09/26 13:32:14 panne Exp $
* $Id: Schedule.c,v 1.176 2003/10/01 10:49:08 wolfgang Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -313,20 +313,38 @@ StgTSO * activateSpark (rtsSpark spark);
StgTSO *MainTSO;
*/
#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
#if defined(RTS_SUPPORTS_THREADS)
static rtsBool startingWorkerThread = rtsFalse;
static void taskStart(void);
static void
taskStart(void)
{
schedule(NULL,NULL);
Capability *cap;
ACQUIRE_LOCK(&sched_mutex);
startingWorkerThread = rtsFalse;
waitForWorkCapability(&sched_mutex, &cap, NULL);
RELEASE_LOCK(&sched_mutex);
schedule(NULL,cap);
}
#endif
#if defined(RTS_SUPPORTS_THREADS)
void
startSchedulerTask(void)
startSchedulerTaskIfNecessary(void)
{
startTask(taskStart);
if(run_queue_hd != END_TSO_QUEUE
|| blocked_queue_hd != END_TSO_QUEUE
|| sleeping_queue != END_TSO_QUEUE)
{
if(!startingWorkerThread)
{ // we don't want to start another worker thread
// just because the last one hasn't yet reached the
// "waiting for capability" state
startingWorkerThread = rtsTrue;
startTask(taskStart);
}
}
}
#endif
......@@ -475,7 +493,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
// so just exit right away.
prog_belch("interrupted");
releaseCapability(cap);
startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
RELEASE_LOCK(&sched_mutex);
shutdownHaskellAndExit(EXIT_SUCCESS);
#else
......@@ -1151,7 +1168,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
PUSH_ON_RUN_QUEUE(t);
releaseCapability(cap);
passCapabilityToWorker(&sched_mutex, cap);
cap = NULL;
continue;
}
......@@ -1830,9 +1847,6 @@ suspendThread( StgRegTable *reg,
waiting to take over.
*/
IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
//if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
startTask(taskStart);
//}
#endif
/* Other threads _might_ be available for execution; signal this */
......@@ -2245,9 +2259,10 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCap
m->ret = ret;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
initCondition(&m->wakeup);
#if defined(THREADED_RTS)
initCondition(&m->bound_thread_cond);
#else
initCondition(&m->wakeup);
#endif
#endif
......@@ -2459,9 +2474,10 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
m->ret = ret;
m->stat = NoStatus;
#if defined(RTS_SUPPORTS_THREADS)
initCondition(&m->wakeup);
#if defined(THREADED_RTS)
initCondition(&m->bound_thread_cond);
#else
initCondition(&m->wakeup);
#endif
#endif
......@@ -2512,9 +2528,10 @@ waitThread_(StgMainThread* m, Capability *initialCapability)
stat = m->stat;
#if defined(RTS_SUPPORTS_THREADS)
closeCondition(&m->wakeup);
#if defined(THREADED_RTS)
closeCondition(&m->bound_thread_cond);
#else
closeCondition(&m->wakeup);
#endif
#endif
......@@ -3498,7 +3515,11 @@ deleteThreadImmediately(StgTSO *tso)
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
unblockThread(tso);
#if defined(RTS_SUPPORTS_THREADS)
if (tso->why_blocked != BlockedOnCCall
&& tso->why_blocked != BlockedOnCCall_NoUnblockExc)
#endif
unblockThread(tso);
tso->what_next = ThreadKilled;
}
#endif
......
/* -----------------------------------------------------------------------------
* $Id: Schedule.h,v 1.39 2003/09/21 22:20:56 wolfgang Exp $
* $Id: Schedule.h,v 1.40 2003/10/01 10:49:09 wolfgang Exp $
*
* (c) The GHC Team 1998-1999
*
......@@ -190,9 +190,10 @@ typedef struct StgMainThread_ {
SchedulerStatus stat;
StgClosure ** ret;
#if defined(RTS_SUPPORTS_THREADS)
Condition wakeup;
#if defined(THREADED_RTS)
Condition bound_thread_cond;
#else
Condition wakeup;
#endif
#endif
struct StgMainThread_ *link;
......@@ -297,12 +298,12 @@ void labelThread(StgPtr tso, char *label);
#if defined(RTS_SUPPORTS_THREADS)
/* If no task is waiting for a capability,
* and if there is work to be done
* or if we need to wait for IO or delay requests,
* spawn a new worker thread.
*
* (Used by the RtsAPI)
*/
void
startSchedulerTask(void);
startSchedulerTaskIfNecessary(void);
#endif
#endif /* __SCHEDULE_H__ */
/* -----------------------------------------------------------------------------
* $Id: Select.c,v 1.29 2003/06/26 12:22:59 stolz Exp $
* $Id: Select.c,v 1.30 2003/10/01 10:49:09 wolfgang Exp $
*
* (c) The GHC Team 1995-2002
*
......@@ -351,4 +351,20 @@ wakeBlockedWorkerThread()
workerWakeupPending = rtsTrue;
}
}
/* resetWorkerWakeupPipeAfterFork
*
* To be called right after a fork().
* After the fork(), the worker wakeup pipe will be shared
* with the parent process, and that's something we don't want.
*/
void
resetWorkerWakeupPipeAfterFork()
{
if(workerWakeupInited) {
close(workerWakeupPipe[0]);
close(workerWakeupPipe[1]);
}
workerWakeupInited = rtsFalse;
}
#endif
......@@ -134,6 +134,12 @@ stopTaskManager ()
return;
}
void
resetTaskManagerAfterFork ()
{
barf("resetTaskManagerAfterFork not implemented for SMP");
}
#else
/************ THREADS version *****************/
......@@ -192,6 +198,13 @@ stopTaskManager ()
{
}
void
resetTaskManagerAfterFork ()
{
rts_n_waiting_tasks = 0;
taskCount = 0;
}
#endif
......
......@@ -28,6 +28,7 @@ extern TaskInfo *taskIds;
extern void startTaskManager ( nat maxTasks, void (*taskStart)(void) );
extern void stopTaskManager ( void );
void resetTaskManagerAfterFork ();
extern void startTask ( void (*taskStart)(void) );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment