Commit 92e7d6c9 authored by Simon Marlow's avatar Simon Marlow
Browse files

Allow the number of capabilities to be increased at runtime (#3729)

At present the number of capabilities can only be *increased*, not
decreased.  The latter presents a few more challenges!
parent 8b75acd3
......@@ -2112,8 +2112,10 @@ f "2" = 2
<sect2 id="parallel-options">
<title>RTS options for SMP parallelism</title>
<para>To run a program on multiple CPUs, use the
RTS <option>-N</option> option:</para>
<para>There are two ways to run a program on multiple
processors:
call <literal>GHC.Conc.setNumCapabilities</literal> from your
program, or use the RTS <option>-N</option> option.</para>
<variablelist>
<varlistentry>
......@@ -2148,7 +2150,13 @@ f "2" = 2
<para>The current value of the <option>-N</option> option
is available to the Haskell program
via <literal>GHC.Conc.numCapabilities</literal>.</para>
via <literal>GHC.Conc.getNumCapabilities</literal>, and
it may be changed while the program is running by
calling <literal>GHC.Conc.setNumCapabilities</literal>.
Note: in the current implementation,
the <option>-N</option> value may only
be <emphasis>increased</emphasis>, not decreased, by
calling <literal>GHC.Conc.setNumCapabilities</literal>.</para>
</listitem>
</varlistentry>
</variablelist>
......
......@@ -57,4 +57,14 @@ extern unsigned int n_capabilities;
extern Capability MainCapability;
#endif
//
// Change the number of capabilities (only supports increasing the
// current value at the moment).
//
#if defined(THREADED_RTS)
extern void setNumCapabilities (nat new);
#else
extern void setNumCapabilities (nat new) GNU_ATTRIBUTE(__noreturn__);
#endif
#endif /* RTS_THREADS_H */
......@@ -27,6 +27,8 @@
#include "STM.h"
#include "RtsUtils.h"
#include <string.h>
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;
......@@ -299,7 +301,6 @@ initCapabilities( void )
traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
#if defined(THREADED_RTS)
nat i;
#ifndef REG_Base
// We can't support multiple CPUs if BaseReg is not a register
......@@ -309,24 +310,10 @@ initCapabilities( void )
}
#endif
n_capabilities = 0;
moreCapabilities(0, RtsFlags.ParFlags.nNodes);
n_capabilities = RtsFlags.ParFlags.nNodes;
if (n_capabilities == 1) {
capabilities = &MainCapability;
// THREADED_RTS must work on builds that don't have a mutable
// BaseReg (eg. unregisterised), so in this case
// capabilities[0] must coincide with &MainCapability.
} else {
capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
"initCapabilities");
}
for (i = 0; i < n_capabilities; i++) {
initCapability(&capabilities[i], i);
}
debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
#else /* !THREADED_RTS */
n_capabilities = 1;
......@@ -341,6 +328,46 @@ initCapabilities( void )
last_free_capability = &capabilities[0];
}
// Grow the capabilities[] array so that Capabilities [0..to-1] exist,
// initialising the new entries [from..to-1].  Must be called with the
// system fully stopped (at startup, or with all Capabilities held).
//
// Returns the previous heap-allocated array, which the caller must
// free *after* all outstanding (Capability *) pointers have been
// re-pointed at the new array, or NULL if there is nothing to free.
Capability *
moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;
    // Remember the old array: existing entries are copied from it
    // below, and it may need to be returned for freeing.
    Capability *old_capabilities = capabilities;

    if (to == 1) {
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
        capabilities = &MainCapability;
    } else {
        capabilities = stgMallocBytes(to * sizeof(Capability),
                                      "moreCapabilities");

        if (from > 0) {
            // Copy the existing Capability structs into the new array.
            // NOTE(review): any (Capability *) held elsewhere is now
            // stale; callers must fix them up (see
            // updateCapabilityRefs()) before the old array is freed.
            memcpy(capabilities, old_capabilities, from * sizeof(Capability));
        }
    }

    // Initialise only the newly-added entries.
    for (i = from; i < to; i++) {
        initCapability(&capabilities[i], i);
    }

    // May have pointed into the old array; cleared here — presumably
    // re-established by the scheduler once Capabilities are released.
    last_free_capability = NULL;

    debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);

    // Return the old array to free later.  When from <= 1 the old
    // array was either nonexistent or the static MainCapability, so
    // there is nothing to free.
    if (from > 1) {
        return old_capabilities;
    } else {
        return NULL;
    }
#else
    return NULL;
#endif
}
/* ----------------------------------------------------------------------------
* setContextSwitches: cause all capabilities to context switch as
* soon as possible.
......@@ -426,7 +453,10 @@ releaseCapability_ (Capability* cap,
return;
}
if (pending_sync == SYNC_GC_SEQ || pending_sync == SYNC_FORK) {
// If there is a pending sync, then we should just leave the
// Capability free. The thread trying to sync will be about to
// call waitForReturnCapability().
if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
last_free_capability = cap; // needed?
debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
return;
......
......@@ -165,6 +165,10 @@ regTableToCapability (StgRegTable *reg)
//
void initCapabilities (void);
// Add and initialise more Capabilities
//
Capability * moreCapabilities (nat from, nat to);
// Release a capability. This is called by a Task that is exiting
// Haskell to make a foreign call, or in various other cases when we
// want to relinquish a Capability that we currently hold.
......@@ -206,7 +210,7 @@ extern Capability *last_free_capability;
//
#define SYNC_GC_SEQ 1
#define SYNC_GC_PAR 2
#define SYNC_FORK 3
#define SYNC_OTHER 3
extern volatile StgWord pending_sync;
// Acquires a capability at a return point. If *cap is non-NULL, then
......
......@@ -848,6 +848,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_readTVarzh) \
SymI_HasProto(stg_readTVarIOzh) \
SymI_HasProto(resumeThread) \
SymI_HasProto(setNumCapabilities) \
SymI_HasProto(resolveObjs) \
SymI_HasProto(stg_retryzh) \
SymI_HasProto(rts_apply) \
......
......@@ -134,6 +134,8 @@ static void scheduleYield (Capability **pcap, Task *task);
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(Capability *cap, Task *task);
static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
......@@ -1359,7 +1361,7 @@ static nat requestSync (Capability **pcap, Task *task, nat sync_type)
//
// Grab all the capabilities except the one we already hold. Used
// when synchronising before a single-threaded GC (SYNC_GC_SEQ), and
// before a fork (SYNC_FORK).
// before a fork (SYNC_OTHER).
//
// Only call this after requestSync(), otherwise a deadlock might
// ensue if another thread is trying to synchronise.
......@@ -1380,13 +1382,26 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
// unsavoury invariant.
task->cap = tmpcap;
waitForReturnCapability(&tmpcap, task);
if (tmpcap != &capabilities[i]) {
if (tmpcap->no != i) {
barf("acquireAllCapabilities: got the wrong capability");
}
}
}
task->cap = cap;
}
// Release every Capability except the one the Task already holds
// (cap).  Counterpart of acquireAllCapabilities(); used after a
// stop-the-world sync to let the system run again.
static void releaseAllCapabilities(Capability *cap, Task *task)
{
    nat n;

    for (n = 0; n < n_capabilities; n++) {
        Capability *other = &capabilities[n];
        if (n == cap->no) continue;      // keep our own Capability
        // releaseCapability() expects task->cap to be the Capability
        // being released, so point it there temporarily.
        task->cap = other;
        releaseCapability(other);
    }

    task->cap = cap;
}
#endif
/* -----------------------------------------------------------------------------
......@@ -1581,17 +1596,7 @@ delete_threads_and_gc:
#if defined(THREADED_RTS)
if (gc_type == SYNC_GC_SEQ) {
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
task->cap = &capabilities[i];
releaseCapability(&capabilities[i]);
}
}
}
if (cap) {
task->cap = cap;
} else {
task->cap = NULL;
releaseAllCapabilities(cap, task);
}
#endif
......@@ -1629,7 +1634,7 @@ forkProcess(HsStablePtr *entry
#ifdef THREADED_RTS
do {
sync = requestSync(&cap, task, SYNC_FORK);
sync = requestSync(&cap, task, SYNC_OTHER);
} while (sync);
acquireAllCapabilities(cap,task);
......@@ -1779,6 +1784,105 @@ forkProcess(HsStablePtr *entry
#endif
}
/* ---------------------------------------------------------------------------
* Increase the number of Capabilities
*
* Changing the number of Capabilities is very tricky! We can only do
* it with the system fully stopped, so we do a full sync with
* requestSync(SYNC_OTHER) and grab all the capabilities.
*
* Then we resize the appropriate data structures, and update all
* references to the old data structures which have now moved.
* Finally we release the Capabilities we are holding, and start
* worker Tasks on the new Capabilities we created.
*
* ------------------------------------------------------------------------- */
// Change the number of Capabilities at runtime (the RTS entry point
// behind GHC.Conc.setNumCapabilities).  Only *increasing* the count
// is supported at present.
//
// Protocol: stop the whole system via requestSync(SYNC_OTHER) +
// acquireAllCapabilities(), resize all per-capability data
// structures, fix up every stale (Capability *) reference, then
// restart the world and spawn workers for the new Capabilities.
void
setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
{
#if !defined(THREADED_RTS)
    barf("setNumCapabilities: not supported in the non-threaded RTS");
#else
    Task *task;
    Capability *cap;
    nat sync;
    StgTSO* t;
    nat g;
    Capability *old_capabilities;

    // No-op if the count is unchanged.
    if (new_n_capabilities == n_capabilities) return;

    if (new_n_capabilities < n_capabilities) {
        barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
    }

    debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
               n_capabilities, new_n_capabilities);

    cap = rts_lock();
    task = cap->running_task;

    // Stop the world: keep requesting a sync until we win the race,
    // then take every Capability so nothing else is running.
    do {
        sync = requestSync(&cap, task, SYNC_OTHER);
    } while (sync);

    acquireAllCapabilities(cap,task);

    pending_sync = 0;

