Commit 3ebcd3de authored by Simon Marlow

Use mutator threads to do GC, instead of having a separate pool of GC threads

Previously, the GC had its own pool of threads to use as workers when
doing parallel GC.  There was a "leader", which was the mutator thread
that initiated the GC, and the other threads were taken from the pool.

This was simple and worked fine for sequential programs, where we did
most of the benchmarking for the parallel GC, but it falls down for
parallel programs.  When we have N mutator threads and N cores, at GC
time we would have to stop N-1 mutator threads and start up N-1 GC
threads, and hope that the OS schedules them all onto separate cores.
In practice it doesn't, as you might expect.

Now we use the mutator threads to do GC.  This works quite nicely,
particularly for parallel programs, where each mutator thread scans
its own spark pool, which is probably in its cache anyway.
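
The rendezvous works roughly like this: when a parallel GC is pending,
a mutator thread that yields its Capability turns itself into a GC
worker instead of going to sleep (see the yieldCapability and
gcWorkerThread hunks below).  Here is a minimal, runnable model of
that handshake, assuming C11 atomics and pthreads; the state names
mirror the GC_THREAD_* constants added in this patch, but everything
else (gc_worker, main, the busy-waiting) is illustrative, not RTS
code:

  #include <pthread.h>
  #include <stdatomic.h>
  #include <stdio.h>

  #define N_THREADS 4

  enum { INACTIVE, STANDING_BY, RUNNING, WAITING_TO_CONTINUE };

  static atomic_int state[N_THREADS];   /* all start INACTIVE (0) */

  /* A mutator thread that has yielded its Capability: announce that
     we are standing by, spin until the leader starts the GC, do our
     share of the work, then wait to be released back to mutation. */
  static void *gc_worker (void *arg)
  {
      int me = *(int *)arg;
      atomic_store(&state[me], STANDING_BY);
      while (atomic_load(&state[me]) != RUNNING) { /* spin */ }
      printf("thread %d: scavenging\n", me);  /* scan own roots */
      atomic_store(&state[me], WAITING_TO_CONTINUE);
      while (atomic_load(&state[me]) != INACTIVE) { /* spin */ }
      return NULL;
  }

  int main (void)
  {
      pthread_t tid[N_THREADS];
      int ids[N_THREADS];
      int i;
      for (i = 1; i < N_THREADS; i++) {
          ids[i] = i;
          pthread_create(&tid[i], NULL, gc_worker, &ids[i]);
      }
      /* The leader (the thread that initiated the GC): wait for the
         others to stand by, start them, and finally release them;
         compare waitForGcThreads / wakeup_gc_threads /
         continue_gc_threads in the patch. */
      for (i = 1; i < N_THREADS; i++)
          while (atomic_load(&state[i]) != STANDING_BY) { /* spin */ }
      for (i = 1; i < N_THREADS; i++)
          atomic_store(&state[i], RUNNING);
      printf("thread 0: scavenging\n");       /* leader works too */
      for (i = 1; i < N_THREADS; i++)
          while (atomic_load(&state[i]) != WAITING_TO_CONTINUE) { }
      for (i = 1; i < N_THREADS; i++)
          atomic_store(&state[i], INACTIVE);
      for (i = 1; i < N_THREADS; i++)
          pthread_join(tid[i], NULL);
      return 0;
  }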

There are some flag changes:

  -g<n> is removed (-g1 is still accepted for backwards compat).
  There's no way to have a different number of GC threads than mutator
  threads now.

  -q1       Use one OS thread for GC (turns off parallel GC)
  -qg<n>    Use parallel GC for generations >= <n> (default: 1)

Using parallel GC only for generations >=1 works well for sequential
programs.  Compiling an ordinary sequential program with -threaded and
running it with -N2 or more should help if you do a lot of GC.  I've
found that adding -qg0 (do parallel GC for generation 0 too) speeds up
some parallel programs, but slows down some sequential programs.
Being conservative, I left the threshold at 1.
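
For example (the program name is hypothetical), a parallel program
built with -threaded could be run as

  ./parfib +RTS -N8 -qg0 -RTS

to get 8 OS threads that both mutate and collect, with parallel GC in
every generation; passing -q1 instead would fall back to
single-threaded GC.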

ToDo: document the new options.
parent c373ebdb

@@ -179,7 +179,9 @@ struct PAR_FLAGS {
rtsBool migrate; /* migrate threads between capabilities */
rtsBool wakeupMigrate; /* migrate a thread on wakeup */
unsigned int maxLocalSparks;
nat gcThreads; /* number of threads for parallel GC */
rtsBool parGcEnabled; /* enable parallel GC */
rtsBool parGcGen; /* do parallel GC in this generation
* and higher only */
};
#endif /* THREADED_RTS */

@@ -220,7 +220,7 @@ extern bdescr * splitLargeBlock (bdescr *bd, nat blocks);
-------------------------------------------------------------------------- */
extern void GarbageCollect(rtsBool force_major_gc);
extern void GarbageCollect(rtsBool force_major_gc, nat gc_type, Capability *cap);
/* -----------------------------------------------------------------------------
Generational garbage collection support

@@ -26,6 +26,7 @@
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "GC.h"
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
@@ -190,6 +191,7 @@ initCapability( Capability *cap, nat i )
cap->no = i;
cap->in_haskell = rtsFalse;
cap->in_gc = rtsFalse;
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
@@ -358,14 +360,7 @@ releaseCapability_ (Capability* cap,
return;
}
/* if waiting_for_gc was the reason to release the cap: thread
comes from yieldCap->releaseAndQueueWorker. Unconditionally set
cap. free and return (see default after the if-protected other
special cases). Thread will wait on cond.var and re-acquire the
same cap after GC (GC-triggering cap. calls releaseCap and
enters the spare_workers case)
*/
if (waiting_for_gc) {
if (waiting_for_gc == PENDING_GC_SEQ) {
last_free_capability = cap; // needed?
trace(TRACE_sched | DEBUG_sched,
"GC pending, set capability %d free", cap->no);
@@ -557,6 +552,12 @@ yieldCapability (Capability** pCap, Task *task)
{
Capability *cap = *pCap;
if (waiting_for_gc == PENDING_GC_PAR) {
debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
gcWorkerThread(cap);
return;
}
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
@@ -655,58 +656,21 @@ wakeupThreadOnCapability (Capability *my_cap,
}
/* ----------------------------------------------------------------------------
* prodCapabilities
* prodCapability
*
* Used to indicate that the interrupted flag is now set, or some
* other global condition that might require waking up a Task on each
* Capability.
* ------------------------------------------------------------------------- */
static void
prodCapabilities(rtsBool all)
{
nat i;
Capability *cap;
Task *task;
for (i=0; i < n_capabilities; i++) {
cap = &capabilities[i];
ACQUIRE_LOCK(&cap->lock);
if (!cap->running_task) {
if (cap->spare_workers) {
trace(TRACE_sched, "resuming capability %d", cap->no);
task = cap->spare_workers;
ASSERT(!task->stopped);
giveCapabilityToTask(cap,task);
if (!all) {
RELEASE_LOCK(&cap->lock);
return;
}
}
}
RELEASE_LOCK(&cap->lock);
}
return;
}
void
prodAllCapabilities (void)
{
prodCapabilities(rtsTrue);
}
/* ----------------------------------------------------------------------------
* prodOneCapability
*
* Like prodAllCapabilities, but we only require a single Task to wake
* up in order to service some global event, such as checking for
* deadlock after some idle time has passed.
* If a Capability is currently idle, wake up a Task on it. Used to
* get every Capability into the GC.
* ------------------------------------------------------------------------- */
void
prodOneCapability (void)
prodCapability (Capability *cap, Task *task)
{
prodCapabilities(rtsFalse);
ACQUIRE_LOCK(&cap->lock);
if (!cap->running_task) {
cap->running_task = task;
releaseCapability_(cap,rtsTrue);
}
RELEASE_LOCK(&cap->lock);
}
/* ----------------------------------------------------------------------------

@@ -50,6 +50,9 @@ struct Capability_ {
// catching unsafe call-ins.
rtsBool in_haskell;
// true if this Capability is currently in the GC
rtsBool in_gc;
// The run queue. The Task owning this Capability has exclusive
// access to its run queue, so can wake up threads without
// taking a lock, and the common path through the scheduler is
@@ -191,6 +194,8 @@ extern Capability *capabilities;
extern Capability *last_free_capability;
// GC indicator, in scope for the scheduler
#define PENDING_GC_SEQ 1
#define PENDING_GC_PAR 2
extern volatile StgWord waiting_for_gc;
// Acquires a capability at a return point. If *cap is non-NULL, then
@@ -237,6 +242,7 @@ void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
// need to service some global event.
//
void prodOneCapability (void);
void prodCapability (Capability *cap, Task *task);
// Similar to prodOneCapability(), but prods all of them.
//

@@ -214,7 +214,8 @@ void initRtsFlagsDefaults(void)
RtsFlags.ParFlags.nNodes = 1;
RtsFlags.ParFlags.migrate = rtsTrue;
RtsFlags.ParFlags.wakeupMigrate = rtsFalse;
RtsFlags.ParFlags.gcThreads = 1;
RtsFlags.ParFlags.parGcEnabled = 1;
RtsFlags.ParFlags.parGcGen = 1;
#endif
#ifdef PAR
@@ -450,8 +451,9 @@ usage_text[] = {
"",
#endif /* DEBUG */
#if defined(THREADED_RTS) && !defined(NOSMP)
" -N<n> Use <n> OS threads (default: 1) (also sets -g)",
" -g<n> Use <n> OS threads for GC (default: 1)",
" -N<n> Use <n> OS threads (default: 1)",
" -q1 Use one OS thread for GC (turns off parallel GC)",
" -qg<n> Use parallel GC only for generations >= <n> (default: 1)",
" -qm Don't automatically migrate threads between CPUs",
" -qw Migrate a thread to the current CPU when it is woken up",
#endif
@@ -1132,8 +1134,6 @@ error = rtsTrue;
if (rts_argv[arg][2] != '\0') {
RtsFlags.ParFlags.nNodes
= strtol(rts_argv[arg]+2, (char **) NULL, 10);
// set -g at the same time as -N by default
RtsFlags.ParFlags.gcThreads = RtsFlags.ParFlags.nNodes;
if (RtsFlags.ParFlags.nNodes <= 0) {
errorBelch("bad value for -N");
error = rtsTrue;
@@ -1149,15 +1149,17 @@ error = rtsTrue;
case 'g':
THREADED_BUILD_ONLY(
if (rts_argv[arg][2] != '\0') {
RtsFlags.ParFlags.gcThreads
= strtol(rts_argv[arg]+2, (char **) NULL, 10);
if (RtsFlags.ParFlags.gcThreads <= 0) {
errorBelch("bad value for -g");
error = rtsTrue;
}
}
) break;
switch (rts_argv[arg][2]) {
case '1':
// backwards compat only
RtsFlags.ParFlags.parGcEnabled = rtsFalse;
break;
default:
errorBelch("unknown RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
}
) break;
case 'q':
switch (rts_argv[arg][2]) {
@@ -1165,6 +1167,18 @@ error = rtsTrue;
errorBelch("incomplete RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
case '1':
RtsFlags.ParFlags.parGcEnabled = rtsFalse;
break;
case 'g':
if (rts_argv[arg][3] != '\0') {
RtsFlags.ParFlags.parGcGen
= strtol(rts_argv[arg]+3, (char **) NULL, 10);
} else {
errorBelch("bad value for -qg");
error = rtsTrue;
}
break;
case 'm':
RtsFlags.ParFlags.migrate = rtsFalse;
break;

@@ -31,6 +31,7 @@
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
#include "GC.h"
/* PARALLEL_HASKELL includes go here */
@@ -1478,7 +1479,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
#ifdef THREADED_RTS
/* extern static volatile StgWord waiting_for_gc;
lives inside capability.c */
rtsBool was_waiting;
rtsBool gc_type, prev_pending_gc;
nat i;
#endif
@@ -1490,6 +1491,16 @@
}
#ifdef THREADED_RTS
if (sched_state < SCHED_INTERRUPTING
&& RtsFlags.ParFlags.parGcEnabled
&& N >= RtsFlags.ParFlags.parGcGen
&& ! oldest_gen->steps[0].mark)
{
gc_type = PENDING_GC_PAR;
} else {
gc_type = PENDING_GC_SEQ;
}
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
@@ -1500,39 +1511,55 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
/* Other capabilities are prevented from running yet more Haskell
threads if waiting_for_gc is set. Tested inside
yieldCapability() and releaseCapability() in Capability.c */
was_waiting = cas(&waiting_for_gc, 0, 1);
if (was_waiting) {
prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
if (prev_pending_gc) {
do {
debugTrace(DEBUG_sched, "someone else is trying to GC...");
if (cap) yieldCapability(&cap,task);
debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
prev_pending_gc);
ASSERT(cap);
yieldCapability(&cap,task);
} while (waiting_for_gc);
return cap; // NOTE: task->cap might have changed here
}
setContextSwitches();
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
if (cap != &capabilities[i]) {
Capability *pcap = &capabilities[i];
// we better hope this task doesn't get migrated to
// another Capability while we're waiting for this one.
// It won't, because load balancing happens while we have
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = pcap;
waitForReturnCapability(&pcap, task);
if (pcap != &capabilities[i]) {
barf("scheduleDoGC: got the wrong capability");
}
}
// The final shutdown GC is always single-threaded, because it's
// possible that some of the Capabilities have no worker threads.
if (gc_type == PENDING_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
if (cap != &capabilities[i]) {
Capability *pcap = &capabilities[i];
// we better hope this task doesn't get migrated to
// another Capability while we're waiting for this one.
// It won't, because load balancing happens while we have
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = pcap;
waitForReturnCapability(&pcap, task);
if (pcap != &capabilities[i]) {
barf("scheduleDoGC: got the wrong capability");
}
}
}
}
else
{
// multi-threaded GC: make sure all the Capabilities donate one
// GC thread each.
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
waiting_for_gc = rtsFalse;
waitForGcThreads(cap);
}
#endif
// so this happens periodically:
@@ -1545,23 +1572,23 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
* state, then we should take the opportunity to delete all the
* threads in the system.
*/
if (sched_state >= SCHED_INTERRUPTING) {
deleteAllThreads(&capabilities[0]);
if (sched_state == SCHED_INTERRUPTING) {
deleteAllThreads(cap);
sched_state = SCHED_SHUTTING_DOWN;
}
heap_census = scheduleNeedHeapProfile(rtsTrue);
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
#if defined(THREADED_RTS)
debugTrace(DEBUG_sched, "doing GC");
// reset waiting_for_gc *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
waiting_for_gc = 0;
GarbageCollect(force_major || heap_census, gc_type, cap);
#else
GarbageCollect(force_major || heap_census, 0, cap);
#endif
GarbageCollect(force_major || heap_census);
if (heap_census) {
debugTrace(DEBUG_sched, "performing heap census");
heapCensus();
@@ -1587,12 +1614,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
}
#if defined(THREADED_RTS)
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
task->cap = &capabilities[i];
releaseCapability(&capabilities[i]);
}
if (gc_type == PENDING_GC_SEQ) {
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
task->cap = &capabilities[i];
releaseCapability(&capabilities[i]);
}
}
}
if (cap) {
task->cap = cap;
@@ -2131,7 +2160,13 @@ exitScheduler(
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
scheduleDoGC(NULL,task,rtsFalse);
#if defined(THREADED_RTS)
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
releaseCapability(task->cap);
#else
scheduleDoGC(&MainCapability,task,rtsFalse);
#endif
}
sched_state = SCHED_SHUTTING_DOWN;
@@ -2184,13 +2219,17 @@ static void
performGC_(rtsBool force_major)
{
Task *task;
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalling_tasks queue.
ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
RELEASE_LOCK(&sched_mutex);
scheduleDoGC(NULL,task,force_major);
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,force_major);
releaseCapability(task->cap);
boundTaskExiting(task);
}

@@ -613,11 +613,11 @@ stat_exit(int alloc)
}
#if defined(THREADED_RTS)
if (RtsFlags.ParFlags.gcThreads > 1) {
if (RtsFlags.ParFlags.parGcEnabled) {
statsPrintf("\n Parallel GC work balance: %.2f (%ld / %ld, ideal %d)\n",
(double)GC_par_avg_copied / (double)GC_par_max_copied,
(lnat)GC_par_avg_copied, (lnat)GC_par_max_copied,
RtsFlags.ParFlags.gcThreads
RtsFlags.ParFlags.nNodes
);
}
#endif

@@ -138,7 +138,6 @@ DECLARE_GCT
static void mark_root (void *user, StgClosure **root);
static void zero_static_object_list (StgClosure* first_static);
static nat initialise_N (rtsBool force_major_gc);
static void alloc_gc_threads (void);
static void init_collected_gen (nat g, nat threads);
static void init_uncollected_gen (nat g, nat threads);
static void init_gc_thread (gc_thread *t);
@@ -149,8 +148,9 @@ static void start_gc_threads (void);
static void scavenge_until_all_done (void);
static nat inc_running (void);
static nat dec_running (void);
static void wakeup_gc_threads (nat n_threads);
static void shutdown_gc_threads (nat n_threads);
static void wakeup_gc_threads (nat n_threads, nat me);
static void shutdown_gc_threads (nat n_threads, nat me);
static void continue_gc_threads (nat n_threads, nat me);
#if 0 && defined(DEBUG)
static void gcCAFs (void);
@@ -180,7 +180,9 @@ StgPtr oldgen_scan;
-------------------------------------------------------------------------- */
void
GarbageCollect ( rtsBool force_major_gc )
GarbageCollect (rtsBool force_major_gc,
nat gc_type USED_IF_THREADS,
Capability *cap USED_IF_THREADS)
{
bdescr *bd;
step *stp;
@@ -234,26 +236,24 @@ GarbageCollect ( rtsBool force_major_gc )
*/
n = initialise_N(force_major_gc);
/* Allocate + initialise the gc_thread structures.
*/
alloc_gc_threads();
/* Start threads, so they can be spinning up while we finish initialisation.
*/
start_gc_threads();
#if defined(THREADED_RTS)
/* How many threads will be participating in this GC?
* We don't try to parallelise minor GC, or mark/compact/sweep GC.
* We don't try to parallelise minor GCs (unless the user asks for
* it with +RTS -qg0), or mark/compact/sweep GC.
*/
#if defined(THREADED_RTS)
if (n < (4*1024*1024 / BLOCK_SIZE) || oldest_gen->steps[0].mark) {
n_gc_threads = 1;
if (gc_type == PENDING_GC_PAR) {
n_gc_threads = RtsFlags.ParFlags.nNodes;
} else {
n_gc_threads = RtsFlags.ParFlags.gcThreads;
n_gc_threads = 1;
}
#else
n_gc_threads = 1;
#endif
trace(TRACE_gc|DEBUG_gc, "GC (gen %d): %d KB to collect, %ld MB in use, using %d thread(s)",
N, n * (BLOCK_SIZE / 1024), mblocks_allocated, n_gc_threads);
@@ -302,7 +302,15 @@ GarbageCollect ( rtsBool force_major_gc )
}
// this is the main thread
#ifdef THREADED_RTS
if (n_gc_threads == 1) {
gct = gc_threads[0];
} else {
gct = gc_threads[cap->no];
}
#else
gct = gc_threads[0];
#endif
/* -----------------------------------------------------------------------
* follow all the roots that we know about:
@@ -323,7 +331,7 @@ GarbageCollect ( rtsBool force_major_gc )
// NB. do this after the mutable lists have been saved above, otherwise
// the other GC threads will be writing into the old mutable lists.
inc_running();
wakeup_gc_threads(n_gc_threads);
wakeup_gc_threads(n_gc_threads, gct->thread_index);
for (g = RtsFlags.GcFlags.generations-1; g > N; g--) {
scavenge_mutable_list(&generations[g]);
@@ -378,7 +386,7 @@ GarbageCollect ( rtsBool force_major_gc )
break;
}
shutdown_gc_threads(n_gc_threads);
shutdown_gc_threads(n_gc_threads, gct->thread_index);
// Update pointers from the Task list
update_task_list();
@@ -756,6 +764,9 @@ GarbageCollect ( rtsBool force_major_gc )
slop = calcLiveBlocks() * BLOCK_SIZE_W - live;
stat_endGC(allocated, live, copied, N, max_copied, avg_copied, slop);
// Guess which generation we'll collect *next* time
initialise_N(force_major_gc);
#if defined(RTS_USER_SIGNALS)
if (RtsFlags.MiscFlags.install_signal_handlers) {
// unblock signals again
@@ -763,6 +774,8 @@ GarbageCollect ( rtsBool force_major_gc )
}
#endif
continue_gc_threads(n_gc_threads, gct->thread_index);
RELEASE_SM_LOCK;
gct = saved_gct;
@@ -814,6 +827,11 @@ initialise_N (rtsBool force_major_gc)
Initialise the gc_thread structures.
-------------------------------------------------------------------------- */
#define GC_THREAD_INACTIVE 0
#define GC_THREAD_STANDING_BY 1
#define GC_THREAD_RUNNING 2
#define GC_THREAD_WAITING_TO_CONTINUE 3
static gc_thread *
alloc_gc_thread (int n)
{
@@ -826,11 +844,11 @@ alloc_gc_thread (int n)
#ifdef THREADED_RTS
t->id = 0;
initCondition(&t->wake_cond);
initMutex(&t->wake_mutex);
t->wakeup = rtsTrue; // starts true, so we can wait for the
initSpinLock(&t->gc_spin);
initSpinLock(&t->mut_spin);
ACQUIRE_SPIN_LOCK(&t->gc_spin);
t->wakeup = GC_THREAD_INACTIVE; // starts true, so we can wait for the
// thread to start up, see wakeup_gc_threads
t->exit = rtsFalse;
#endif
t->thread_index = n;
@@ -864,17 +882,17 @@
}
static void
alloc_gc_threads (void)
void
initGcThreads (void)
{
if (gc_threads == NULL) {
#if defined(THREADED_RTS)
nat i;
gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads *
gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes *
sizeof(gc_thread*),
"alloc_gc_threads");
for (i = 0; i < RtsFlags.ParFlags.gcThreads; i++) {
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
gc_threads[i] = alloc_gc_thread(i);
}
#else
@@ -992,113 +1010,107 @@ loop:
}
#if defined(THREADED_RTS)
//
// gc_thread_work(): Scavenge until there's no work left to do and all
// the running threads are idle.
//
static void
gc_thread_work (void)
void
gcWorkerThread (Capability *cap)
{
// gc_running_threads has already been incremented for us; this is
// a worker thread and the main thread bumped gc_running_threads
// before waking us up.
cap->in_gc = rtsTrue;
gct = gc_threads[cap->no];
gct->id = osThreadId();
// Wait until we're told to wake up
RELEASE_SPIN_LOCK(&gct->mut_spin);
gct->wakeup = GC_THREAD_STANDING_BY;
debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index);
ACQUIRE_SPIN_LOCK(&gct->gc_spin);
#ifdef USE_PAPI
// start performance counters in this thread...
if (gct->papi_events == -1) {
papi_init_eventset(&gct->papi_events);
}
papi_thread_start_gc1_count(gct->papi_events);
#endif
// Every thread evacuates some roots.
gct->evac_step = 0;
markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads,
rtsTrue/*prune sparks*/);
scavenge_until_all_done();
}
static void
gc_thread_mainloop (void)
{
while (!gct->exit) {
// Wait until we're told to wake up
ACQUIRE_LOCK(&gct->wake_mutex);
gct->wakeup = rtsFalse;