Commit aa779e09 authored by Simon Marlow
Browse files

Don't move Capabilities in setNumCapabilities (#8209)

We have various problems with reallocating the array of Capabilities,
due to threads in waitForReturnCapability that are already holding a
pointer to a Capability.

Rather than add more locking to make this safer, I decided it would be
easier to ensure that we never move the Capabilities at all.  The
capabilities array is now an array of pointers to Capability.  There
are extra indirections, but it rarely matters - we don't often access
Capabilities via the array, normally we already have a pointer to
one.  I ran the parallel benchmarks and didn't see any difference.
parent 5a3918fe
......@@ -35,7 +35,13 @@ Capability MainCapability;
nat n_capabilities = 0;
nat enabled_capabilities = 0;
Capability *capabilities = NULL;
// The array of Capabilities. It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
// (e.g. threads in waitForReturnCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;
// Holds the Capability which last became free. This is used so that
// an in-call has a chance of quickly finding a free Capability.
......@@ -126,7 +132,7 @@ findSpark (Capability *cap)
/* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
start at a random place instead of 0 as well. */
for ( i=0 ; i < n_capabilities ; i++ ) {
robbed = &capabilities[i];
robbed = capabilities[i];
if (cap == robbed) // ourselves...
continue;
......@@ -169,7 +175,7 @@ anySparks (void)
nat i;
for (i=0; i < n_capabilities; i++) {
if (!emptySparkPoolCap(&capabilities[i])) {
if (!emptySparkPoolCap(capabilities[i])) {
return rtsTrue;
}
}
......@@ -323,7 +329,8 @@ initCapabilities( void )
#else /* !THREADED_RTS */
n_capabilities = 1;
capabilities = &MainCapability;
capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
capabilities[0] = &MainCapability;
initCapability(&MainCapability, 0);
#endif
......@@ -333,46 +340,40 @@ initCapabilities( void )
// There are no free capabilities to begin with. We will start
// a worker Task to each Capability, which will quickly put the
// Capability on the free list when it finds nothing to do.
last_free_capability = &capabilities[0];
last_free_capability = capabilities[0];
}
Capability *
void
moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
nat i;
Capability *old_capabilities = capabilities;
Capability **old_capabilities = capabilities;
capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
if (to == 1) {
// THREADED_RTS must work on builds that don't have a mutable
// BaseReg (eg. unregisterised), so in this case
// capabilities[0] must coincide with &MainCapability.
capabilities = &MainCapability;
} else {
capabilities = stgMallocBytes(to * sizeof(Capability),
"moreCapabilities");
if (from > 0) {
memcpy(capabilities, old_capabilities, from * sizeof(Capability));
}
capabilities[0] = &MainCapability;
}
for (i = from; i < to; i++) {
initCapability(&capabilities[i], i);
for (i = 0; i < to; i++) {
if (i < from) {
capabilities[i] = old_capabilities[i];
} else {
capabilities[i] = stgMallocBytes(sizeof(Capability),
"moreCapabilities");
initCapability(capabilities[i], i);
}
}
last_free_capability = &capabilities[0];
debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
// Return the old array to free later.
if (from > 1) {
return old_capabilities;
} else {
return NULL;
if (old_capabilities != NULL) {
stgFree(old_capabilities);
}
#else
return NULL;
#endif
}
......@@ -385,7 +386,7 @@ void contextSwitchAllCapabilities(void)
{
nat i;
for (i=0; i < n_capabilities; i++) {
contextSwitchCapability(&capabilities[i]);
contextSwitchCapability(capabilities[i]);
}
}
......@@ -393,7 +394,7 @@ void interruptAllCapabilities(void)
{
nat i;
for (i=0; i < n_capabilities; i++) {
interruptCapability(&capabilities[i]);
interruptCapability(capabilities[i]);
}
}
......@@ -606,8 +607,8 @@ waitForReturnCapability (Capability **pCap, Task *task)
// otherwise, search for a free capability
cap = NULL;
for (i = 0; i < n_capabilities; i++) {
if (!capabilities[i].running_task) {
cap = &capabilities[i];
if (!capabilities[i]->running_task) {
cap = capabilities[i];
break;
}
}
......@@ -955,7 +956,7 @@ shutdownCapabilities(Task *task, rtsBool safe)
nat i;
for (i=0; i < n_capabilities; i++) {
ASSERT(task->incall->tso == NULL);
shutdownCapability(&capabilities[i], task, safe);
shutdownCapability(capabilities[i], task, safe);
}
#if defined(THREADED_RTS)
ASSERT(checkSparkCountInvariant());
......@@ -981,11 +982,13 @@ freeCapabilities (void)
#if defined(THREADED_RTS)
nat i;
for (i=0; i < n_capabilities; i++) {
freeCapability(&capabilities[i]);
freeCapability(capabilities[i]);
stgFree(capabilities[i]);
}
#else
freeCapability(&MainCapability);
#endif
stgFree(capabilities);
traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
}
......@@ -1032,7 +1035,7 @@ markCapabilities (evac_fn evac, void *user)
{
nat n;
for (n = 0; n < n_capabilities; n++) {
markCapability(evac, user, &capabilities[n], rtsFalse);
markCapability(evac, user, capabilities[n], rtsFalse);
}
}
......@@ -1044,13 +1047,13 @@ rtsBool checkSparkCountInvariant (void)
nat i;
for (i = 0; i < n_capabilities; i++) {
sparks.created += capabilities[i].spark_stats.created;
sparks.dud += capabilities[i].spark_stats.dud;
sparks.overflowed+= capabilities[i].spark_stats.overflowed;
sparks.converted += capabilities[i].spark_stats.converted;
sparks.gcd += capabilities[i].spark_stats.gcd;
sparks.fizzled += capabilities[i].spark_stats.fizzled;
remaining += sparkPoolSize(capabilities[i].sparks);
sparks.created += capabilities[i]->spark_stats.created;
sparks.dud += capabilities[i]->spark_stats.dud;
sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
sparks.converted += capabilities[i]->spark_stats.converted;
sparks.gcd += capabilities[i]->spark_stats.gcd;
sparks.fizzled += capabilities[i]->spark_stats.fizzled;
remaining += sparkPoolSize(capabilities[i]->sparks);
}
/* The invariant is
......
......@@ -132,8 +132,8 @@ struct Capability_ {
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
} // typedef Capability is defined in RtsAPI.h
// Capabilities are stored in an array, so make sure that adjacent
// Capabilities don't share any cache-lines:
// We never want a Capability to overlap a cache line with anything
// else, so round it up to a cache line size:
#ifndef mingw32_HOST_OS
ATTRIBUTE_ALIGNED(64)
#endif
......@@ -181,7 +181,7 @@ void initCapabilities (void);
// Add and initialise more Capabilities
//
Capability * moreCapabilities (nat from, nat to);
void moreCapabilities (nat from, nat to);
// Release a capability. This is called by a Task that is exiting
// Haskell to make a foreign call, or in various other cases when we
......@@ -211,7 +211,7 @@ INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
// Array of all the capabilities
//
extern Capability *capabilities;
extern Capability **capabilities;
// The Capability that was last free. Used as a good guess for where
// to assign new threads.
......
......@@ -149,7 +149,7 @@ initProfiling1 (void)
{
nat n;
for (n=0; n < n_capabilities; n++) {
capabilities[n].r.rCCCS = CCS_SYSTEM;
capabilities[n]->r.rCCCS = CCS_SYSTEM;
}
}
......
......@@ -76,7 +76,7 @@ handleProfTick(void)
if (do_prof_ticks) {
nat n;
for (n=0; n < n_capabilities; n++) {
capabilities[n].r.rCCCS->time_ticks++;
capabilities[n]->r.rCCCS->time_ticks++;
}
}
#endif
......
......@@ -1789,7 +1789,7 @@ computeRetainerSet( void )
// because we can find MUT_VAR objects which have not been
// visited during retainer profiling.
for (n = 0; n < n_capabilities; n++) {
for (bd = capabilities[n].mut_lists[g]; bd != NULL; bd = bd->link) {
for (bd = capabilities[n]->mut_lists[g]; bd != NULL; bd = bd->link) {
for (ml = bd->start; ml < bd->free; ml++) {
maybeInitRetainerSet((StgClosure *)*ml);
......
......@@ -376,7 +376,7 @@ schedule (Capability *initialCapability, Task *task)
// it was originally on.
#ifdef THREADED_RTS
if (cap->disabled && !t->bound) {
Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
migrateThread(cap, t, dest_cap);
continue;
}
......@@ -716,7 +716,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// First grab as many free Capabilities as we can.
for (i=0, n_free_caps=0; i < n_capabilities; i++) {
cap0 = &capabilities[i];
cap0 = capabilities[i];
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap0->returning_tasks_hd != NULL
......@@ -1395,7 +1395,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities);
tmpcap = &capabilities[i];
tmpcap = capabilities[i];
if (tmpcap != cap) {
// we better hope this task doesn't get migrated to
// another Capability while we're waiting for this one.
......@@ -1418,8 +1418,8 @@ static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
for (i = 0; i < n; i++) {
if (cap->no != i) {
task->cap = &capabilities[i];
releaseCapability(&capabilities[i]);
task->cap = capabilities[i];
releaseCapability(capabilities[i]);
}
}
task->cap = cap;
......@@ -1540,21 +1540,21 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
|| (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
for (i=0; i < n_capabilities; i++) {
if (capabilities[i].disabled) {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
if (capabilities[i]->disabled) {
idle_cap[i] = tryGrabCapability(capabilities[i], task);
} else {
idle_cap[i] = rtsFalse;
}
}
} else {
for (i=0; i < n_capabilities; i++) {
if (capabilities[i].disabled) {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
if (capabilities[i]->disabled) {
idle_cap[i] = tryGrabCapability(capabilities[i], task);
} else if (i == cap->no ||
capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
idle_cap[i] = rtsFalse;
} else {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
idle_cap[i] = tryGrabCapability(capabilities[i], task);
if (!idle_cap[i]) {
n_failed_trygrab_idles++;
} else {
......@@ -1575,7 +1575,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
for (i=0; i < n_capabilities; i++) {
gc_threads[i]->idle = idle_cap[i];
capabilities[i].idle++;
capabilities[i]->idle++;
}
// For all capabilities participating in this GC, wait until
......@@ -1606,10 +1606,10 @@ delete_threads_and_gc:
// threads. It just avoids the GC having to do any work to
// figure out that any remaining sparks are garbage.
for (i = 0; i < n_capabilities; i++) {
capabilities[i].spark_stats.gcd +=
sparkPoolSize(capabilities[i].sparks);
capabilities[i]->spark_stats.gcd +=
sparkPoolSize(capabilities[i]->sparks);
// No race here since all Caps are stopped.
discardSparksCap(&capabilities[i]);
discardSparksCap(capabilities[i]);
}
#endif
sched_state = SCHED_SHUTTING_DOWN;
......@@ -1625,10 +1625,10 @@ delete_threads_and_gc:
#if defined(THREADED_RTS)
for (i = enabled_capabilities; i < n_capabilities; i++) {
Capability *tmp_cap, *dest_cap;
tmp_cap = &capabilities[i];
tmp_cap = capabilities[i];
ASSERT(tmp_cap->disabled);
if (i != cap->no) {
dest_cap = &capabilities[i % enabled_capabilities];
dest_cap = capabilities[i % enabled_capabilities];
while (!emptyRunQueue(tmp_cap)) {
tso = popRunQueue(tmp_cap);
migrateThread(tmp_cap, tso, dest_cap);
......@@ -1703,11 +1703,11 @@ delete_threads_and_gc:
for (i = 0; i < n_capabilities; i++) {
if (i != cap->no) {
if (idle_cap[i]) {
ASSERT(capabilities[i].running_task == task);
task->cap = &capabilities[i];
releaseCapability(&capabilities[i]);
ASSERT(capabilities[i]->running_task == task);
task->cap = capabilities[i];
releaseCapability(capabilities[i]);
} else {
ASSERT(capabilities[i].running_task != task);
ASSERT(capabilities[i]->running_task != task);
}
}
}
......@@ -1799,7 +1799,7 @@ forkProcess(HsStablePtr *entry
ACQUIRE_LOCK(&task->lock);
for (i=0; i < n_capabilities; i++) {
ACQUIRE_LOCK(&capabilities[i].lock);
ACQUIRE_LOCK(&capabilities[i]->lock);
}
stopTimer(); // See #4074
......@@ -1820,8 +1820,8 @@ forkProcess(HsStablePtr *entry
RELEASE_LOCK(&task->lock);
for (i=0; i < n_capabilities; i++) {
releaseCapability_(&capabilities[i],rtsFalse);
RELEASE_LOCK(&capabilities[i].lock);
releaseCapability_(capabilities[i],rtsFalse);
RELEASE_LOCK(&capabilities[i]->lock);
}
boundTaskExiting(task);
......@@ -1837,7 +1837,7 @@ forkProcess(HsStablePtr *entry
initMutex(&task->lock);
for (i=0; i < n_capabilities; i++) {
initMutex(&capabilities[i].lock);
initMutex(&capabilities[i]->lock);
}
#endif
......@@ -1871,7 +1871,7 @@ forkProcess(HsStablePtr *entry
discardTasksExcept(task);
for (i=0; i < n_capabilities; i++) {
cap = &capabilities[i];
cap = capabilities[i];
// Empty the run queue. It seems tempting to let all the
// killed threads stay on the run queue as zombies to be
......@@ -1900,7 +1900,7 @@ forkProcess(HsStablePtr *entry
releaseCapability(cap);
}
}
cap = &capabilities[0];
cap = capabilities[0];
task->cap = cap;
// Empty the threads lists. Otherwise, the garbage
......@@ -1965,8 +1965,7 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
Task *task;
Capability *cap;
nat sync;
StgTSO* t;
nat g, n;
nat n;
Capability *old_capabilities = NULL;
nat old_n_capabilities = n_capabilities;
......@@ -2013,8 +2012,8 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
// structures, the nursery, etc.
//
for (n = new_n_capabilities; n < enabled_capabilities; n++) {
capabilities[n].disabled = rtsTrue;
traceCapDisable(&capabilities[n]);
capabilities[n]->disabled = rtsTrue;
traceCapDisable(capabilities[n]);
}
enabled_capabilities = new_n_capabilities;
}
......@@ -2025,8 +2024,8 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
// enable any disabled capabilities, up to the required number
for (n = enabled_capabilities;
n < new_n_capabilities && n < n_capabilities; n++) {
capabilities[n].disabled = rtsFalse;
traceCapEnable(&capabilities[n]);
capabilities[n]->disabled = rtsFalse;
traceCapEnable(capabilities[n]);
}
enabled_capabilities = n;
......@@ -2042,23 +2041,10 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
// Resize the capabilities array
// NB. after this, capabilities points to a new array of pointers, but
// the Capabilities themselves are not moved, so any existing pointers
// of type (Capability *) remain valid.
old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
// update our own cap pointer
cap = &capabilities[cap->no];
moreCapabilities(n_capabilities, new_n_capabilities);
// Resize and update storage manager data structures
storageAddCapabilities(n_capabilities, new_n_capabilities);
// Update (Capability *) refs in the Task manager.
updateCapabilityRefs();
// Update (Capability *) refs from TSOs
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
t->cap = &capabilities[t->cap->no];
}
}
}
}
......@@ -2324,7 +2310,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
migrateThread(cap, tso, &capabilities[cpu]);
migrateThread(cap, tso, capabilities[cpu]);
}
#else
appendToRunQueue(cap,tso);
......@@ -2407,7 +2393,7 @@ startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
Capability *cap;
for (i = from; i < to; i++) {
cap = &capabilities[i];
cap = capabilities[i];
ACQUIRE_LOCK(&cap->lock);
startWorkerTask(cap);
RELEASE_LOCK(&cap->lock);
......@@ -2506,9 +2492,6 @@ freeScheduler( void )
// Capability).
if (still_running == 0) {
freeCapabilities();
if (n_capabilities != 1) {
stgFree(capabilities);
}
}
RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
......
......@@ -346,10 +346,10 @@ calcTotalAllocated(void)
W_ tot_alloc = 0;
W_ n;
for (n = 0; n < n_capabilities; n++) {
tot_alloc += capabilities[n].total_allocated;
traceEventHeapAllocated(&capabilities[n],
tot_alloc += capabilities[n]->total_allocated;
traceEventHeapAllocated(capabilities[n],
CAPSET_HEAP_DEFAULT,
capabilities[n].total_allocated * sizeof(W_));
capabilities[n]->total_allocated * sizeof(W_));
}
return tot_alloc;
......@@ -730,12 +730,12 @@ stat_exit (void)
nat i;
SparkCounters sparks = { 0, 0, 0, 0, 0, 0};
for (i = 0; i < n_capabilities; i++) {
sparks.created += capabilities[i].spark_stats.created;
sparks.dud += capabilities[i].spark_stats.dud;
sparks.overflowed+= capabilities[i].spark_stats.overflowed;
sparks.converted += capabilities[i].spark_stats.converted;
sparks.gcd += capabilities[i].spark_stats.gcd;
sparks.fizzled += capabilities[i].spark_stats.fizzled;
sparks.created += capabilities[i]->spark_stats.created;
sparks.dud += capabilities[i]->spark_stats.dud;
sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
sparks.converted += capabilities[i]->spark_stats.converted;
sparks.gcd += capabilities[i]->spark_stats.gcd;
sparks.fizzled += capabilities[i]->spark_stats.fizzled;
}
statsPrintf(" SPARKS: %" FMT_Word " (%" FMT_Word " converted, %" FMT_Word " overflowed, %" FMT_Word " dud, %" FMT_Word " GC'd, %" FMT_Word " fizzled)\n\n",
......@@ -900,10 +900,10 @@ statDescribeGens(void)
mut = 0;
for (i = 0; i < n_capabilities; i++) {
mut += countOccupied(capabilities[i].mut_lists[g]);
mut += countOccupied(capabilities[i]->mut_lists[g]);
// Add the pinned object block.
bd = capabilities[i].pinned_object_block;
bd = capabilities[i]->pinned_object_block;
if (bd != NULL) {
gen_live += bd->free - bd->start;
gen_blocks += bd->blocks;
......@@ -999,12 +999,12 @@ extern void getSparkStats( SparkCounters *s ) {
s->gcd = 0;
s->fizzled = 0;
for (i = 0; i < n_capabilities; i++) {
s->created += capabilities[i].spark_stats.created;
s->dud += capabilities[i].spark_stats.dud;
s->overflowed+= capabilities[i].spark_stats.overflowed;
s->converted += capabilities[i].spark_stats.converted;
s->gcd += capabilities[i].spark_stats.gcd;
s->fizzled += capabilities[i].spark_stats.fizzled;
s->created += capabilities[i]->spark_stats.created;
s->dud += capabilities[i]->spark_stats.dud;
s->overflowed+= capabilities[i]->spark_stats.overflowed;
s->converted += capabilities[i]->spark_stats.converted;
s->gcd += capabilities[i]->spark_stats.gcd;
s->fizzled += capabilities[i]->spark_stats.fizzled;
}
}
#endif
......
......@@ -326,34 +326,6 @@ discardTasksExcept (Task *keep)
RELEASE_LOCK(&all_tasks_mutex);
}
//
// After the capabilities[] array has moved, we have to adjust all
// (Capability *) pointers to point to the new array. The old array
// is still valid at this point.
//
void updateCapabilityRefs (void)
{
Task *task;
InCall *incall;
ACQUIRE_LOCK(&all_tasks_mutex);
for (task = all_tasks; task != NULL; task=task->all_next) {
if (task->cap != NULL) {
task->cap = &capabilities[task->cap->no];
}
for (incall = task->incall; incall != NULL; incall = incall->prev_stack) {
if (incall->suspended_cap != NULL) {
incall->suspended_cap = &capabilities[incall->suspended_cap->no];
}
}
}
RELEASE_LOCK(&all_tasks_mutex);
}
#if defined(THREADED_RTS)
void
......
......@@ -213,11 +213,6 @@ void interruptWorkerTask (Task *task);
#endif /* THREADED_RTS */
// Update any (Capability *) pointers belonging to Tasks after the
// Capability array is moved/resized.
//
void updateCapabilityRefs (void);
// For stats
extern nat taskCount;
extern nat workerCount;
......
......@@ -801,7 +801,7 @@ printAllThreads(void)
debugBelch("all threads:\n");
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
cap = capabilities[i];
debugBelch("threads on capability %d:\n", cap->no);
for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) {
printThreadStatus(t);
......
......@@ -933,7 +933,7 @@ compact(StgClosure *static_objects)
bdescr *bd;
StgPtr p;
for (n = 0; n < n_capabilities; n++) {
for (bd = capabilities[n].mut_lists[g];
for (bd = capabilities[n]->mut_lists[g];
bd != NULL; bd = bd->link) {
for (p = bd->start; p < bd->free; p++) {
thread((StgClosure **)p);
......
......@@ -236,8 +236,8 @@ GarbageCollect (nat collect_gen,
// attribute any costs to CCS_GC
#ifdef PROFILING
for (n = 0; n < n_capabilities; n++) {