#if defined(TRACING)
    // Allocate eventlog buffers for the new capabilities. Note this
    // must be done before calling moreCapabilities(), because that
    // will emit events to add the new capabilities to capsets.
    tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif

    // Resize the capabilities array
    // NB. after this, capabilities points somewhere new. Any pointers
    // of type (Capability *) are now invalid.
    old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);

    // update our own cap pointer
    cap = &capabilities[cap->no];

    // Resize and update storage manager data structures
    storageAddCapabilities(n_capabilities, new_n_capabilities);

    // Update (Capability *) refs in the Task manager.
    updateCapabilityRefs();

    // Update (Capability *) refs from TSOs
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
            t->cap = &capabilities[t->cap->no];
        }
    }

    // We're done: release the original Capabilities
    releaseAllCapabilities(cap,task);

    // Start worker tasks on the new Capabilities
    startWorkerTasks(n_capabilities, new_n_capabilities);

    // finally, update n_capabilities
    n_capabilities = new_n_capabilities;

    // We can't free the old array until now, because we access it
    // while updating pointers in updateCapabilityRefs().
    if (old_capabilities) {
        stgFree(old_capabilities);
    }

    rts_unlock(cap);
#endif // THREADED_RTS
}
/* ---------------------------------------------------------------------------
* Delete all the threads in the system
* ------------------------------------------------------------------------- */
......@@ -2010,7 +2114,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
#if defined(THREADED_RTS)
cpu %= RtsFlags.ParFlags.nNodes;
cpu %= n_capabilities;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
......@@ -2085,6 +2189,26 @@ void scheduleWorker (Capability *cap, Task *task)
}
#endif
/* ---------------------------------------------------------------------------
* Start new worker tasks on Capabilities from--to
* -------------------------------------------------------------------------- */
// Spawn a fresh worker Task for each Capability in [from..to).
// A no-op in the non-threaded RTS.
static void
startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat n;

    for (n = from; n < to; n++) {
        Capability *c = &capabilities[n];
        // startWorkerTask() must be called with the Capability's lock
        // held.
        ACQUIRE_LOCK(&c->lock);
        startWorkerTask(c);
        RELEASE_LOCK(&c->lock);
    }
#endif
}
/* ---------------------------------------------------------------------------
* initScheduler()
*
......@@ -2122,26 +2246,16 @@ initScheduler(void)
initTaskManager();
RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
* Capability 0. The idea is that we're probably going to start a
* bound thread on Capability 0 pretty soon, so we don't want a
* worker task hogging it.
*/
{
nat i;
Capability *cap;
for (i = 1; i < n_capabilities; i++) {
cap = &capabilities[i];
ACQUIRE_LOCK(&cap->lock);
startWorkerTask(cap);
RELEASE_LOCK(&cap->lock);
}
}
#endif
startWorkerTasks(1, n_capabilities);
RELEASE_LOCK(&sched_mutex);
}
void
......
......@@ -622,7 +622,7 @@ stat_exit(int alloc)
statsPrintf("\n Parallel GC work balance: %.2f (%ld / %ld, ideal %d)\n",
(double)GC_par_avg_copied / (double)GC_par_max_copied,
(lnat)GC_par_avg_copied, (lnat)GC_par_max_copied,
RtsFlags.ParFlags.nNodes
n_capabilities
);
}
#endif
......
......@@ -325,6 +325,34 @@ discardTasksExcept (Task *keep)
RELEASE_LOCK(&all_tasks_mutex);
}
//
// The capabilities[] array has just been moved/resized, so every
// stored (Capability *) must be re-pointed into the new array.  The
// old array is still valid (not yet freed) at this point, which is
// what makes reading cap->no through the stale pointers safe.
//
void updateCapabilityRefs (void)
{
    Task *t;

    ACQUIRE_LOCK(&all_tasks_mutex);

    for (t = all_tasks; t != NULL; t = t->all_link) {
        InCall *ic;

        if (t->cap != NULL) {
            // cap->no identifies the slot in the new array.
            t->cap = &capabilities[t->cap->no];
        }

        // Suspended foreign-call stacks also hold Capability pointers.
        for (ic = t->incall; ic != NULL; ic = ic->prev_stack) {
            if (ic->suspended_cap != NULL) {
                ic->suspended_cap = &capabilities[ic->suspended_cap->no];
            }
        }
    }

    RELEASE_LOCK(&all_tasks_mutex);
}
void
taskTimeStamp (Task *task USED_IF_THREADS)
{
......
......@@ -235,6 +235,11 @@ void interruptWorkerTask (Task *task);
#endif /* THREADED_RTS */
// Update any (Capability *) pointers belonging to Tasks after the
// Capability array is moved/resized.
//
void updateCapabilityRefs (void);
// -----------------------------------------------------------------------------
// INLINE functions... private from here on down:
......
......@@ -143,6 +143,13 @@ void resetTracing (void)
}
}
// Grow the per-capability eventlog buffers for Capabilities
// [from..to).  Does nothing unless the eventlog is enabled.
// NOTE(review): the name is misspelled ("Capapilities"), but it is
// declared this way in Trace.h and used by callers, so renaming it
// here alone would break the build.
void tracingAddCapapilities (nat from, nat to)
{
    if (!eventlog_enabled) {
        return;
    }
    moreCapEventBufs(from, to);
}
/* ---------------------------------------------------------------------------
Emitting trace messages/events
--------------------------------------------------------------------------- */
......
......@@ -28,6 +28,7 @@ void initTracing (void);
void endTracing (void);
void freeTracing (void);
void resetTracing (void);
void tracingAddCapapilities (nat from, nat to);
#endif /* TRACING */
......
......@@ -254,12 +254,8 @@ initEventLogging(void)
#else
n_caps = 1;
#endif
capEventBuf = stgMallocBytes(n_caps * sizeof(EventsBuf),"initEventLogging");
moreCapEventBufs(0,n_caps);
for (c = 0; c < n_caps; ++c) {
// Init buffer for events.
initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c);
}
initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1));
// Write in buffer: the header begin marker.
......@@ -417,7 +413,26 @@ endEventLogging(void)
}
}
void
// Ensure capEventBuf[] has one EventsBuf per Capability in [0..to),
// initialising the new entries [from..to).  The first call
// (from == 0) allocates the array; later calls grow it in place,
// preserving the existing buffers.
void
moreCapEventBufs (nat from, nat to)
{
    nat i;

    if (from == 0) {
        capEventBuf = stgMallocBytes(to * sizeof(EventsBuf),
                                     "moreCapEventBufs");
    } else {
        capEventBuf = stgReallocBytes(capEventBuf, to * sizeof(EventsBuf),
                                      "moreCapEventBufs");
    }

    // Only the freshly-added slots need initialising.
    for (i = from; i < to; ++i) {
        initEventsBuf(&capEventBuf[i], EVENT_LOG_SIZE, i);
    }
}
void
freeEventLogging(void)
{
StgWord8 c;
......
......@@ -26,6 +26,7 @@ void endEventLogging(void);
void freeEventLogging(void);
void abortEventLogging(void); // #4512 - after fork child needs to abort
void flushEventLog(void); // event log inherited from parent
void moreCapEventBufs (nat from, nat to);
/*
* Post a scheduler event to the capability's event buffer (an event
......
......@@ -260,7 +260,7 @@ GarbageCollect (rtsBool force_major_gc,
* it with +RTS -gn0), or mark/compact/sweep GC.
*/
if (gc_type == SYNC_GC_PAR) {
n_gc_threads = RtsFlags.ParFlags.nNodes;
n_gc_threads = n_capabilities;
} else {
n_gc_threads = 1;
}
......@@ -854,29 +854,39 @@ new_gc_thread (nat n, gc_thread *t)
void
initGcThreads (void)
initGcThreads (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
if (gc_threads == NULL) {
#if defined(THREADED_RTS)
nat i;
gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes *
sizeof(gc_thread*),
"alloc_gc_threads");
nat i;
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
gc_threads[i] =
stgMallocBytes(sizeof(gc_thread) +
RtsFlags.GcFlags.generations * sizeof(gen_workspace),
"alloc_gc_threads");
if (from > 0) {
gc_threads = stgReallocBytes (gc_threads, to * sizeof(gc_thread*),
"initGcThreads");
} else {
gc_threads = stgMallocBytes (to * sizeof(gc_thread*),
"initGcThreads");
}
new_gc_thread(i, gc_threads[i]);
}
// We have to update the gct->cap pointers to point to the new
// Capability array now.
for (i = 0; i < from; i++) {
gc_threads[i]->cap = &capabilities[gc_threads[i]->cap->no];
}
for (i = from; i < to; i++) {
gc_threads[i] =
stgMallocBytes(sizeof(gc_thread) +
RtsFlags.GcFlags.generations * sizeof(gen_workspace),
"alloc_gc_threads");
new_gc_thread(i, gc_threads[i]);
}
#else
gc_threads = stgMallocBytes (sizeof(gc_thread*),"alloc_gc_threads");
gc_threads[0] = gct;
new_gc_thread(0,gc_threads[0]);
ASSERT(from == 0 && to == 1);
gc_threads = stgMallocBytes (sizeof(gc_thread*),"alloc_gc_threads");
gc_threads[0] = gct;
new_gc_thread(0,gc_threads[0]);
#endif
}
}
void
......@@ -1097,7 +1107,7 @@ gcWorkerThread (Capability *cap)
void
waitForGcThreads (Capability *cap USED_IF_THREADS)
{
const nat n_threads = RtsFlags.ParFlags.nNodes;
const nat n_threads = n_capabilities;
const nat me = cap->no;
nat i, j;
rtsBool retry = rtsTrue;
......@@ -1178,7 +1188,7 @@ shutdown_gc_threads (nat me USED_IF_THREADS)
void
releaseGCThreads (Capability *cap USED_IF_THREADS)
{
const nat n_threads = RtsFlags.ParFlags.nNodes;
const nat n_threads = n_capabilities;
const nat me = cap->no;