Commit 5000229d authored by simonmar

[project @ 2005-04-07 15:56:34 by simonmar]

A much simpler way of stopping all the other threads in order to do a GC
in SMP mode: the thread that wants to do the GC simply acquires all the
capabilities, and releases them again once the GC has completed.
parent 2109f5e2
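
To make the mechanism concrete before reading the diff, here is a minimal standalone sketch in plain C with pthreads. It is an illustration under assumed names, not RTS code: N_CAPS, free_caps, acquire_capability, and stop_the_world_and_gc are hypothetical stand-ins. The real code below does the equivalent with waitForReturnCapability() and releaseCapability() under sched_mutex, stashing the Capability pointers it collects in a caps[] array.

/* Sketch only: a fixed pool of capability tokens guarded by one
 * mutex and condition variable.  All names are hypothetical. */
#include <pthread.h>
#include <stdio.h>

#define N_CAPS 4                 /* cf. RtsFlags.ParFlags.nNodes */

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  returned = PTHREAD_COND_INITIALIZER;
static int free_caps = N_CAPS;   /* capabilities currently held by nobody */

/* A worker must hold a capability to run Haskell code. */
static void acquire_capability(void)
{
    pthread_mutex_lock(&lock);
    while (free_caps == 0)
        pthread_cond_wait(&returned, &lock);
    free_caps--;
    pthread_mutex_unlock(&lock);
}

static void release_capability(void)
{
    pthread_mutex_lock(&lock);
    free_caps++;
    pthread_cond_signal(&returned);
    pthread_mutex_unlock(&lock);
}

/* The thread that wants to GC already holds one capability.  It grabs
 * the other N_CAPS-1 one at a time as workers release them; once it
 * holds them all, no Haskell code can be running, so it is safe to GC.
 * It then returns the stash but keeps its own capability, like the
 * release loop over caps[] in the diff below. */
static void stop_the_world_and_gc(void)
{
    int held = 1;                         /* the one we came in with */

    pthread_mutex_lock(&lock);
    while (held < N_CAPS) {
        while (free_caps == 0)
            pthread_cond_wait(&returned, &lock);
        free_caps--;                      /* grab one as it comes back */
        held++;
    }
    pthread_mutex_unlock(&lock);

    printf("holding all %d capabilities: GC can run\n", held);

    pthread_mutex_lock(&lock);
    free_caps = N_CAPS - 1;               /* give the stash back */
    pthread_cond_broadcast(&returned);
    pthread_mutex_unlock(&lock);
}

int main(void)
{
    acquire_capability();                 /* act as an ordinary worker */
    stop_the_world_and_gc();              /* ...that decides to GC */
    release_capability();                 /* back to N_CAPS free */
    return 0;
}

The point of the change is visible in the sketch: holding all the capabilities *is* the stop-the-world condition, so the separate gc_pending_cond broadcast (and the tricky business of keeping every thread but the last asleep) that the diff below deletes is no longer needed.
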
@@ -217,12 +217,6 @@ StgTSO *CurrentTSO;
*/
StgTSO dummy_tso;
# if defined(SMP)
static Condition gc_pending_cond = INIT_COND_VAR;
# endif
static rtsBool ready_to_gc;
/*
* Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
* in an MT setting, needed to signal that a worker thread shouldn't hang around
@@ -295,8 +289,8 @@ static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
Capability *cap, StgTSO *t );
static void scheduleDoHeapProfile(void);
static void scheduleDoGC(void);
static rtsBool scheduleDoHeapProfile(void);
static void scheduleDoGC(Capability *cap);
static void unblockThread(StgTSO *tso);
static rtsBool checkBlackHoles(void);
@@ -341,7 +335,7 @@ startSchedulerTaskIfNecessary(void)
{
if ( !EMPTY_RUN_QUEUE()
&& !shutting_down_scheduler // not if we're shutting down
&& !startingWorkerThread )
&& !startingWorkerThread)
{
// we don't want to start another worker thread
// just because the last one hasn't yet reached the
@@ -429,6 +423,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
# endif
#endif
nat prev_what_next;
rtsBool ready_to_gc;
// Pre-condition: sched_mutex is held.
// We might have a capability, passed in as initialCapability.
@@ -467,19 +462,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
IF_DEBUG(scheduler, printAllThreads());
#if defined(SMP)
//
// Wait until GC has completed, if necessary.
//
if (ready_to_gc) {
if (cap != NULL) {
releaseCapability(cap);
IF_DEBUG(scheduler,sched_belch("waiting for GC"));
waitCondition( &gc_pending_cond, &sched_mutex );
}
}
#endif
#if defined(RTS_SUPPORTS_THREADS)
// Yield the capability to higher-priority tasks if necessary.
//
@@ -744,6 +726,8 @@ run_thread:
schedulePostRunThread();
ready_to_gc = rtsFalse;
switch (ret) {
case HeapOverflow:
ready_to_gc = scheduleHandleHeapOverflow(cap,t);
@@ -773,8 +757,8 @@ run_thread:
barf("schedule: invalid thread return code %d", (int)ret);
}
scheduleDoHeapProfile();
scheduleDoGC();
if (scheduleDoHeapProfile()) { ready_to_gc = rtsFalse; }
if (ready_to_gc) { scheduleDoGC(cap); }
} /* end of while() */
IF_PAR_DEBUG(verbose,
@@ -901,13 +885,11 @@ scheduleDetectDeadlock(void)
awaitUserSignals();
#if !defined(RTS_SUPPORTS_THREADS)
if (signals_pending()) {
RELEASE_LOCK(&sched_mutex);
startSignalHandlers();
ACQUIRE_LOCK(&sched_mutex);
}
#endif
// either we have threads to run, or we were interrupted:
ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
@@ -1484,9 +1466,13 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
#ifdef SMP
cap->r.rNursery = g0s0->blocks = bd;
#else
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
g0s0->blocks == cap->r.rNursery);
cap->r.rNursery = g0s0->blocks = bd;
#endif
}
cap->r.rCurrentNursery->u.back = bd;
@@ -1506,11 +1492,14 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
}
}
#if !defined(SMP)
// don't forget to update the block count in g0s0.
g0s0->n_blocks += blocks;
// This assert can be a killer if the app is doing lots
// of large block allocations.
ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
#endif
// now update the nursery to point to the new block
cap->r.rCurrentNursery = bd;
@@ -1849,10 +1838,10 @@ scheduleHandleThreadFinished( StgMainThread *mainThread
* Perform a heap census, if PROFILING
* -------------------------------------------------------------------------- */
static void
static rtsBool
scheduleDoHeapProfile(void)
{
#ifdef PROFILING
#if defined(PROFILING)
// When we have +RTS -i0 and we're heap profiling, do a census at
// every GC. This lets us get repeatable runs for debugging.
if (performHeapProfile ||
@@ -1861,9 +1850,10 @@ scheduleDoHeapProfile(void)
GarbageCollect(GetRoots, rtsTrue);
heapCensus();
performHeapProfile = rtsFalse;
ready_to_gc = rtsFalse; // we already GC'd
return rtsTrue; // true <=> we already GC'd
}
#endif
return rtsFalse;
}
/* -----------------------------------------------------------------------------
@@ -1872,66 +1862,90 @@ scheduleDoHeapProfile(void)
* -------------------------------------------------------------------------- */
static void
scheduleDoGC(void)
scheduleDoGC( Capability *cap )
{
StgTSO *t;
#ifdef SMP
int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
// subtract one because we're already holding one.
Capability *caps[n_capabilities];
#endif
#ifdef SMP
// The last task to stop actually gets to do the GC. The rest
// of the tasks release their capabilities and wait on gc_pending_cond.
if (ready_to_gc && allFreeCapabilities())
#else
if (ready_to_gc)
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
//
// This seems to be the simplest way: previous attempts involved
// making all the threads with capabilities give up their
// capabilities and sleep except for the *last* one, which
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
caps[n_capabilities] = cap;
while (n_capabilities > 0) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
waitForReturnCapability(&sched_mutex, &cap);
n_capabilities--;
caps[n_capabilities] = cap;
}
#endif
{
/* Kick any transactions which are invalid back to their
* atomically frames. When next scheduled they will try to
* commit, this commit will fail and they will retry.
*/
for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
if (!stmValidateTransaction (t -> trec)) {
IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
// strip the stack back to the ATOMICALLY_FRAME, aborting
// the (nested) transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
raiseAsync_(t, NULL, rtsTrue);
/* Kick any transactions which are invalid back to their
* atomically frames. When next scheduled they will try to
* commit, this commit will fail and they will retry.
*/
for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
if (!stmValidateTransaction (t -> trec)) {
IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
// strip the stack back to the ATOMICALLY_FRAME, aborting
// the (nested) transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
raiseAsync_(t, NULL, rtsTrue);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
}
}
}
// so this happens periodically:
scheduleCheckBlackHoles();
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
}
// so this happens periodically:
scheduleCheckBlackHoles();
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots,rtsFalse);
ready_to_gc = rtsFalse;
GarbageCollect(GetRoots,rtsFalse);
#if defined(SMP)
broadcastCondition(&gc_pending_cond);
{
// release our stash of capabilities.
nat i;
for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
releaseCapability(caps[i]);
}
}
#endif
#if defined(GRAN)
/* add a ContinueThread event to continue execution of current thread */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
ContinueThread,
t, (StgClosure*)NULL, (rtsSpark*)NULL);
IF_GRAN_DEBUG(bq,
debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
G_EVENTQ(0);
G_CURR_THREADQ(0));
/* add a ContinueThread event to continue execution of current thread */
new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
ContinueThread,
t, (StgClosure*)NULL, (rtsSpark*)NULL);
IF_GRAN_DEBUG(bq,
debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
G_EVENTQ(0);
G_CURR_THREADQ(0));
#endif /* GRAN */
}
}
/* ---------------------------------------------------------------------------
......