Commit 5638488b authored by Simon Marlow's avatar Simon Marlow
Browse files

Improvements to shutting down of the runtime

Yet another attempt at shutdown & interruption.  This one appears to
work better; ^C is more responsive in multi threaded / SMP, and I
fixed one case where the runtime wasn't responding to ^C at all.
parent 04089f99
...@@ -44,7 +44,7 @@ STATIC_INLINE rtsBool ...@@ -44,7 +44,7 @@ STATIC_INLINE rtsBool
globalWorkToDo (void) globalWorkToDo (void)
{ {
return blackholes_need_checking return blackholes_need_checking
|| interrupted || sched_state >= SCHED_INTERRUPTING
; ;
} }
#endif #endif
...@@ -286,7 +286,7 @@ releaseCapability_ (Capability* cap) ...@@ -286,7 +286,7 @@ releaseCapability_ (Capability* cap)
// is interrupted, we only create a worker task if there // is interrupted, we only create a worker task if there
// are threads that need to be completed. If the system is // are threads that need to be completed. If the system is
// shutting down, we never create a new worker. // shutting down, we never create a new worker.
if (!shutting_down_scheduler) { if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
IF_DEBUG(scheduler, IF_DEBUG(scheduler,
sched_belch("starting new worker on capability %d", cap->no)); sched_belch("starting new worker on capability %d", cap->no));
startWorkerTask(cap, workerStart); startWorkerTask(cap, workerStart);
...@@ -575,7 +575,7 @@ shutdownCapability (Capability *cap, Task *task) ...@@ -575,7 +575,7 @@ shutdownCapability (Capability *cap, Task *task)
{ {
nat i; nat i;
ASSERT(interrupted && shutting_down_scheduler); ASSERT(sched_state == SCHED_SHUTTING_DOWN);
task->cap = cap; task->cap = cap;
......
...@@ -137,7 +137,7 @@ nat recent_activity = ACTIVITY_YES; ...@@ -137,7 +137,7 @@ nat recent_activity = ACTIVITY_YES;
/* if this flag is set as well, give up execution /* if this flag is set as well, give up execution
* LOCK: none (changes once, from false->true) * LOCK: none (changes once, from false->true)
*/ */
rtsBool interrupted = rtsFalse; rtsBool sched_state = SCHED_RUNNING;
/* Next thread ID to allocate. /* Next thread ID to allocate.
* LOCK: sched_mutex * LOCK: sched_mutex
...@@ -227,8 +227,9 @@ static void scheduleHandleThreadBlocked( StgTSO *t ); ...@@ -227,8 +227,9 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t ); StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major, static Capability *scheduleDoGC(Capability *cap, Task *task,
void (*get_roots)(evac_fn)); rtsBool force_major,
void (*get_roots)(evac_fn));
static void unblockThread(Capability *cap, StgTSO *tso); static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap); static rtsBool checkBlackHoles(Capability *cap);
...@@ -240,7 +241,7 @@ static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, ...@@ -240,7 +241,7 @@ static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically, StgPtr stop_here); rtsBool stop_at_atomically, StgPtr stop_here);
static void deleteThread (Capability *cap, StgTSO *tso); static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap); static void deleteAllThreads (Capability *cap);
#ifdef DEBUG #ifdef DEBUG
static void printThreadBlockage(StgTSO *tso); static void printThreadBlockage(StgTSO *tso);
...@@ -394,28 +395,67 @@ schedule (Capability *initialCapability, Task *task) ...@@ -394,28 +395,67 @@ schedule (Capability *initialCapability, Task *task)
stg_exit(EXIT_FAILURE); stg_exit(EXIT_FAILURE);
} }
// The interruption / shutdown sequence.
//
// In order to cleanly shut down the runtime, we want to:
// * make sure that all main threads return to their callers
// with the state 'Interrupted'.
// * clean up all OS threads associated with the runtime // * clean up all OS threads associated with the runtime
// * free all memory etc.
//
// So the sequence for ^C goes like this:
//
// * ^C handler sets sched_state := SCHED_INTERRUPTING and
// arranges for some Capability to wake up
//
// * all threads in the system are halted, and the zombies are
// placed on the run queue for cleaning up. We acquire all
// the capabilities in order to delete the threads, this is
// done by scheduleDoGC() for convenience (because GC already
// needs to acquire all the capabilities). We can't kill
// threads involved in foreign calls.
//
// * sched_state := SCHED_INTERRUPTED
//
// * somebody calls shutdownHaskell(), which calls exitScheduler()
//
// * sched_state := SCHED_SHUTTING_DOWN
// //
// Test for interruption. If interrupted==rtsTrue, then either // * all workers exit when the run queue on their capability
// we received a keyboard interrupt (^C), or the scheduler is // drains. All main threads will also exit when their TSO
// trying to shut down all the tasks (shutting_down_scheduler) in // reaches the head of the run queue and they can return.
// the threaded RTS.
// //
if (interrupted) { // * eventually all Capabilities will shut down, and the RTS can
deleteRunQueue(cap); // exit.
//
// * We might be left with threads blocked in foreign calls,
// we should really attempt to kill these somehow (TODO);
switch (sched_state) {
case SCHED_RUNNING:
break;
case SCHED_INTERRUPTING:
IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
discardSparksCap(cap); discardSparksCap(cap);
#endif #endif
if (shutting_down_scheduler) { /* scheduleDoGC() deletes all the threads */
IF_DEBUG(scheduler, sched_belch("shutting down")); cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
// If we are a worker, just exit. If we're a bound thread break;
// then we will exit below when we've removed our TSO from case SCHED_INTERRUPTED:
// the run queue. IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
if (task->tso == NULL && emptyRunQueue(cap)) { break;
return cap; case SCHED_SHUTTING_DOWN:
} IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
} else { // If we are a worker, just exit. If we're a bound thread
IF_DEBUG(scheduler, sched_belch("interrupted")); // then we will exit below when we've removed our TSO from
// the run queue.
if (task->tso == NULL && emptyRunQueue(cap)) {
return cap;
} }
break;
default:
barf("sched_state: %d", sched_state);
} }
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
...@@ -459,7 +499,7 @@ schedule (Capability *initialCapability, Task *task) ...@@ -459,7 +499,7 @@ schedule (Capability *initialCapability, Task *task)
// as a result of a console event having been delivered. // as a result of a console event having been delivered.
if ( emptyRunQueue(cap) ) { if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
ASSERT(interrupted); ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif #endif
continue; // nothing to do continue; // nothing to do
} }
...@@ -684,10 +724,7 @@ run_thread: ...@@ -684,10 +724,7 @@ run_thread:
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
if (ready_to_gc) { if (ready_to_gc) {
scheduleDoGC(cap,task,rtsFalse,GetRoots); cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
#if defined(THREADED_RTS)
cap = task->cap; // reload cap, it might have changed
#endif
} }
} /* end of while() */ } /* end of while() */
...@@ -924,10 +961,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) ...@@ -924,10 +961,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
// they are unreachable and will therefore be sent an // they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately // exception. Any threads thus released will be immediately
// runnable. // runnable.
scheduleDoGC( cap, task, rtsTrue/*force major GC*/, GetRoots ); cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
#if defined(THREADED_RTS)
cap = task->cap; // reload cap, it might have changed
#endif
recent_activity = ACTIVITY_DONE_GC; recent_activity = ACTIVITY_DONE_GC;
...@@ -949,7 +983,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) ...@@ -949,7 +983,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
} }
// either we have threads to run, or we were interrupted: // either we have threads to run, or we were interrupted:
ASSERT(!emptyRunQueue(cap) || interrupted); ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
} }
#endif #endif
...@@ -1843,7 +1877,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) ...@@ -1843,7 +1877,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
if (task->ret) { if (task->ret) {
*(task->ret) = NULL; *(task->ret) = NULL;
} }
if (interrupted) { if (sched_state >= SCHED_INTERRUPTING) {
task->stat = Interrupted; task->stat = Interrupted;
} else { } else {
task->stat = Killed; task->stat = Killed;
...@@ -1895,7 +1929,7 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) ...@@ -1895,7 +1929,7 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
* Perform a garbage collection if necessary * Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
static void static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
rtsBool force_major, void (*get_roots)(evac_fn)) rtsBool force_major, void (*get_roots)(evac_fn))
{ {
...@@ -1924,7 +1958,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, ...@@ -1924,7 +1958,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
IF_DEBUG(scheduler, sched_belch("someone else is trying to GC...")); IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
if (cap) yieldCapability(&cap,task); if (cap) yieldCapability(&cap,task);
} while (waiting_for_gc); } while (waiting_for_gc);
return; // NOTE: task->cap might have changed here return cap; // NOTE: task->cap might have changed here
} }
for (i=0; i < n_capabilities; i++) { for (i=0; i < n_capabilities; i++) {
...@@ -1984,6 +2018,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, ...@@ -1984,6 +2018,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
IF_DEBUG(scheduler, printAllThreads()); IF_DEBUG(scheduler, printAllThreads());
/*
* We now have all the capabilities; if we're in an interrupting
* state, then we should take the opportunity to delete all the
* threads in the system.
*/
if (sched_state >= SCHED_INTERRUPTING) {
deleteAllThreads(&capabilities[0]);
sched_state = SCHED_INTERRUPTED;
}
/* everybody back, start the GC. /* everybody back, start the GC.
* Could do it in this thread, or signal a condition var * Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to * to do it in another thread. Either way, we need to
...@@ -2019,6 +2063,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, ...@@ -2019,6 +2063,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
G_EVENTQ(0); G_EVENTQ(0);
G_CURR_THREADQ(0)); G_CURR_THREADQ(0));
#endif /* GRAN */ #endif /* GRAN */
return cap;
} }
/* --------------------------------------------------------------------------- /* ---------------------------------------------------------------------------
...@@ -2137,22 +2183,34 @@ forkProcess(HsStablePtr *entry ...@@ -2137,22 +2183,34 @@ forkProcess(HsStablePtr *entry
} }
/* --------------------------------------------------------------------------- /* ---------------------------------------------------------------------------
* Delete the threads on the run queue of the current capability. * Delete all the threads in the system
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
static void static void
deleteRunQueue (Capability *cap) deleteAllThreads ( Capability *cap )
{ {
StgTSO *t, *next; StgTSO* t, *next;
for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) { IF_DEBUG(scheduler,sched_belch("deleting all threads"));
ASSERT(t->what_next != ThreadRelocated); for (t = all_threads; t != END_TSO_QUEUE; t = next) {
next = t->link; if (t->what_next == ThreadRelocated) {
deleteThread(cap, t); next = t->link;
} } else {
} next = t->global_link;
deleteThread(cap,t);
}
}
/* startThread and insertThread are now in GranSim.c -- HWL */ // The run queue now contains a bunch of ThreadKilled threads. We
// must not throw these away: the main thread(s) will be in there
// somewhere, and the main scheduler loop has to deal with it.
// Also, the run queue is the only thing keeping these threads from
// being GC'd, and we don't want the "main thread has been GC'd" panic.
#if !defined(THREADED_RTS)
ASSERT(blocked_queue_hd == END_TSO_QUEUE);
ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
Managing the suspended_ccalling_tasks list. Managing the suspended_ccalling_tasks list.
...@@ -2702,7 +2760,7 @@ initScheduler(void) ...@@ -2702,7 +2760,7 @@ initScheduler(void)
all_threads = END_TSO_QUEUE; all_threads = END_TSO_QUEUE;
context_switch = 0; context_switch = 0;
interrupted = 0; sched_state = SCHED_RUNNING;
RtsFlags.ConcFlags.ctxtSwitchTicks = RtsFlags.ConcFlags.ctxtSwitchTicks =
RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
...@@ -2752,18 +2810,25 @@ initScheduler(void) ...@@ -2752,18 +2810,25 @@ initScheduler(void)
void void
exitScheduler( void ) exitScheduler( void )
{ {
interrupted = rtsTrue; Task *task = NULL;
shutting_down_scheduler = rtsTrue;
#if defined(THREADED_RTS)
ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
RELEASE_LOCK(&sched_mutex);
#endif
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_INTERRUPTED) {
sched_state = SCHED_INTERRUPTING;
scheduleDoGC(NULL,task,rtsFalse,GetRoots);
}
sched_state = SCHED_SHUTTING_DOWN;
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
{ {
Task *task;
nat i; nat i;
ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
RELEASE_LOCK(&sched_mutex);
for (i = 0; i < n_capabilities; i++) { for (i = 0; i < n_capabilities; i++) {
shutdownCapability(&capabilities[i], task); shutdownCapability(&capabilities[i], task);
} }
...@@ -3273,7 +3338,7 @@ awakenBlockedQueue(Capability *cap, StgTSO *tso) ...@@ -3273,7 +3338,7 @@ awakenBlockedQueue(Capability *cap, StgTSO *tso)
void void
interruptStgRts(void) interruptStgRts(void)
{ {
interrupted = 1; sched_state = SCHED_INTERRUPTING;
context_switch = 1; context_switch = 1;
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
prodAllCapabilities(); prodAllCapabilities();
...@@ -3581,6 +3646,11 @@ unblockThread(Capability *cap, StgTSO *tso) ...@@ -3581,6 +3646,11 @@ unblockThread(Capability *cap, StgTSO *tso)
tso->why_blocked = NotBlocked; tso->why_blocked = NotBlocked;
tso->block_info.closure = NULL; tso->block_info.closure = NULL;
appendToRunQueue(cap,tso); appendToRunQueue(cap,tso);
// We might have just migrated this TSO to our Capability:
if (tso->bound) {
tso->bound->cap = cap;
}
} }
#endif #endif
......
...@@ -106,15 +106,16 @@ void initThread(StgTSO *tso, nat stack_size); ...@@ -106,15 +106,16 @@ void initThread(StgTSO *tso, nat stack_size);
*/ */
extern int RTS_VAR(context_switch); extern int RTS_VAR(context_switch);
/* Interrupted flag. /* The state of the scheduler. This is used to control the sequence
* Locks required : none (makes one transition from false->true) * of events during shutdown, and when the runtime is interrupted
* using ^C.
*/ */
extern rtsBool RTS_VAR(interrupted); #define SCHED_RUNNING 0 /* running as normal */
#define SCHED_INTERRUPTING 1 /* ^C detected, before threads are deleted */
#define SCHED_INTERRUPTED 2 /* ^C detected, after threads deleted */
#define SCHED_SHUTTING_DOWN 3 /* final shutdown */
/* Shutdown flag. extern rtsBool RTS_VAR(sched_state);
* Locks required : none (makes one transition from false->true)
*/
extern rtsBool shutting_down_scheduler;
/* /*
* flag that tracks whether we have done any execution in this time slice. * flag that tracks whether we have done any execution in this time slice.
......
...@@ -48,41 +48,41 @@ handle_tick(int unused STG_UNUSED) ...@@ -48,41 +48,41 @@ handle_tick(int unused STG_UNUSED)
if (ticks_to_ctxt_switch <= 0) { if (ticks_to_ctxt_switch <= 0) {
ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks; ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks;
context_switch = 1; /* schedule a context switch */ context_switch = 1; /* schedule a context switch */
}
}
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
/* /*
* If we've been inactive for idleGCDelayTicks (set by +RTS * If we've been inactive for idleGCDelayTicks (set by +RTS
* -I), tell the scheduler to wake up and do a GC, to check * -I), tell the scheduler to wake up and do a GC, to check
* for threads that are deadlocked. * for threads that are deadlocked.
*/
switch (recent_activity) {
case ACTIVITY_YES:
recent_activity = ACTIVITY_MAYBE_NO;
ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
break;
case ACTIVITY_MAYBE_NO:
if (ticks_to_gc == 0) break; /* 0 ==> no idle GC */
ticks_to_gc--;
if (ticks_to_gc == 0) {
ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
recent_activity = ACTIVITY_INACTIVE;
blackholes_need_checking = rtsTrue;
/* hack: re-use the blackholes_need_checking flag */
/* ToDo: this doesn't work. Can't invoke
* pthread_cond_signal from a signal handler.
* Furthermore, we can't prod a capability that we
* might be holding. What can we do?
*/ */
switch (recent_activity) { prodOneCapability();
case ACTIVITY_YES:
recent_activity = ACTIVITY_MAYBE_NO;
ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
break;
case ACTIVITY_MAYBE_NO:
if (ticks_to_gc == 0) break; /* 0 ==> no idle GC */
ticks_to_gc--;
if (ticks_to_gc == 0) {
ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
recent_activity = ACTIVITY_INACTIVE;
blackholes_need_checking = rtsTrue;
/* hack: re-use the blackholes_need_checking flag */
/* ToDo: this doesn't work. Can't invoke
* pthread_cond_signal from a signal handler.
* Furthermore, we can't prod a capability that we
* might be holding. What can we do?
*/
prodOneCapability();
}
break;
default:
break;
}
#endif
} }
break;
default:
break;
} }
#endif
} }
int int
......
...@@ -215,7 +215,7 @@ awaitEvent(rtsBool wait) ...@@ -215,7 +215,7 @@ awaitEvent(rtsBool wait)
/* we were interrupted, return to the scheduler immediately. /* we were interrupted, return to the scheduler immediately.
*/ */
if (interrupted) { if (sched_state >= SCHED_INTERRUPTING) {
return; /* still hold the lock */ return; /* still hold the lock */
} }
...@@ -272,7 +272,8 @@ awaitEvent(rtsBool wait) ...@@ -272,7 +272,8 @@ awaitEvent(rtsBool wait)
} }
} }
} while (wait && !interrupted && emptyRunQueue(&MainCapability)); } while (wait && sched_state == SCHED_RUNNING
&& emptyRunQueue(&MainCapability));
} }
#endif /* THREADED_RTS */ #endif /* THREADED_RTS */
...@@ -253,7 +253,7 @@ anyUserHandlers(void) ...@@ -253,7 +253,7 @@ anyUserHandlers(void)
void void
awaitUserSignals(void) awaitUserSignals(void)
{ {
while (!signals_pending() && !interrupted) { while (!signals_pending() && sched_state == SCHED_RUNNING) {
pause(); pause();
} }
} }
...@@ -432,7 +432,7 @@ shutdown_handler(int sig STG_UNUSED) ...@@ -432,7 +432,7 @@ shutdown_handler(int sig STG_UNUSED)
// If we're already trying to interrupt the RTS, terminate with // If we're already trying to interrupt the RTS, terminate with
// extreme prejudice. So the first ^C tries to exit the program // extreme prejudice. So the first ^C tries to exit the program
// cleanly, and the second one just kills it. // cleanly, and the second one just kills it.
if (interrupted) { if (sched_state >= SCHED_INTERRUPTING) {
stg_exit(EXIT_INTERRUPTED); stg_exit(EXIT_INTERRUPTED);
} else { } else {
interruptStgRts(); interruptStgRts();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment