Commit bb01a96b authored by simonmar

[project @ 2004-10-14 14:58:37 by simonmar]

Threaded RTS improvements:

 - Unix only: implement waitRead#, waitWrite# and delay# in Haskell,
   by having a single Haskell thread (the IO manager) perform a blocking
   select() operation.  Threads communicate with the IO manager
   via channels.  This is faster than doing the select() in the RTS,
   because we only restart the select() when a new request arrives,
   rather than each time around the scheduler.  (A sketch of the idea
   follows below.)

   On Windows we just make blocking IO calls; we don't have a fancy IO
   manager (yet).

 - Simplify the scheduler for the threaded RTS, now that we don't have
   to wait for IO in the scheduler loop.

 - Remove detectBlackHoles(), which isn't used now (not sure how long
   this has been unused for... perhaps it was needed back when main
   threads were GC roots, so we had to check for black holes manually
   rather than relying on the GC).
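
As a rough illustration of the channel protocol described in the first
bullet, here is a minimal sketch of the delay# side of such an IO
manager.  This is not the code this commit adds: the real IO manager
blocks in select() over file descriptors as well as timeouts (with a
wakeup pipe so that new requests can interrupt the select()), whereas
this sketch handles only delays and leans on timeout and
getMonotonicTime from a modern base library.  Request, ioManager and
myThreadDelay are names invented for this example.

import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Data.List (partition)
import GHC.Clock (getMonotonicTime)
import System.Timeout (timeout)

-- One kind of request: wake the client (by filling its MVar) once the
-- deadline (in seconds on the monotonic clock) has passed.
data Request = Delay Double (MVar ())

-- The IO manager: a single thread owning all pending requests.  It
-- blocks waiting for new requests, but never past the earliest pending
-- deadline -- the analogue of restarting the select() only when a new
-- request arrives, rather than on every scheduler iteration.
ioManager :: Chan Request -> IO ()
ioManager reqChan = loop []
  where
    loop pending = do
        now <- getMonotonicTime
        -- wake every client whose deadline has passed
        let (ready, rest) = partition (\(Delay t _) -> t <= now) pending
        mapM_ (\(Delay _ v) -> putMVar v ()) ready
        case rest of
          -- nothing pending: block indefinitely for the next request
          [] -> do
              req <- readChan reqChan
              loop [req]
          -- otherwise wait for a new request or the next deadline,
          -- whichever comes first
          _  -> do
              let next = minimum [t | Delay t _ <- rest]
                  usec = max 0 (round ((next - now) * 1e6))
              mreq <- timeout usec (readChan reqChan)
              loop (maybe rest (: rest) mreq)

-- What a Haskell-side replacement for delay# might look like: post a
-- request to the IO manager, then block on a private MVar until the
-- manager fills it.
myThreadDelay :: Chan Request -> Int -> IO ()
myThreadDelay reqChan usec = do
    now  <- getMonotonicTime
    done <- newEmptyMVar
    writeChan reqChan (Delay (now + fromIntegral usec / 1e6) done)
    takeMVar done

main :: IO ()
main = do
    reqChan <- newChan
    _ <- forkIO (ioManager reqChan)
    myThreadDelay reqChan 500000   -- sleep for half a second
    putStrLn "woke up"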

Signals aren't quite right in the threaded RTS.  In fact, they're
slightly worse than before, because the thread receiving signals might
be blocked in a C call - previously there would always be another
thread stuck in awaitEvent() that would notice the signal, but that's
not true now.  I can't see an easy fix yet.
parent 61b570fd
@@ -56,8 +56,8 @@ nat rts_n_waiting_workers = 0;
  * exclusive access to the RTS and all its data structures (that are not
  * locked by the Scheduler's mutex).
  *
- * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
- *
+ * thread_ready_cond is signalled whenever
+ * !noCapabilities && !EMPTY_RUN_QUEUE().
  */
 Condition thread_ready_cond = INIT_COND_VAR;
@@ -82,6 +82,12 @@ static rtsBool passingCapability = rtsFalse;
 #define UNUSED_IF_NOT_SMP STG_UNUSED
 #endif
 
+#if defined(RTS_USER_SIGNALS)
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || signals_pending())
+#else
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted)
+#endif
+
 /* ----------------------------------------------------------------------------
    Initialisation
    ------------------------------------------------------------------------- */
@@ -211,7 +217,7 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
     rts_n_free_capabilities = 1;
 #endif
     // Signal that a capability is available
-    if (rts_n_waiting_tasks > 0) {
+    if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) {
        signalCondition(&thread_ready_cond);
     }
     startSchedulerTaskIfNecessary();
@@ -263,7 +269,6 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
     if ( noCapabilities() || passingCapability ) {
        rts_n_waiting_workers++;
        wakeBlockedWorkerThread();
-       context_switch = 1;     // make sure it's our turn soon
        waitCondition(&returning_worker_cond, pMutex);
 #if defined(SMP)
@@ -294,8 +299,16 @@ yieldCapability( Capability** pCap )
     // Pre-condition: pMutex is assumed held, the current thread
     // holds the capability pointed to by pCap.
 
-    if ( rts_n_waiting_workers > 0 || passingCapability ) {
-       IF_DEBUG(scheduler, sched_belch("worker: giving up capability"));
+    if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) {
+       IF_DEBUG(scheduler,
+                if (rts_n_waiting_workers > 0) {
+                    sched_belch("worker: giving up capability (returning wkr)");
+                } else if (passingCapability) {
+                    sched_belch("worker: giving up capability (passing capability)");
+                } else {
+                    sched_belch("worker: giving up capability (no threads to run)");
+                }
+           );
        releaseCapability(*pCap);
        *pCap = NULL;
     }
@@ -324,13 +337,14 @@ yieldCapability( Capability** pCap )
  *          passed to this thread using passCapability.
  * ------------------------------------------------------------------------- */
 
 void
 waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
 {
     // Pre-condition: pMutex is held.
 
-    while ( noCapabilities() ||
-            (passingCapability && passTarget != pThreadCond)) {
+    while ( noCapabilities() ||
+            (passingCapability && passTarget != pThreadCond) ||
+            !ANY_WORK_TO_DO()) {
        IF_DEBUG(scheduler,
                 sched_belch("worker: wait for capability (cond: %p)",
                             pThreadCond));
@@ -384,6 +398,27 @@ passCapabilityToWorker( void )
 
 #endif /* RTS_SUPPORTS_THREADS */
 
+/* ----------------------------------------------------------------------------
+   threadRunnable()
+
+   Signals that a thread has been placed on the run queue, so a worker
+   might need to be woken up to run it.
+
+   ToDo: should check whether the thread at the front of the queue is
+   bound, and if so wake up the appropriate worker.
+   -------------------------------------------------------------------------- */
+
+void
+threadRunnable ( void )
+{
+#if defined(RTS_SUPPORTS_THREADS)
+    if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) {
+       signalCondition(&thread_ready_cond);
+    }
+    startSchedulerTaskIfNecessary();
+#endif
+}
+
 /* ------------------------------------------------------------------------- */
 
 #if defined(SMP)
@@ -31,6 +31,10 @@ extern void initCapabilities( void );
 //
 extern void releaseCapability( Capability* cap );
 
+// Signal that a thread has become runnable
+//
+extern void threadRunnable ( void );
+
 #ifdef RTS_SUPPORTS_THREADS
 // Gives up the current capability IFF there is a higher-priority
 // thread waiting for it.  This happens in one of two ways:
@@ -1342,6 +1342,10 @@ mkApUpd0zh_fast
 waitReadzh_fast
 {
     /* args: R1 */
+#ifdef THREADED_RTS
+    foreign "C" barf("waitRead# on threaded RTS");
+#endif
+
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
     StgTSO_block_info(CurrentTSO) = R1;
@@ -1354,6 +1358,10 @@ waitReadzh_fast
 waitWritezh_fast
 {
     /* args: R1 */
+#ifdef THREADED_RTS
+    foreign "C" barf("waitWrite# on threaded RTS");
+#endif
+
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
     StgTSO_block_info(CurrentTSO) = R1;
@@ -1374,6 +1382,10 @@ delayzh_fast
     W_ t, prev, target;
 #endif
 
+#ifdef THREADED_RTS
+    foreign "C" barf("delay# on threaded RTS");
+#endif
+
     /* args: R1 (microsecond delay amount) */
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
@@ -1432,6 +1444,10 @@ asyncReadzh_fast
     W_ ares;
     CInt reqID;
 
+#ifdef THREADED_RTS
+    foreign "C" barf("asyncRead# on threaded RTS");
+#endif
+
     /* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
@@ -1454,6 +1470,10 @@ asyncWritezh_fast
     W_ ares;
     CInt reqID;
 
+#ifdef THREADED_RTS
+    foreign "C" barf("asyncWrite# on threaded RTS");
+#endif
+
     /* args: R1 = fd, R2 = isSock, R3 = len, R4 = buf */
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
     StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
@@ -453,13 +453,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
     // run queue is empty, and there are no other tasks running, we
     // can wait indefinitely for something to happen.
     //
-    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
+    if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
+    {
 #if defined(RTS_SUPPORTS_THREADS)
-        || EMPTY_RUN_QUEUE()
+       // We shouldn't be here...
+       barf("schedule: awaitEvent() in threaded RTS");
 #endif
-        )
-    {
        awaitEvent( EMPTY_RUN_QUEUE() );
     }
     // we can be interrupted while waiting for I/O...
     if (interrupted) continue;
@@ -479,18 +479,13 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
     if ( EMPTY_THREAD_QUEUES() )
     {
        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
 
        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
-       // they are about to be send BlockedOnDeadMVar.  Any threads
-       // thus released will be immediately runnable.
+       // they are unreachable and will therefore be sent an
+       // exception.  Any threads thus released will be immediately
+       // runnable.
        GarbageCollect(GetRoots,rtsTrue);
 
        if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
-
-       IF_DEBUG(scheduler,
-                sched_belch("still deadlocked, checking for black holes..."));
-       detectBlackHoles();
-
-       if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
 
 #if defined(RTS_USER_SIGNALS)
@@ -1457,12 +1452,6 @@ forkProcess(HsStablePtr *entry
            stgFree(m);
        }
 
-# ifdef RTS_SUPPORTS_THREADS
-       resetTaskManagerAfterFork();      // tell startTask() and friends that
-       startingWorkerThread = rtsFalse;  // we have no worker threads any more
-       resetWorkerWakeupPipeAfterFork();
-# endif
 
        rc = rts_evalStableIO(entry, NULL); // run the action
        rts_checkSchedStatus("forkProcess",rc);
@@ -1568,8 +1557,6 @@ suspendThread( StgRegTable *reg )
     IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
 #endif
 
-    /* Other threads _might_ be available for execution; signal this */
-    THREAD_RUNNABLE();
     RELEASE_LOCK(&sched_mutex);
     errno = saved_errno;
@@ -1933,11 +1920,10 @@ static void scheduleThread_ (StgTSO* tso);
 void
 scheduleThread_(StgTSO *tso)
 {
+    // Precondition: sched_mutex must be held.
     // The thread goes at the *end* of the run-queue, to avoid possible
     // starvation of any threads already on the queue.
     APPEND_TO_RUN_QUEUE(tso);
-    THREAD_RUNNABLE();
+    threadRunnable();
 }
 
 void
@@ -1997,7 +1983,7 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
     APPEND_TO_RUN_QUEUE(tso);
-    // NB. Don't call THREAD_RUNNABLE() here, because the thread is
+    // NB. Don't call threadRunnable() here, because the thread is
     // bound and only runnable by *this* OS thread, so waking up other
     // workers will just slow things down.
@@ -2428,7 +2414,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
     next = bqe->link;
     ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
     APPEND_TO_RUN_QUEUE((StgTSO *)bqe);
-    THREAD_RUNNABLE();
+    threadRunnable();
     unblockCount(bqe, node);
     /* reset blocking status after dumping event */
     ((StgTSO *)bqe)->why_blocked = NotBlocked;
@@ -2473,7 +2459,7 @@ unblockOneLocked(StgTSO *tso)
     next = tso->link;
     tso->link = END_TSO_QUEUE;
     APPEND_TO_RUN_QUEUE(tso);
-    THREAD_RUNNABLE();
+    threadRunnable();
     IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
     return next;
 }
@@ -2644,9 +2630,6 @@ interruptStgRts(void)
 {
     interrupted    = 1;
     context_switch = 1;
-#ifdef RTS_SUPPORTS_THREADS
-    wakeBlockedWorkerThread();
-#endif
 }
 
 /* -----------------------------------------------------------------------------
@@ -3277,70 +3260,6 @@ resurrectThreads( StgTSO *threads )
     }
 }
 
-/* -----------------------------------------------------------------------------
- * Blackhole detection: if we reach a deadlock, test whether any
- * threads are blocked on themselves.  Any threads which are found to
- * be self-blocked get sent a NonTermination exception.
- *
- * This is only done in a deadlock situation in order to avoid
- * performance overhead in the normal case.
- *
- * Locks: sched_mutex is held upon entry and exit.
- * -------------------------------------------------------------------------- */
-
-#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
-static void
-detectBlackHoles( void )
-{
-    StgTSO *tso = all_threads;
-    StgPtr frame;
-    StgClosure *blocked_on;
-    StgRetInfoTable *info;
-
-    for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-       while (tso->what_next == ThreadRelocated) {
-           tso = tso->link;
-           ASSERT(get_itbl(tso)->type == TSO);
-       }
-
-       if (tso->why_blocked != BlockedOnBlackHole) {
-           continue;
-       }
-
-       blocked_on = tso->block_info.closure;
-
-       frame = tso->sp;
-
-       while(1) {
-           info = get_ret_itbl((StgClosure *)frame);
-           switch (info->i.type) {
-
-           case UPDATE_FRAME:
-               if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
-                   /* We are blocking on one of our own computations, so
-                    * send this thread the NonTermination exception.
-                    */
-                   IF_DEBUG(scheduler,
-                            sched_belch("thread %d is blocked on itself", tso->id));
-                   raiseAsync(tso, (StgClosure *)NonTermination_closure);
-                   goto done;
-               }
-
-               frame = (StgPtr)((StgUpdateFrame *)frame + 1);
-               continue;
-
-           case STOP_FRAME:
-               goto done;
-
-               // normal stack frames; do nothing except advance the pointer
-           default:
-               frame += stack_frame_sizeW((StgClosure *)frame);
-           }
-       }
-done: ;
-    }
-}
-#endif
 
 /* ----------------------------------------------------------------------------
  * Debugging: why is a thread blocked
  * [Also provides useful information when debugging threaded programs
@@ -156,7 +156,6 @@ extern nat RTS_VAR(rts_n_waiting_workers);
 extern nat RTS_VAR(rts_n_waiting_tasks);
 #endif
 
 StgBool rtsSupportsBoundThreads(void);
 StgBool isThreadBound(StgTSO *tso);
 
 extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
@@ -280,17 +279,6 @@ void labelThread(StgPtr tso, char *label);
     }                                          \
     blocked_queue_tl = tso;
 
-/* Signal that a runnable thread has become available, in
- * case there are any waiting tasks to execute it.
- */
-#if defined(RTS_SUPPORTS_THREADS)
-#define THREAD_RUNNABLE()                      \
-    wakeBlockedWorkerThread();                 \
-    context_switch = 1;
-#else
-#define THREAD_RUNNABLE()  /* nothing */
-#endif
-
 /* Check whether various thread queues are empty
  */
 #define EMPTY_QUEUE(q)  (q == END_TSO_QUEUE)
@@ -37,13 +37,6 @@
 /* last timestamp */
 nat timestamp = 0;
 
-#ifdef RTS_SUPPORTS_THREADS
-static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
-static rtsBool workerWakeupPending = rtsFalse;
-static int workerWakeupPipe[2];
-static rtsBool workerWakeupInited = rtsFalse;
-#endif
 
 /* There's a clever trick here to avoid problems when the time wraps
  * around.  Since our maximum delay is smaller than 31 bits of ticks
  * (it's actually 31 bits of microseconds), we can safely check
@@ -163,34 +156,6 @@ awaitEvent(rtsBool wait)
            }
        }
 
-#ifdef RTS_SUPPORTS_THREADS
-       if(!workerWakeupInited) {
-           pipe(workerWakeupPipe);
-           workerWakeupInited = rtsTrue;
-       }
-       FD_SET(workerWakeupPipe[0], &rfd);
-       maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
-#endif
-
-       /* Release the scheduler lock while we do the poll.
-        * this means that someone might muck with the blocked_queue
-        * while we do this, but it shouldn't matter:
-        *
-        *   - another task might poll for I/O and remove one
-        *     or more threads from the blocked_queue.
-        *   - more I/O threads may be added to blocked_queue.
-        *   - more delayed threads may be added to blocked_queue. We'll
-        *     just subtract delta from their delays after the poll.
-        *
-        * I believe none of these cases lead to trouble --SDM.
-        */
-
-#ifdef RTS_SUPPORTS_THREADS
-       isWorkerBlockedInAwaitEvent = rtsTrue;
-       workerWakeupPending = rtsFalse;
-#endif
-       RELEASE_LOCK(&sched_mutex);
 
        /* Check for any interesting events */
 
        tv.tv_sec  = min / 1000000;
@@ -223,10 +188,6 @@ awaitEvent(rtsBool wait)
                barf("select failed");
            }
        }
-       ACQUIRE_LOCK(&sched_mutex);
-
-#ifdef RTS_SUPPORTS_THREADS
-       isWorkerBlockedInAwaitEvent = rtsFalse;
-#endif
 
        /* We got a signal; could be one of ours.  If so, we need
         * to start up the signal handler straight away, otherwise
@@ -235,9 +196,7 @@ awaitEvent(rtsBool wait)
         */
 #if defined(RTS_USER_SIGNALS)
        if (signals_pending()) {
-           RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
            startSignalHandlers();
-           ACQUIRE_LOCK(&sched_mutex);
            return; /* still hold the lock */
        }
 #endif
@@ -258,24 +217,8 @@ awaitEvent(rtsBool wait)
        if (run_queue_hd != END_TSO_QUEUE) {
            return; /* still hold the lock */
        }
 
-#ifdef RTS_SUPPORTS_THREADS
-       /* If another worker thread wants to take over,
-        * return to the scheduler
-        */
-       if (needToYieldToReturningWorker()) {
-           return; /* still hold the lock */
-       }
-#endif
-
-#ifdef RTS_SUPPORTS_THREADS
-       isWorkerBlockedInAwaitEvent = rtsTrue;
-#endif
-       RELEASE_LOCK(&sched_mutex);
     }
-    ACQUIRE_LOCK(&sched_mutex);
 
     /* Step through the waiting queue, unblocking every thread that now has
      * a file descriptor in a ready state.
      */
@@ -317,51 +260,5 @@ awaitEvent(rtsBool wait)
            }
        }
 
-#if defined(RTS_SUPPORTS_THREADS)
-       // if we were woken up by wakeBlockedWorkerThread,
-       // read the dummy byte from the pipe
-       if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
-           unsigned char dummy;
-           wait = rtsFalse;
-           read(workerWakeupPipe[0],&dummy,1);
-       }
-#endif
     } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
 }
 
-#ifdef RTS_SUPPORTS_THREADS
-/* wakeBlockedWorkerThread
- *
- * If a worker thread is currently blocked within awaitEvent,
- * wake it.
- * Must be called with sched_mutex held.
- */
-void
-wakeBlockedWorkerThread()
-{
-    if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
-       unsigned char dummy = 42;       // Any value will do here
-       // write something so that select() wakes up
-       write(workerWakeupPipe[1],&dummy,1);
-       workerWakeupPending = rtsTrue;
-    }
-}
-
-/* resetWorkerWakeupPipeAfterFork
- *
- * To be called right after a fork().
- * After the fork(), the worker wakeup pipe will be shared
- * with the parent process, and that's something we don't want.
- */
-void
-resetWorkerWakeupPipeAfterFork()
-{
-    if(workerWakeupInited) {
-       close(workerWakeupPipe[0]);
-       close(workerWakeupPipe[1]);
-    }
-    workerWakeupInited = rtsFalse;
-}
-#endif
@@ -54,22 +54,25 @@ static nat n_haskell_handlers = 0;
 StgPtr pending_handler_buf[N_PENDING_HANDLERS];
 StgPtr *next_pending_handler = pending_handler_buf;
 
+/* -----------------------------------------------------------------------------
+ * Signal handling
+ * -------------------------------------------------------------------------- */
+
 #ifdef RTS_SUPPORTS_THREADS
 pthread_t signalHandlingThread;
 #endif
 
 // Handle all signals in the current thread.
 // Called from Capability.c whenever the main capability is granted to a thread
 // and in installDefaultHandlers
 void
-handleSignalsInThisThread()
+handleSignalsInThisThread(void)
 {
 #ifdef RTS_SUPPORTS_THREADS
     signalHandlingThread = pthread_self();
 #endif
 }
 
 /* -----------------------------------------------------------------------------
  * Allocate/resize the table of signal handlers.
  * -------------------------------------------------------------------------- */
@@ -28,6 +28,7 @@ extern void markSignalHandlers (evac_fn evac);
 extern void initDefaultHandlers(void);
 
 extern void handleSignalsInThisThread(void);
+extern void handleSignalsInPrevThread(void);
 
 #elif defined(mingw32_TARGET_OS)
 #define RTS_USER_SIGNALS 1