Commit 9bae7915 authored by Simon Marlow

Support for reducing the number of Capabilities with setNumCapabilities

This patch allows setNumCapabilities to /reduce/ the number of active
capabilities as well as increase it.  This is particularly tricky to
do, because a Capability is a large data structure and ties into the
rest of the system in many ways.  Trying to clean it all up would be
extremely error-prone.

So instead, the solution is to mark the extra capabilities as
"disabled".  This has the following consequences:

  - threads on a disabled capability are migrated away by the
    scheduler loop

  - disabled capabilities do not participate in GC
    (see scheduleDoGC())

  - No spark threads are created on this capability
    (see scheduleActivateSpark())

  - We do not attempt to migrate threads *to* a disabled
    capability (see schedulePushWork()).

So a disabled capability should do no work, and does not participate
in GC, although it remains alive in other respects.  For example, a
blocked thread might wake up on a disabled capability, and it will
quickly be migrated to a live capability.  A disabled capability can
still initiate GC if necessary.  It turns out to be hard to migrate
bound threads, however, so we wait until the next GC to do this (see
comments for details).
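
For context, a minimal usage sketch from the Haskell side (illustrative
only, not part of this patch): setNumCapabilities and forkIO are the
standard Control.Concurrent API, and the program is assumed to be
compiled with -threaded.

    import Control.Concurrent
    import Control.Monad (forM_, replicateM_)

    main :: IO ()
    main = do
      setNumCapabilities 4
      -- spin up some illustrative work across the capabilities
      forM_ [1 .. 8 :: Int] $ \_ ->
        forkIO (replicateM_ 1000000 (return ()))
      -- shrink: capabilities 2 and 3 become "disabled"; their threads
      -- are migrated to caps 0 and 1.  Before this patch, this call
      -- would barf.
      setNumCapabilities 2
      threadDelay 100000   -- give the scheduler a moment before exit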
parent dff852b1
......@@ -34,6 +34,7 @@
Capability MainCapability;
nat n_capabilities = 0;
nat enabled_capabilities = 0;
Capability *capabilities = NULL;
// Holds the Capability which last became free. This is used so that
......@@ -323,6 +324,8 @@ initCapabilities( void )
#endif
enabled_capabilities = n_capabilities;
// There are no free capabilities to begin with. We will start
// a worker Task to each Capability, which will quickly put the
// Capability on the free list when it finds nothing to do.
......@@ -493,7 +496,7 @@ releaseCapability_ (Capability* cap,
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
!emptyRunQueue(cap) || !emptyInbox(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
(!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
......@@ -682,7 +685,8 @@ yieldCapability (Capability** pCap, Task *task)
gcWorkerThread(cap);
traceEventGcEnd(cap);
traceSparkCounters(cap);
return;
// See Note [migrated bound threads 2]
if (task->cap == cap) return;
}
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
......@@ -768,6 +772,17 @@ yieldCapability (Capability** pCap, Task *task)
// hold Capability C, and task->cap == C, then task cannot be
// migrated under our feet.
// Note [migrated bound threads 2]
//
// Second tricky case:
// - A bound Task becomes a GC thread
// - scheduleDoGC() migrates the thread belonging to this Task,
// because the Capability it is on is disabled
// - after GC, gcWorkerThread() returns, but now we are
// holding a Capability that is not the same as task->cap
// - Hence we must check for this case and immediately give up the
// cap we hold.
/* ----------------------------------------------------------------------------
* prodCapability
*
......
......@@ -49,6 +49,8 @@ struct Capability_ {
// Has there been any activity on this Capability since the last GC?
nat idle;
rtsBool disabled;
// The run queue. The Task owning this Capability has exclusive
// access to its run queue, so can wake up threads without
// taking a lock, and the common path through the scheduler is
......@@ -197,6 +199,8 @@ INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
// declared in includes/rts/Threads.h:
// extern nat n_capabilities;
extern nat enabled_capabilities;
// Array of all the capabilities
//
extern Capability *capabilities;
......
......@@ -133,7 +133,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
......@@ -145,8 +145,8 @@ static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
......@@ -159,8 +159,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major);
static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
......@@ -281,7 +280,7 @@ schedule (Capability *initialCapability, Task *task)
case SCHED_INTERRUPTING:
debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
/* scheduleDoGC() deletes all the threads */
cap = scheduleDoGC(cap,task,rtsFalse);
scheduleDoGC(&cap,task,rtsFalse);
// after scheduleDoGC(), we must be shutting down. Either some
// other Capability did the final GC, or we did it above,
......@@ -303,17 +302,13 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
scheduleFindWork(cap);
scheduleFindWork(&cap);
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
cap = task->cap; // reload cap, it might have changed
#endif
scheduleDetectDeadlock(&cap,task);
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
......@@ -396,6 +391,26 @@ schedule (Capability *initialCapability, Task *task)
deleteThread(cap,t);
}
// If this capability is disabled, migrate the thread away rather
// than running it. NB. but not if the thread is bound: it is
// really hard for a bound thread to migrate itself. Believe me,
// I tried several ways and couldn't find a way to do it.
// Instead, when everything is stopped for GC, we migrate all the
// threads on the run queue then (see scheduleDoGC()).
//
// ToDo: what about TSO_LOCKED? Currently we're migrating those
// when the number of capabilities drops, but we never migrate
// them back if it rises again. Presumably we should, but after
// the thread has been migrated we no longer know what capability
// it was originally on.
#ifdef THREADED_RTS
if (cap->disabled && !t->bound) {
Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
migrateThread(cap, t, dest_cap);
continue;
}
#endif
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
......@@ -558,7 +573,7 @@ run_thread:
}
if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
cap = scheduleDoGC(cap,task,rtsFalse);
scheduleDoGC(&cap,task,rtsFalse);
}
} /* end of while() */
}
......@@ -608,16 +623,16 @@ schedulePreLoop(void)
* -------------------------------------------------------------------------- */
static void
scheduleFindWork (Capability *cap)
scheduleFindWork (Capability **pcap)
{
scheduleStartSignalHandlers(cap);
scheduleStartSignalHandlers(*pcap);
scheduleProcessInbox(cap);
scheduleProcessInbox(pcap);
scheduleCheckBlockedThreads(cap);
scheduleCheckBlockedThreads(*pcap);
#if defined(THREADED_RTS)
if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}
......@@ -707,10 +722,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// First grab as many free Capabilities as we can.
for (i=0, n_free_caps=0; i < n_capabilities; i++) {
cap0 = &capabilities[i];
if (cap != cap0 && tryGrabCapability(cap0,task)) {
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap->returning_tasks_hd != NULL
|| cap->inbox != (Message*)END_TSO_QUEUE) {
|| cap0->returning_tasks_hd != NULL
|| cap0->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
......@@ -869,9 +884,10 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
* ------------------------------------------------------------------------- */
static void
scheduleDetectDeadlock (Capability *cap, Task *task)
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
/*
Capability *cap = *pcap;
/*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
* other tasks are waiting for work, we must have a deadlock of
......@@ -896,7 +912,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
cap = *pcap;
// when force_major == rtsTrue, scheduleDoGC sets
// recent_activity to ACTIVITY_DONE_GC and turns off the timer
// signal.
......@@ -976,16 +993,18 @@ scheduleSendPendingMessages(void)
* ------------------------------------------------------------------------- */
static void
scheduleProcessInbox (Capability *cap USED_IF_THREADS)
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
int r;
Capability *cap = *pcap;
while (!emptyInbox(cap)) {
if (cap->r.rCurrentNursery->link == NULL ||
g0->n_new_large_words >= large_alloc_lim) {
scheduleDoGC(cap, cap->running_task, rtsFalse);
scheduleDoGC(pcap, cap->running_task, rtsFalse);
cap = *pcap;
}
// don't use a blocking acquire; if the lock is held by
......@@ -1023,7 +1042,7 @@ scheduleProcessInbox (Capability *cap USED_IF_THREADS)
static void
scheduleActivateSpark(Capability *cap)
{
if (anySparks())
if (anySparks() && !cap->disabled)
{
createSparkThread(cap);
debugTrace(DEBUG_sched, "creating a spark thread");
......@@ -1415,21 +1434,24 @@ static void releaseAllCapabilities(Capability *cap, Task *task)
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
static void
scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
rtsBool force_major)
{
Capability *cap = *pcap;
rtsBool heap_census;
#ifdef THREADED_RTS
rtsBool idle_cap[n_capabilities];
rtsBool gc_type;
nat i, sync;
StgTSO *tso;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
// The final GC has already been done, and the system is
// shutting down. We'll probably deadlock if we try to GC
// now.
return cap;
return;
}
#ifdef THREADED_RTS
......@@ -1459,12 +1481,19 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
yieldCapability() and releaseCapability() in Capability.c */
do {
sync = requestSync(&cap, task, gc_type);
sync = requestSync(pcap, task, gc_type);
cap = *pcap;
if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
// someone else had a pending sync request for a GC, so
// let's assume GC has been done and we don't need to GC
// again.
return cap;
return;
}
if (sched_state == SCHED_SHUTTING_DOWN) {
// The scheduler might now be shutting down. We tested
// this above, but it might have become true since then as
// we yielded the capability in requestSync().
return;
}
} while (sync);
......@@ -1502,11 +1531,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
|| (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
for (i=0; i < n_capabilities; i++) {
idle_cap[i] = rtsFalse;
if (capabilities[i].disabled) {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
} else {
idle_cap[i] = rtsFalse;
}
}
} else {
for (i=0; i < n_capabilities; i++) {
if (i == cap->no || capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
if (capabilities[i].disabled) {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
} else if (i == cap->no ||
capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
idle_cap[i] = rtsFalse;
} else {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
......@@ -1570,6 +1606,29 @@ delete_threads_and_gc:
sched_state = SCHED_SHUTTING_DOWN;
}
/*
* When there are disabled capabilities, we want to migrate any
* threads away from them. Normally this happens in the
* scheduler's loop, but only for unbound threads - it's really
* hard for a bound thread to migrate itself. So we have another
* go here.
*/
#if defined(THREADED_RTS)
for (i = enabled_capabilities; i < n_capabilities; i++) {
Capability *tmp_cap, *dest_cap;
tmp_cap = &capabilities[i];
ASSERT(tmp_cap->disabled);
if (i != cap->no) {
dest_cap = &capabilities[i % enabled_capabilities];
while (!emptyRunQueue(tmp_cap)) {
tso = popRunQueue(tmp_cap);
migrateThread(tmp_cap, tso, dest_cap);
if (tso->bound) { tso->bound->task->cap = dest_cap; }
}
}
}
#endif
heap_census = scheduleNeedHeapProfile(rtsTrue);
traceEventGcStart(cap);
......@@ -1663,7 +1722,7 @@ delete_threads_and_gc:
}
#endif
return cap;
return;
}
/* ---------------------------------------------------------------------------
......@@ -1848,7 +1907,7 @@ forkProcess(HsStablePtr *entry
}
/* ---------------------------------------------------------------------------
* Increase the number of Capabilities
* Changing the number of Capabilities
*
* Changing the number of Capabilities is very tricky! We can only do
* it with the system fully stopped, so we do a full sync with
......@@ -1873,17 +1932,13 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
Capability *cap;
nat sync;
StgTSO* t;
nat g;
Capability *old_capabilities;
if (new_n_capabilities == n_capabilities) return;
nat g, n;
Capability *old_capabilities = NULL;
if (new_n_capabilities < n_capabilities) {
barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
}
if (new_n_capabilities == enabled_capabilities) return;
debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
n_capabilities, new_n_capabilities);
enabled_capabilities, new_n_capabilities);
cap = rts_lock();
task = cap->running_task;
......@@ -1896,31 +1951,76 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
pending_sync = 0;
if (new_n_capabilities < enabled_capabilities)
{
// Reducing the number of capabilities: we do not actually
// remove the extra capabilities, we just mark them as
// "disabled". This has the following effects:
//
// - threads on a disabled capability are migrated away by the
// scheduler loop
//
// - disabled capabilities do not participate in GC
// (see scheduleDoGC())
//
// - No spark threads are created on this capability
// (see scheduleActivateSpark())
//
// - We do not attempt to migrate threads *to* a disabled
// capability (see schedulePushWork()).
//
// but in other respects, a disabled capability remains
// alive. Threads may be woken up on a disabled capability,
// but they will be immediately migrated away.
//
// This approach is much easier than trying to actually remove
// the capability; we don't have to worry about GC data
// structures, the nursery, etc.
//
for (n = new_n_capabilities; n < enabled_capabilities; n++) {
capabilities[n].disabled = rtsTrue;
}
enabled_capabilities = new_n_capabilities;
}
else
{
// Increasing the number of enabled capabilities.
//
// enable any disabled capabilities, up to the required number
for (n = enabled_capabilities;
n < new_n_capabilities && n < n_capabilities; n++) {
capabilities[n].disabled = rtsFalse;
}
enabled_capabilities = n;
if (new_n_capabilities > n_capabilities) {
#if defined(TRACING)
// Allocate eventlog buffers for the new capabilities. Note this
// must be done before calling moreCapabilities(), because that
// will emit events to add the new capabilities to capsets.
tracingAddCapapilities(n_capabilities, new_n_capabilities);
// Allocate eventlog buffers for the new capabilities. Note this
// must be done before calling moreCapabilities(), because that
// will emit events to add the new capabilities to capsets.
tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif
// Resize the capabilities array
// NB. after this, capabilities points somewhere new. Any pointers
// of type (Capability *) are now invalid.
old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
// Resize the capabilities array
// NB. after this, capabilities points somewhere new. Any pointers
// of type (Capability *) are now invalid.
old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
// update our own cap pointer
cap = &capabilities[cap->no];
// update our own cap pointer
cap = &capabilities[cap->no];
// Resize and update storage manager data structures
storageAddCapabilities(n_capabilities, new_n_capabilities);
// Resize and update storage manager data structures
storageAddCapabilities(n_capabilities, new_n_capabilities);
// Update (Capability *) refs in the Task manager.
updateCapabilityRefs();
// Update (Capability *) refs in the Task manager.
updateCapabilityRefs();
// Update (Capability *) refs from TSOs
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
t->cap = &capabilities[t->cap->no];
// Update (Capability *) refs from TSOs
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
t->cap = &capabilities[t->cap->no];
}
}
}
}
......@@ -1931,7 +2031,9 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
startWorkerTasks(n_capabilities, new_n_capabilities);
// finally, update n_capabilities
n_capabilities = new_n_capabilities;
if (new_n_capabilities > n_capabilities) {
n_capabilities = enabled_capabilities = new_n_capabilities;
}
// We can't free the old array until now, because we access it
// while updating pointers in updateCapabilityRefs().
......@@ -2177,7 +2279,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
#if defined(THREADED_RTS)
cpu %= n_capabilities;
cpu %= enabled_capabilities;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
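
The one-line change above pins explicit-affinity threads modulo
enabled_capabilities instead of n_capabilities, so forkOn can no longer
land a thread on a disabled capability.  A hedged sketch of the
observable effect, assuming -threaded and the standard
Control.Concurrent API (forkOn, threadCapability):

    import Control.Concurrent

    main :: IO ()
    main = do
      setNumCapabilities 4      -- n_capabilities grows to 4
      setNumCapabilities 2      -- caps 2 and 3 are now disabled, not freed
      m <- newEmptyMVar
      _ <- forkOn 3 $ do        -- before: 3 mod 4 == 3, a disabled cap
             tid <- myThreadId  -- after:  3 mod 2 == 1, a live cap
             threadCapability tid >>= putMVar m
      (cap, _locked) <- takeMVar m
      putStrLn ("forkOn 3 ran on capability " ++ show cap)  -- expect 1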
......@@ -2332,10 +2434,11 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
Capability *cap = task->cap;
waitForReturnCapability(&cap,task);
scheduleDoGC(&cap,task,rtsFalse);
ASSERT(task->incall->tso == NULL);
releaseCapability(task->cap);
releaseCapability(cap);
}
sched_state = SCHED_SHUTTING_DOWN;
......@@ -2394,15 +2497,16 @@ static void
performGC_(rtsBool force_major)
{
Task *task;
Capability *cap = NULL;
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalls queue.
task = newBoundTask();
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,force_major);
releaseCapability(task->cap);
waitForReturnCapability(&cap,task);
scheduleDoGC(&cap,task,force_major);
releaseCapability(cap);
boundTaskExiting(task);
}
......
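
A closing note on the migration policy used in both the scheduler loop
and scheduleDoGC() above: disabled capability i hands its threads to
capability i mod enabled_capabilities, spreading the displaced work
round-robin over the live capabilities.  A tiny illustrative sketch of
that arithmetic (plain Haskell, not RTS code):

    -- Destination of each disabled capability's threads, e.g. after
    -- shrinking from 8 capabilities down to 3 enabled ones.
    destinations :: Int -> Int -> [(Int, Int)]
    destinations nCaps nEnabled =
      [ (i, i `mod` nEnabled) | i <- [nEnabled .. nCaps - 1] ]

    main :: IO ()
    main = mapM_ print (destinations 8 3)
    -- (3,0) (4,1) (5,2) (6,0) (7,1)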