Commit 6270f2b9 authored by simonmar

[project @ 2005-05-25 08:33:15 by simonmar]

something very strange happened with previous commit; try again
parent 6b7f3c93
@@ -841,8 +841,9 @@ scheduleCheckBlockedThreads(void)
// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
#else
-	awaitEvent( EMPTY_RUN_QUEUE() );
+	awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
#endif
}
}
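
/* A sketch of the contract relied on here, using names from this file:
 * awaitEvent(wait) polls for external events, blocking only while
 * `wait` is rtsTrue. Refusing to block while blackholes_need_checking
 * is set lets the scheduler loop reach scheduleCheckBlackHoles() (see
 * below) instead of sleeping while a thread blocked on a black hole
 * may already be runnable. */
rtsBool wait = EMPTY_RUN_QUEUE() && !blackholes_need_checking;
awaitEvent(wait);    // returns promptly when wait == rtsFalse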
@@ -1675,7 +1676,696 @@ scheduleHandleThreadBlocked( StgTSO *t
/*
ngoq Dogh!
ASSERT(procStatus[CurrentProc]==Busy ||
-	   ((procStatus[Curren
+	   ((procStatus[CurrentProc]==Fetching) &&
(t->block_info.closure!=(StgClosure*)NULL)));
if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
!(!RtsFlags.GranFlags.DoAsyncFetch &&
procStatus[CurrentProc]==Fetching))
procStatus[CurrentProc] = Idle;
*/
#elif defined(PAR)
IF_DEBUG(scheduler,
debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
IF_PAR_DEBUG(bq,
if (t->block_info.closure!=(StgClosure*)NULL)
print_bq(t->block_info.closure));
/* Send a fetch (if BlockedOnGA) and dump event to log file */
blockThread(t);
/* whatever we schedule next, we must log that schedule */
emitSchedule = rtsTrue;
#else /* !GRAN */
// We don't need to do anything. The thread is blocked, and it
// has tidied up its stack and placed itself on whatever queue
// it needs to be on.
#if !defined(SMP)
ASSERT(t->why_blocked != NotBlocked);
// This might not be true under SMP: we don't have
// exclusive access to this TSO, so someone might have
// woken it up by now. This actually happens: try
// conc023 +RTS -N2.
#endif
IF_DEBUG(scheduler,
debugBelch("--<< thread %d (%s) stopped: ",
t->id, whatNext_strs[t->what_next]);
printThreadBlockage(t);
debugBelch("\n"));
/* Only for dumping event to log file
ToDo: do I need this in GranSim, too?
blockThread(t);
*/
#endif
}
/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadFinished
* ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static rtsBool
scheduleHandleThreadFinished( StgMainThread *mainThread
USED_WHEN_RTS_SUPPORTS_THREADS,
Capability *cap,
StgTSO *t )
{
/* Need to check whether this was a main thread, and if so,
* return with the return value.
*
* We also end up here if the thread kills itself with an
* uncaught exception, see Exception.cmm.
*/
IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n",
t->id, whatNext_strs[t->what_next]));
#if defined(GRAN)
endThread(t, CurrentProc); // clean-up the thread
#elif defined(PARALLEL_HASKELL)
/* For now all are advisory -- HWL */
//if(t->priority==AdvisoryPriority) ??
advisory_thread_count--; // JB: Caution with this counter, buggy!
# if defined(DIST)
if(t->dist.priority==RevalPriority)
FinishReval(t);
# endif
# if defined(EDENOLD)
// the thread could still have an outport... (BUG)
if (t->eden.outport != -1) {
// delete the outport for the tso which has finished...
IF_PAR_DEBUG(eden_ports,
debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
t->eden.outport, t->id));
deleteOPT(t);
}
// thread still in the process (HEAVY BUG! since outport has just been closed...)
if (t->eden.epid != -1) {
IF_PAR_DEBUG(eden_ports,
debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
t->id, t->eden.epid));
removeTSOfromProcess(t);
}
# endif
# if defined(PAR)
if (RtsFlags.ParFlags.ParStats.Full &&
!RtsFlags.ParFlags.ParStats.Suppressed)
DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
// t->par only contains statistics: left out for now...
IF_PAR_DEBUG(fish,
debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
t->id,t,t->par.sparkname));
# endif
#endif // PARALLEL_HASKELL
//
// Check whether the thread that just completed was a main
// thread, and if so return with the result.
//
// There is an assumption here that all thread completion goes
// through this point; we need to make sure that if a thread
// ends up in the ThreadKilled state, that it stays on the run
// queue so it can be dealt with here.
//
if (
#if defined(RTS_SUPPORTS_THREADS)
mainThread != NULL
#else
mainThread->tso == t
#endif
)
{
// We are a bound thread: this must be our thread that just
// completed.
ASSERT(mainThread->tso == t);
if (t->what_next == ThreadComplete) {
if (mainThread->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
*(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1];
}
mainThread->stat = Success;
} else {
if (mainThread->ret) {
*(mainThread->ret) = NULL;
}
if (interrupted) {
mainThread->stat = Interrupted;
} else {
mainThread->stat = Killed;
}
}
#ifdef DEBUG
removeThreadLabel((StgWord)mainThread->tso->id);
#endif
if (mainThread->prev == NULL) {
ASSERT(mainThread == main_threads);
main_threads = mainThread->link;
} else {
mainThread->prev->link = mainThread->link;
}
if (mainThread->link != NULL) {
mainThread->link->prev = mainThread->prev;
}
releaseCapability(cap);
return rtsTrue; // tells schedule() to return
}
#ifdef RTS_SUPPORTS_THREADS
ASSERT(t->main == NULL);
#else
if (t->main != NULL) {
// Must be a main thread that is not the topmost one. Leave
// it on the run queue until the stack has unwound to the
// point where we can deal with this. Leaving it on the run
// queue also ensures that the garbage collector knows about
// this thread and its return value (it gets dropped from the
// all_threads list so there's no other way to find it).
APPEND_TO_RUN_QUEUE(t);
}
#endif
return rtsFalse;
}
/* -----------------------------------------------------------------------------
* Perform a heap census, if PROFILING
* -------------------------------------------------------------------------- */
static rtsBool
scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
#if defined(PROFILING)
// When we have +RTS -i0 and we're heap profiling, do a census at
// every GC. This lets us get repeatable runs for debugging.
if (performHeapProfile ||
(RtsFlags.ProfFlags.profileInterval==0 &&
RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
GarbageCollect(GetRoots, rtsTrue);
heapCensus();
performHeapProfile = rtsFalse;
return rtsTrue; // true <=> we already GC'd
}
#endif
return rtsFalse;
}
/* -----------------------------------------------------------------------------
* Perform a garbage collection if necessary
* ASSUMES: sched_mutex
* -------------------------------------------------------------------------- */
static void
scheduleDoGC( rtsBool force_major )
{
StgTSO *t;
#ifdef SMP
Capability *cap;
static rtsBool waiting_for_gc;
int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
// subtract one because we're already holding one.
Capability *caps[n_capabilities];
#endif
#ifdef SMP
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
//
// This seems to be the simplest way: previous attempts involved
// making all the threads with capabilities give up their
// capabilities and sleep except for the *last* one, which
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
// Someone else is already trying to GC
if (waiting_for_gc) return;
waiting_for_gc = rtsTrue;
while (n_capabilities > 0) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
waitForReturnCapability(&sched_mutex, &cap);
n_capabilities--;
caps[n_capabilities] = cap;
}
waiting_for_gc = rtsFalse;
#endif
/* Kick any transactions which are invalid back to their
* atomically frames. When next scheduled they will try to
* commit, this commit will fail and they will retry.
*/
for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
if (!stmValidateTransaction (t -> trec)) {
IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
// strip the stack back to the ATOMICALLY_FRAME, aborting
// the (nested) transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
raiseAsync_(t, NULL, rtsTrue);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
}
}
}
// so this happens periodically:
scheduleCheckBlackHoles();
IF_DEBUG(scheduler, printAllThreads());
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots, force_major);
#if defined(SMP)
{
// release our stash of capabilities.
nat i;
for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
releaseCapability(caps[i]);
}
}
#endif
#if defined(GRAN)
/* add a ContinueThread event to continue execution of current thread */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
ContinueThread,
t, (StgClosure*)NULL, (rtsSpark*)NULL);
IF_GRAN_DEBUG(bq,
debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
G_EVENTQ(0);
G_CURR_THREADQ(0));
#endif /* GRAN */
}
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
* ------------------------------------------------------------------------- */
StgBool
rtsSupportsBoundThreads(void)
{
#if defined(RTS_SUPPORTS_THREADS)
return rtsTrue;
#else
return rtsFalse;
#endif
}
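
/* A hypothetical C-side use of this check; the real consumer is the
 * error path in Control.Concurrent (e.g. forkOS) on the Haskell side. */
static void
checkBoundThreadsExample( void )
{
    if (!rtsSupportsBoundThreads()) {
        errorBelch("RTS is not built to support bound threads (use -threaded)");
    }
}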
/* ---------------------------------------------------------------------------
* isThreadBound(tso): check whether tso is bound to an OS thread.
* ------------------------------------------------------------------------- */
StgBool
isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
{
#if defined(RTS_SUPPORTS_THREADS)
return (tso->main != NULL);
#endif
return rtsFalse;
}
/* ---------------------------------------------------------------------------
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
#ifndef mingw32_HOST_OS
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThreadImmediately(StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
STG_UNUSED
#endif
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
pid_t pid;
StgTSO* t,*next;
StgMainThread *m;
SchedulerStatus rc;
IF_DEBUG(scheduler,sched_belch("forking!"));
rts_lock(); // This not only acquires sched_mutex, it also
// makes sure that no other threads are running
pid = fork();
if (pid) { /* parent */
/* just return the pid */
rts_unlock();
return pid;
} else { /* child */
// delete all threads
run_queue_hd = run_queue_tl = END_TSO_QUEUE;
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
next = t->link;
// don't allow threads to catch the ThreadKilled exception
deleteThreadImmediately(t);
}
// wipe the main thread list
while((m = main_threads) != NULL) {
main_threads = m->link;
# ifdef THREADED_RTS
closeCondition(&m->bound_thread_cond);
# endif
stgFree(m);
}
rc = rts_evalStableIO(entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",rc);
rts_unlock();
hs_exit(); // clean up and exit
stg_exit(0);
}
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
barf("forkProcess#: primop not supported, sorry!\n");
return -1;
#endif
}
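
/* A hypothetical sketch of calling forkProcess() directly from C; in
 * practice it is reached via the forkProcess# primop. `io_action` is
 * assumed to be a stable pointer to an IO () closure made elsewhere. */
static void
forkProcessExample( HsStablePtr io_action )
{
    StgInt pid = forkProcess(&io_action);
    if (pid > 0) {
        /* parent: carry on; pid identifies the child */
    }
    /* the child never gets here: forkProcess() runs the action in the
     * child, then calls hs_exit() and stg_exit(0) as above */
}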
/* ---------------------------------------------------------------------------
* deleteAllThreads(): kill all the live threads.
*
* This is used when we catch a user interrupt (^C), before performing
* any necessary cleanups and running finalizers.
*
* Locks: sched_mutex held.
* ------------------------------------------------------------------------- */
void
deleteAllThreads ( void )
{
StgTSO* t, *next;
IF_DEBUG(scheduler,sched_belch("deleting all threads"));
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
if (t->what_next == ThreadRelocated) {
next = t->link;
} else {
next = t->global_link;
deleteThread(t);
}
}
// The run queue now contains a bunch of ThreadKilled threads. We
// must not throw these away: the main thread(s) will be in there
// somewhere, and the main scheduler loop has to deal with it.
// Also, the run queue is the only thing keeping these threads from
// being GC'd, and we don't want the "main thread has been GC'd" panic.
ASSERT(blocked_queue_hd == END_TSO_QUEUE);
ASSERT(blackhole_queue == END_TSO_QUEUE);
ASSERT(sleeping_queue == END_TSO_QUEUE);
}
/* startThread and insertThread are now in GranSim.c -- HWL */
/* ---------------------------------------------------------------------------
* Suspending & resuming Haskell threads.
*
* When making a "safe" call to C (aka _ccall_GC), the task gives back
* its capability before calling the C function. This allows another
* task to pick up the capability and carry on running Haskell
* threads. It also means that if the C call blocks, it won't lock
* the whole system.
*
* The Haskell thread making the C call is put to sleep for the
duration of the call, on the suspended_ccalling_threads queue. We
* give out a token to the task, which it can use to resume the thread
* on return from the C function.
* ------------------------------------------------------------------------- */
StgInt
suspendThread( StgRegTable *reg )
{
nat tok;
Capability *cap;
int saved_errno = errno;
/* assume that *reg is a pointer to the StgRegTable part
* of a Capability.
*/
cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
ACQUIRE_LOCK(&sched_mutex);
IF_DEBUG(scheduler,
sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
// XXX this might not be necessary --SDM
cap->r.rCurrentTSO->what_next = ThreadRunGHC;
threadPaused(cap->r.rCurrentTSO);
cap->r.rCurrentTSO->link = suspended_ccalling_threads;
suspended_ccalling_threads = cap->r.rCurrentTSO;
if(cap->r.rCurrentTSO->blocked_exceptions == NULL) {
cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
} else {
cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
}
/* Use the thread ID as the token; it should be unique */
tok = cap->r.rCurrentTSO->id;
/* Hand back capability */
cap->r.rInHaskell = rtsFalse;
releaseCapability(cap);
#if defined(RTS_SUPPORTS_THREADS)
/* Preparing to leave the RTS, so ensure there's a native thread/task
waiting to take over.
*/
IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
return tok;
}
StgRegTable *
resumeThread( StgInt tok )
{
StgTSO *tso, **prev;
Capability *cap;
int saved_errno = errno;
#if defined(RTS_SUPPORTS_THREADS)
/* Wait for permission to re-enter the RTS with the result. */
ACQUIRE_LOCK(&sched_mutex);
waitForReturnCapability(&sched_mutex, &cap);
IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
#else
grabCapability(&cap);
#endif
/* Remove the thread from the suspended list */
prev = &suspended_ccalling_threads;
for (tso = suspended_ccalling_threads;
tso != END_TSO_QUEUE;
prev = &tso->link, tso = tso->link) {
if (tso->id == (StgThreadID)tok) {
*prev = tso->link;
break;
}
}
if (tso == END_TSO_QUEUE) {
barf("resumeThread: thread not found");
}
tso->link = END_TSO_QUEUE;
if(tso->why_blocked == BlockedOnCCall) {
awakenBlockedQueueNoLock(tso->blocked_exceptions);
tso->blocked_exceptions = NULL;
}
/* Reset blocking status */
tso->why_blocked = NotBlocked;
cap->r.rCurrentTSO = tso;
cap->r.rInHaskell = rtsTrue;
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
return &cap->r;
}
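
/* Sketch only: a hypothetical wrapper showing how the pair above
 * brackets a "safe" foreign call. do_blocking_c_call() stands in for
 * the real foreign function; the actual sequence is emitted by the
 * code generator rather than written by hand. */
static int
safeCallExample( StgRegTable *reg, int arg )
{
    StgInt tok;
    int result;

    tok = suspendThread(reg);           // release the Capability, get a token
    result = do_blocking_c_call(arg);   // other tasks may run Haskell meanwhile
    reg = resumeThread(tok);            // wait for a Capability, resume the TSO
    return result;
}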
/* ---------------------------------------------------------------------------
* Comparing Thread ids.
*
* This is used from STG land in the implementation of the
* instances of Eq/Ord for ThreadIds.
* ------------------------------------------------------------------------ */
int
cmp_thread(StgPtr tso1, StgPtr tso2)
{
StgThreadID id1 = ((StgTSO *)tso1)->id;
StgThreadID id2 = ((StgTSO *)tso2)->id;
if (id1 < id2) return (-1);
if (id1 > id2) return 1;
return 0;
}
/* ---------------------------------------------------------------------------
* Fetching the ThreadID from an StgTSO.
*
* This is used in the implementation of Show for ThreadIds.
* ------------------------------------------------------------------------ */
int
rts_getThreadId(StgPtr tso)
{
return ((StgTSO *)tso)->id;
}
#ifdef DEBUG
void
labelThread(StgPtr tso, char *label)
{
int len;
void *buf;
/* Caveat: Once set, you can only set the thread name to "" */
len = strlen(label)+1;
buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
strncpy(buf,label,len);
/* Update will free the old memory for us */
updateThreadLabel(((StgTSO *)tso)->id,buf);
}
#endif /* DEBUG */
/* ---------------------------------------------------------------------------
Create a new thread.
The new thread starts with the given stack size. Before the
scheduler can run, however, this thread needs to have a closure
(and possibly some arguments) pushed on its stack. See
pushClosure() in Schedule.h.
createGenThread() and createIOThread() (in SchedAPI.h) are
convenient packaged versions of this function.
currently pri (priority) is only used in a GRAN setup -- HWL
------------------------------------------------------------------------ */
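
/* Sketch, assuming the 2005-era helpers named above: build a thread
 * to evaluate `closure`, then make it runnable. createGenThread()
 * (SchedAPI.h) is assumed to do the pushClosure() step internally. */
static void
createThreadExample( StgClosure *closure )
{
    StgTSO *t;
    t = createGenThread(RtsFlags.GcFlags.initialStkSize, closure);
    scheduleThread(t);
}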
#if defined(GRAN)