Commit a20ec0ce authored by simonmar's avatar simonmar

[project @ 2004-03-01 14:18:35 by simonmar]

Threaded RTS improvements:

  - Make the main_threads list doubly linked.  Have threads
    remove themselves from this list when they complete, rather
    than searching for completed main threads each time around
    the scheduler loop.  This removes an O(n) loop from the
    scheduler, but adds some new constraints (basically completed
    threads must remain on the run queue until dealt with, including
    threads which have been killed by an async exception).

  - Add a pointer from the TSO to the StgMainThread struct, for
    main threads.  This avoids a number of places where we had
    to traverse the list of main threads to find the right one,
    including one place in the scheduler loop.  Adding a field to
    a TSO is cheap.

  - taskStart: we should be resetting the startingWorkerThread flag
    in here.  Not sure why we aren't; maybe this got lost at some point.

  - Use the BlockedOnCCall flags in the non-threaded RTS too.  Q: what
    should happen if a thread does a foreign call which re-enters the
    RTS, and then sends an async exception to the original thread?
    Answer: it should deadlock, which it does in the threaded RTS, and
    this commit makes it do so in the non-threaded RTS too (see
    testsuite/tests/concurrent/should_run/conc040.hs).
parent 07e22b02
/* -----------------------------------------------------------------------------
* $Id: TSO.h,v 1.33 2003/11/12 17:27:05 sof Exp $
* $Id: TSO.h,v 1.34 2004/03/01 14:18:35 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
......@@ -161,11 +161,9 @@ typedef enum {
, BlockedOnGA // blocked on a remote closure represented by a Global Address
, BlockedOnGA_NoSend // same as above but without sending a Fetch message
#endif
#if defined(RTS_SUPPORTS_THREADS)
, BlockedOnCCall
, BlockedOnCCall_NoUnblockExc // same as above but don't unblock async exceptions
// in resumeThread()
#endif
, BlockedOnCCall_NoUnblockExc // same as above but don't unblock
// async exceptions in resumeThread()
} StgTSOBlockReason;
#if defined(mingw32_TARGET_OS)
......@@ -204,12 +202,13 @@ typedef struct StgTSO_ {
StgMutClosure * mut_link; /* TSO's are mutable of course! */
struct StgTSO_* global_link; /* Links all threads together */
StgTSOWhatNext what_next : 16;
StgTSOBlockReason why_blocked : 16;
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
StgTSOWhatNext what_next : 16;
StgTSOBlockReason why_blocked : 16;
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
struct StgMainThread_* main;
MAYBE_EMPTY_STRUCT(StgTSOTickyInfo,ticky)
MAYBE_EMPTY_STRUCT(StgTSOProfInfo,prof)
......
/* -----------------------------------------------------------------------------
* $Id: Exception.h,v 1.7 2003/11/12 17:49:07 sof Exp $
* $Id: Exception.h,v 1.8 2004/03/01 14:18:35 simonmar Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -27,6 +27,8 @@ interruptible(StgTSO *t)
#endif
case BlockedOnDelay:
return 1;
// NB. Threaded blocked on foreign calls (BlockedOnCCall) are
// *not* interruptible. We can't send these threads an exception.
default:
return 0;
}
......
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.192 2004/02/27 16:16:31 simonmar Exp $
* $Id: Schedule.c,v 1.193 2004/03/01 14:18:35 simonmar Exp $
*
* (c) The GHC Team, 1998-2003
*
......@@ -259,6 +259,7 @@ static void
taskStart(void)
{
ACQUIRE_LOCK(&sched_mutex);
startingWorkerThread = rtsFalse;
schedule(NULL,NULL);
RELEASE_LOCK(&sched_mutex);
}
......@@ -431,92 +432,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
#endif
}
//
// Go through the list of main threads and wake up any
// clients whose computations have finished. ToDo: this
// should be done more efficiently without a linear scan
// of the main threads list, somehow...
//
#if defined(RTS_SUPPORTS_THREADS)
{
StgMainThread *m, **prev;
prev = &main_threads;
for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
if (m->tso->what_next == ThreadComplete
|| m->tso->what_next == ThreadKilled) {
if (m == mainThread) {
if (m->tso->what_next == ThreadComplete) {
if (m->ret) {
// NOTE: return val is tso->sp[1]
// (see StgStartup.hc)
*(m->ret) = (StgClosure *)m->tso->sp[1];
}
m->stat = Success;
} else {
if (m->ret) {
*(m->ret) = NULL;
}
if (was_interrupted) {
m->stat = Interrupted;
} else {
m->stat = Killed;
}
}
*prev = m->link;
#ifdef DEBUG
removeThreadLabel((StgWord)m->tso->id);
#endif
releaseCapability(cap);
return;
} else {
// The current OS thread can not handle the fact that
// the Haskell thread "m" has ended. "m" is bound;
// the scheduler loop in its bound OS thread has to
// return, so let's pass the capability directly to
// that thread.
passCapability(&m->bound_thread_cond);
continue;
}
}
}
}
#else /* not threaded */
# if defined(PAR)
/* in GUM do this only on the Main PE */
if (IAmMainThread)
# endif
/* If our main thread has finished or been killed, return.
*/
{
StgMainThread *m = main_threads;
if (m->tso->what_next == ThreadComplete
|| m->tso->what_next == ThreadKilled) {
#ifdef DEBUG
removeThreadLabel((StgWord)m->tso->id);
#endif
main_threads = main_threads->link;
if (m->tso->what_next == ThreadComplete) {
// We finished successfully, fill in the return value
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
m->stat = Success;
return;
} else {
if (m->ret) { *(m->ret) = NULL; };
if (was_interrupted) {
m->stat = Interrupted;
} else {
m->stat = Killed;
}
return;
}
}
}
#endif
#if defined(RTS_USER_SIGNALS)
// check for signals each time around the scheduler
if (signals_pending()) {
......@@ -605,11 +520,9 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
m = main_threads;
switch (m->tso->why_blocked) {
case BlockedOnBlackHole:
raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
break;
case BlockedOnException:
case BlockedOnMVar:
raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
break;
default:
barf("deadlock: main thread blocked in a strange way");
......@@ -915,12 +828,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
#ifdef THREADED_RTS
{
StgMainThread *m;
for(m = main_threads; m; m = m->link)
{
if(m->tso == t)
break;
}
StgMainThread *m = t->main;
if(m)
{
......@@ -1144,7 +1052,6 @@ run_thread:
*/
threadPaused(t);
{
StgMainThread *m;
/* enlarge the stack */
StgTSO *new_t = threadStackOverflow(t);
......@@ -1152,10 +1059,8 @@ run_thread:
* main thread stack. It better not be on any other queues...
* (it shouldn't be).
*/
for (m = main_threads; m != NULL; m = m->link) {
if (m->tso == t) {
m->tso = new_t;
}
if (t->main != NULL) {
t->main->tso = new_t;
}
threadPaused(new_t);
PUSH_ON_RUN_QUEUE(new_t);
......@@ -1318,8 +1223,74 @@ run_thread:
!RtsFlags.ParFlags.ParStats.Suppressed)
DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
#endif
//
// Check whether the thread that just completed was a main
// thread, and if so return with the result.
//
// There is an assumption here that all thread completion goes
// through this point; we need to make sure that if a thread
// ends up in the ThreadKilled state, that it stays on the run
// queue so it can be dealt with here.
//
if (
#if defined(RTS_SUPPORTS_THREADS)
mainThread != NULL
#else
mainThread->tso == t
#endif
)
{
// We are a bound thread: this must be our thread that just
// completed.
ASSERT(mainThread->tso == t);
if (t->what_next == ThreadComplete) {
if (mainThread->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
*(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
}
mainThread->stat = Success;
} else {
if (mainThread->ret) {
*(mainThread->ret) = NULL;
}
if (was_interrupted) {
mainThread->stat = Interrupted;
} else {
mainThread->stat = Killed;
}
}
#ifdef DEBUG
removeThreadLabel((StgWord)mainThread->tso->id);
#endif
if (mainThread->prev == NULL) {
main_threads = mainThread->link;
} else {
mainThread->prev->link = mainThread->link;
}
if (mainThread->link != NULL) {
mainThread->link->prev = NULL;
}
releaseCapability(cap);
return;
}
#ifdef RTS_SUPPORTS_THREADS
ASSERT(t->main == NULL);
#else
if (t->main != NULL) {
// Must be a main thread that is not the topmost one. Leave
// it on the run queue until the stack has unwound to the
// point where we can deal with this. Leaving it on the run
// queue also ensures that the garbage collector knows about
// this thread and its return value (it gets dropped from the
// all_threads list so there's no other way to find it).
APPEND_TO_RUN_QUEUE(t);
}
#endif
break;
default:
barf("schedule: invalid thread return code %d", (int)ret);
}
......@@ -1400,12 +1371,7 @@ StgBool
isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
{
#ifdef THREADED_RTS
StgMainThread *m;
for(m = main_threads; m; m = m->link)
{
if(m->tso == tso)
return rtsTrue;
}
return (tso->main != NULL);
#endif
return rtsFalse;
}
......@@ -1550,17 +1516,12 @@ suspendThread( StgRegTable *reg,
cap->r.rCurrentTSO->link = suspended_ccalling_threads;
suspended_ccalling_threads = cap->r.rCurrentTSO;
#if defined(RTS_SUPPORTS_THREADS)
if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
{
if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
}
else
{
} else {
cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
}
#endif
/* Use the thread ID as the token; it should be unique */
tok = cap->r.rCurrentTSO->id;
......@@ -1616,13 +1577,10 @@ resumeThread( StgInt tok,
}
tso->link = END_TSO_QUEUE;
#if defined(RTS_SUPPORTS_THREADS)
if(tso->why_blocked == BlockedOnCCall)
{
if(tso->why_blocked == BlockedOnCCall) {
awakenBlockedQueueNoLock(tso->blocked_exceptions);
tso->blocked_exceptions = NULL;
}
#endif
/* Reset blocking status */
tso->why_blocked = NotBlocked;
......@@ -1751,6 +1709,7 @@ createThread(nat size)
tso->blocked_exceptions = NULL;
tso->saved_errno = 0;
tso->main = NULL;
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
......@@ -1970,8 +1929,16 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
m->tso = tso;
tso->main = m;
m->ret = ret;
m->stat = NoStatus;
m->link = main_threads;
m->prev = NULL;
if (main_threads != NULL) {
main_threads->prev = m;
}
main_threads = m;
#if defined(RTS_SUPPORTS_THREADS)
// Allocating a new condition for each thread is expensive, so we
// cache one. This is a pretty feeble hack, but it helps speed up
......@@ -1995,9 +1962,6 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
*/
IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
m->link = main_threads;
main_threads = m;
PUSH_ON_RUN_QUEUE(tso);
// NB. Don't call THREAD_RUNNABLE() here, because the thread is
// bound and only runnable by *this* OS thread, so waking up other
......@@ -2201,25 +2165,6 @@ GetRoots( evac_fn evac )
// mark the signal handlers (signals should be already blocked)
markSignalHandlers(evac);
#endif
// main threads which have completed need to be retained until they
// are dealt with in the main scheduler loop. They won't be
// retained any other way: the GC will drop them from the
// all_threads list, so we have to be careful to treat them as roots
// here.
{
StgMainThread *m;
for (m = main_threads; m != NULL; m = m->link) {
switch (m->tso->what_next) {
case ThreadComplete:
case ThreadKilled:
evac((StgClosure **)&m->tso);
break;
default:
break;
}
}
}
}
/* -----------------------------------------------------------------------------
......@@ -2635,7 +2580,6 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
#else /* !GRAN && !PAR */
#ifdef RTS_SUPPORTS_THREADS
void
awakenBlockedQueueNoLock(StgTSO *tso)
{
......@@ -2643,7 +2587,6 @@ awakenBlockedQueueNoLock(StgTSO *tso)
tso = unblockOneLocked(tso);
}
}
#endif
void
awakenBlockedQueue(StgTSO *tso)
......@@ -2996,11 +2939,12 @@ deleteThreadImmediately(StgTSO *tso)
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
#if defined(RTS_SUPPORTS_THREADS)
if (tso->why_blocked != BlockedOnCCall
&& tso->why_blocked != BlockedOnCCall_NoUnblockExc)
#endif
if (tso->why_blocked != BlockedOnCCall &&
tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
unblockThread(tso);
}
tso->what_next = ThreadKilled;
}
......@@ -3334,14 +3278,12 @@ printThreadBlockage(StgTSO *tso)
tso->block_info.closure, info_type(tso->block_info.closure));
break;
#endif
#if defined(RTS_SUPPORTS_THREADS)
case BlockedOnCCall:
fprintf(stderr,"is blocked on an external call");
break;
case BlockedOnCCall_NoUnblockExc:
fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
break;
#endif
default:
barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
tso->why_blocked, tso->id, tso);
......
/* -----------------------------------------------------------------------------
* $Id: Schedule.h,v 1.44 2004/02/26 16:14:21 simonmar Exp $
* $Id: Schedule.h,v 1.45 2004/03/01 14:18:36 simonmar Exp $
*
* (c) The GHC Team 1998-1999
*
......@@ -33,10 +33,8 @@ void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
#elif defined(PAR)
void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
#else
void awakenBlockedQueue(StgTSO *tso);
#if defined(RTS_SUPPORTS_THREADS)
void awakenBlockedQueueNoLock(StgTSO *tso);
#endif
void awakenBlockedQueue (StgTSO *tso);
void awakenBlockedQueueNoLock (StgTSO *tso);
#endif
/* unblockOne()
......@@ -203,6 +201,7 @@ typedef struct StgMainThread_ {
Condition wakeup;
#endif
#endif
struct StgMainThread_ *prev;
struct StgMainThread_ *link;
} StgMainThread;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment