Commit 4e354226 authored by Simon Marlow

Use work-stealing for load-balancing in the GC

  
New flag: "+RTS -qb" disables load-balancing in the parallel GC
(though this is subject to change; I think we will probably want to do
something more automatic before releasing this).

To get the "PARGC3" configuration described in the "Runtime support
for Multicore Haskell" paper, use "+RTS -qg0 -qb -RTS".

The main advantage of this is that it allows us to easily disable
load-balancing altogether, which turns out to be important in parallel
programs.  Maintaining locality is sometimes more important than
spreading the work out in parallel GC.  There is a side benefit in
that the parallel GC should have improved locality even when
load-balancing, because each processor prefers to take work from its
own queue before stealing from others.
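
The core of that policy is visible in the diff below:
grab_local_todo_block pops from the thread's own todo_q, and only when
that is empty does the scavenge loop fall back to steal_todo_block on
other threads' queues. A condensed sketch of the combined policy (the
real code below keeps the two steps in separate functions and also
consults ws->todo_overflow first):

    // Sketch only: prefer our own queue, then steal. Condensed from
    // grab_local_todo_block/steal_todo_block in GCUtils.c below.
    static bdescr *
    find_todo_block (step_workspace *ws, nat s)
    {
        nat n;
        bdescr *bd;

        // LIFO pop from our own deque first: the most recently
        // pushed block is the one most likely to still be in this
        // CPU's cache.
        bd = popWSDeque(ws->todo_q);
        if (bd != NULL) return bd;

        // With +RTS -qb, work_stealing is false and we never touch
        // another thread's queue, preserving locality.
        if (!work_stealing) return NULL;

        // Otherwise steal from the other end of a victim's deque.
        for (n = 0; n < n_gc_threads; n++) {
            if (n == gct->thread_index) continue;
            bd = stealWSDeque(gc_threads[n]->steps[s].todo_q);
            if (bd != NULL) return bd;
        }
        return NULL;
    }
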
parent 8fda9784
......@@ -183,6 +183,7 @@ struct PAR_FLAGS {
rtsBool parGcEnabled; /* enable parallel GC */
rtsBool parGcGen; /* do parallel GC in this generation
* and higher only */
rtsBool parGcLoadBalancing; /* do load-balancing in parallel GC */
};
#endif /* THREADED_RTS */
......
......@@ -81,7 +81,6 @@ typedef struct step_ {
#if defined(THREADED_RTS)
char pad[128]; // make sure the following is
// on a separate cache line.
SpinLock sync_todo; // lock for todos
SpinLock sync_large_objects; // lock for large_objects
// and scavenged_large_objects
#endif
......@@ -93,10 +92,6 @@ typedef struct step_ {
unsigned int n_old_blocks; // number of blocks in from-space
unsigned int live_estimate; // for sweeping: estimate of live data
bdescr * todos; // blocks waiting to be scavenged
bdescr * todos_last;
unsigned int n_todos; // count of above
bdescr * part_blocks; // partially-full scanned blocks
unsigned int n_part_blocks; // count of above
......
......@@ -226,6 +226,7 @@ void initRtsFlagsDefaults(void)
RtsFlags.ParFlags.wakeupMigrate = rtsFalse;
RtsFlags.ParFlags.parGcEnabled = 1;
RtsFlags.ParFlags.parGcGen = 1;
RtsFlags.ParFlags.parGcLoadBalancing = 1;
#endif
#ifdef PAR
......@@ -1211,6 +1212,9 @@ error = rtsTrue;
error = rtsTrue;
}
break;
case 'b':
RtsFlags.ParFlags.parGcLoadBalancing = rtsFalse;
break;
case 'm':
RtsFlags.ParFlags.migrate = rtsFalse;
break;
......
......@@ -716,7 +716,6 @@ stat_exit(int alloc)
statsPrintf("whitehole_spin: %"FMT_Word64"\n", whitehole_spin);
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
for (s = 0; s < generations[g].n_steps; s++) {
statsPrintf("gen[%d].steps[%d].sync_todo: %"FMT_Word64"\n", g, s, generations[g].steps[s].sync_todo.spin);
statsPrintf("gen[%d].steps[%d].sync_large_objects: %"FMT_Word64"\n", g, s, generations[g].steps[s].sync_large_objects.spin);
}
}
......
......@@ -43,8 +43,6 @@
#include "WSDeque.h"
#include "SMP.h" // for cas
#if defined(THREADED_RTS)
#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
/* -----------------------------------------------------------------------------
......@@ -285,5 +283,3 @@ pushWSDeque (WSDeque* q, void * elem)
ASSERT_WSDEQUE_INVARIANTS(q);
return rtsTrue;
}
#endif
......@@ -9,8 +9,6 @@
#ifndef WSDEQUE_H
#define WSDEQUE_H
#if defined(THREADED_RTS)
typedef struct WSDeque_ {
// Size of elements array. Used for modulo calculation: we round up
// to powers of 2 and use the dyadic log (modulo == bitwise &)
......@@ -125,6 +123,4 @@ discardElements (WSDeque *q)
// pool->topBound = pool->top;
}
#endif // THREADED_RTS
#endif // WSDEQUE_H
......@@ -125,6 +125,8 @@ nat n_gc_threads;
// For stats:
long copied; // *words* copied & scavenged during this GC
rtsBool work_stealing;
DECLARE_GCT
/* -----------------------------------------------------------------------------
......@@ -231,6 +233,19 @@ GarbageCollect (rtsBool force_major_gc,
*/
n = initialise_N(force_major_gc);
#if defined(THREADED_RTS)
work_stealing = RtsFlags.ParFlags.parGcLoadBalancing;
// It's not always a good idea to do load balancing in parallel
// GC. In particular, for a parallel program we don't want to
// lose locality by moving cached data into another CPU's cache
// (this effect can be quite significant).
//
// We could have a more complex way to determine whether to do
// work stealing or not, e.g. it might be a good idea to do it
// if the heap is big. For now, we just turn it on or off with
// a flag.
#endif
/* Start threads, so they can be spinning up while we finish initialisation.
*/
start_gc_threads();
......@@ -879,7 +894,9 @@ alloc_gc_thread (int n)
ws->gct = t;
ws->todo_bd = NULL;
ws->buffer_todo_bd = NULL;
ws->todo_q = newWSDeque(128);
ws->todo_overflow = NULL;
ws->n_todo_overflow = 0;
ws->part_list = NULL;
ws->n_part_blocks = 0;
......@@ -971,8 +988,23 @@ any_work (void)
}
ws = &gct->steps[s];
if (ws->todo_large_objects) return rtsTrue;
if (ws->step->todos) return rtsTrue;
if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
if (ws->todo_overflow) return rtsTrue;
}
#if defined(THREADED_RTS)
if (work_stealing) {
nat n;
// look for work to steal
for (n = 0; n < n_gc_threads; n++) {
if (n == gct->thread_index) continue;
for (s = total_steps-1; s >= 0; s--) {
ws = &gc_threads[n]->steps[s];
if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
}
}
}
#endif
gct->no_work++;
......@@ -1001,18 +1033,18 @@ loop:
r = dec_running();
debugTrace(DEBUG_gc, "GC thread %d idle (%d still running)",
gct->thread_index, r);
gct->thread_index, r);
while (gc_running_threads != 0) {
// usleep(1);
if (any_work()) {
inc_running();
goto loop;
}
// any_work() does not remove the work from the queue, it
// just checks for the presence of work. If we find any,
// then we increment gc_running_threads and go back to
// scavenge_loop() to perform any pending work.
if (any_work()) {
inc_running();
goto loop;
}
// any_work() does not remove the work from the queue, it
// just checks for the presence of work. If we find any,
// then we increment gc_running_threads and go back to
// scavenge_loop() to perform any pending work.
}
// All threads are now stopped
......@@ -1207,11 +1239,6 @@ init_collected_gen (nat g, nat n_threads)
stp->n_words = 0;
stp->live_estimate = 0;
// we don't have any to-be-scavenged blocks yet
stp->todos = NULL;
stp->todos_last = NULL;
stp->n_todos = 0;
// initialise the large object queues.
stp->scavenged_large_objects = NULL;
stp->n_scavenged_large_blocks = 0;
......@@ -1284,9 +1311,12 @@ init_collected_gen (nat g, nat n_threads)
// allocate the first to-space block; extra blocks will be
// chained on as necessary.
ws->todo_bd = NULL;
ws->buffer_todo_bd = NULL;
ASSERT(looksEmptyWSDeque(ws->todo_q));
alloc_todo_block(ws,0);
ws->todo_overflow = NULL;
ws->n_todo_overflow = 0;
ws->scavd_list = NULL;
ws->n_scavd_blocks = 0;
}
......@@ -1329,7 +1359,7 @@ init_uncollected_gen (nat g, nat threads)
for (t = 0; t < threads; t++) {
ws = &gc_threads[t]->steps[g * RtsFlags.GcFlags.steps + s];
ws->buffer_todo_bd = NULL;
ASSERT(looksEmptyWSDeque(ws->todo_q));
ws->todo_large_objects = NULL;
ws->part_list = NULL;
......
......@@ -28,6 +28,8 @@ extern StgPtr oldgen_scan;
extern long copied;
extern rtsBool work_stealing;
#ifdef DEBUG
extern nat mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS, mutlist_OTHERS;
#endif
......
......@@ -15,6 +15,7 @@
#define GCTHREAD_H
#include "OSThreads.h"
#include "WSDeque.h"
/* -----------------------------------------------------------------------------
General scheme
......@@ -81,13 +82,14 @@ typedef struct step_workspace_ {
StgPtr todo_free; // free ptr for todo_bd
StgPtr todo_lim; // lim for todo_bd
bdescr * buffer_todo_bd; // buffer to reduce contention
// on the step's todos list
WSDeque * todo_q;
bdescr * todo_overflow;
nat n_todo_overflow;
// where large objects to be scavenged go
bdescr * todo_large_objects;
// Objects that have already been, scavenged.
// Objects that have already been scavenged.
bdescr * scavd_list;
nat n_scavd_blocks; // count of blocks in this list
......@@ -95,7 +97,7 @@ typedef struct step_workspace_ {
bdescr * part_list;
unsigned int n_part_blocks; // count of above
StgWord pad[5];
StgWord pad[3];
} step_workspace ATTRIBUTE_ALIGNED(64);
// align so that computing gct->steps[n] is a shift, not a multiply
......
......@@ -19,6 +19,9 @@
#include "GCUtils.h"
#include "Printer.h"
#include "Trace.h"
#ifdef THREADED_RTS
#include "WSDeque.h"
#endif
#ifdef THREADED_RTS
SpinLock gc_alloc_block_sync;
......@@ -72,34 +75,47 @@ freeChain_sync(bdescr *bd)
-------------------------------------------------------------------------- */
bdescr *
grab_todo_block (step_workspace *ws)
grab_local_todo_block (step_workspace *ws)
{
bdescr *bd;
step *stp;
stp = ws->step;
bd = NULL;
if (ws->buffer_todo_bd)
bd = ws->todo_overflow;
if (bd != NULL)
{
ws->todo_overflow = bd->link;
bd->link = NULL;
ws->n_todo_overflow--;
return bd;
}
bd = popWSDeque(ws->todo_q);
if (bd != NULL)
{
bd = ws->buffer_todo_bd;
ASSERT(bd->link == NULL);
ws->buffer_todo_bd = NULL;
return bd;
}
ACQUIRE_SPIN_LOCK(&stp->sync_todo);
if (stp->todos) {
bd = stp->todos;
if (stp->todos == stp->todos_last) {
stp->todos_last = NULL;
return NULL;
}
bdescr *
steal_todo_block (nat s)
{
nat n;
bdescr *bd;
// look for work to steal
for (n = 0; n < n_gc_threads; n++) {
if (n == gct->thread_index) continue;
bd = stealWSDeque(gc_threads[n]->steps[s].todo_q);
if (bd) {
return bd;
}
stp->todos = bd->link;
stp->n_todos--;
bd->link = NULL;
}
RELEASE_SPIN_LOCK(&stp->sync_todo);
return bd;
}
return NULL;
}
void
......@@ -145,7 +161,7 @@ todo_block_full (nat size, step_workspace *ws)
// this block to push, and there's enough room in
// this block to evacuate the current object, then just increase
// the limit.
if (ws->step->todos != NULL ||
if (!looksEmptyWSDeque(ws->todo_q) ||
(ws->todo_free - bd->u.scan < WORK_UNIT_WORDS / 2)) {
if (ws->todo_free + size < bd->start + BLOCK_SIZE_W) {
ws->todo_lim = stg_min(bd->start + BLOCK_SIZE_W,
......@@ -178,20 +194,15 @@ todo_block_full (nat size, step_workspace *ws)
{
step *stp;
stp = ws->step;
trace(TRACE_gc|DEBUG_gc, "push todo block %p (%ld words), step %d, n_todos: %d",
trace(TRACE_gc|DEBUG_gc, "push todo block %p (%ld words), step %d, todo_q: %ld",
bd->start, (unsigned long)(bd->free - bd->u.scan),
stp->abs_no, stp->n_todos);
// ToDo: use buffer_todo
ACQUIRE_SPIN_LOCK(&stp->sync_todo);
if (stp->todos_last == NULL) {
stp->todos_last = bd;
stp->todos = bd;
} else {
stp->todos_last->link = bd;
stp->todos_last = bd;
stp->abs_no, dequeElements(ws->todo_q));
if (!pushWSDeque(ws->todo_q, bd)) {
bd->link = ws->todo_overflow;
ws->todo_overflow = bd;
ws->n_todo_overflow++;
}
stp->n_todos++;
RELEASE_SPIN_LOCK(&stp->sync_todo);
}
}
......@@ -207,7 +218,7 @@ todo_block_full (nat size, step_workspace *ws)
StgPtr
alloc_todo_block (step_workspace *ws, nat size)
{
bdescr *bd/*, *hd, *tl*/;
bdescr *bd/*, *hd, *tl */;
// Grab a part block if we have one, and it has enough room
if (ws->part_list != NULL &&
......@@ -221,12 +232,12 @@ alloc_todo_block (step_workspace *ws, nat size)
{
// blocks in to-space get the BF_EVACUATED flag.
// allocBlocks_sync(4, &hd, &tl,
// allocBlocks_sync(16, &hd, &tl,
// ws->step->gen_no, ws->step, BF_EVACUATED);
//
// tl->link = ws->part_list;
// ws->part_list = hd->link;
// ws->n_part_blocks += 3;
// ws->n_part_blocks += 15;
//
// bd = hd;
......
......@@ -17,10 +17,12 @@ bdescr *allocBlock_sync(void);
void freeChain_sync(bdescr *bd);
void push_scanned_block (bdescr *bd, step_workspace *ws);
bdescr *grab_todo_block (step_workspace *ws);
StgPtr todo_block_full (nat size, step_workspace *ws);
StgPtr alloc_todo_block (step_workspace *ws, nat size);
bdescr *grab_local_todo_block (step_workspace *ws);
bdescr *steal_todo_block (nat s);
// Returns true if a block is partially full. This predicate is used to try
// to re-use partial blocks wherever possible, and to reduce wastage.
// We might need to tweak the actual value.
......
......@@ -335,6 +335,7 @@ scavenge_block (bdescr *bd)
// time around the loop.
while (p < bd->free || (bd == ws->todo_bd && p < ws->todo_free)) {
ASSERT(bd->link == NULL);
ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));
info = get_itbl((StgClosure *)p);
......@@ -1915,7 +1916,7 @@ loop:
break;
}
if ((bd = grab_todo_block(ws)) != NULL) {
if ((bd = grab_local_todo_block(ws)) != NULL) {
scavenge_block(bd);
did_something = rtsTrue;
break;
......@@ -1926,6 +1927,28 @@ loop:
did_anything = rtsTrue;
goto loop;
}
#if defined(THREADED_RTS)
if (work_stealing) {
// look for work to steal
for (s = total_steps-1; s >= 0; s--) {
if (s == 0 && RtsFlags.GcFlags.generations > 1) {
continue;
}
if ((bd = steal_todo_block(s)) != NULL) {
scavenge_block(bd);
did_something = rtsTrue;
break;
}
}
if (did_something) {
did_anything = rtsTrue;
goto loop;
}
}
#endif
// only return when there is no more work to do
return did_anything;
......
......@@ -104,7 +104,6 @@ initStep (step *stp, int g, int s)
stp->compact = 0;
stp->bitmap = NULL;
#ifdef THREADED_RTS
initSpinLock(&stp->sync_todo);
initSpinLock(&stp->sync_large_objects);
#endif
stp->threads = END_TSO_QUEUE;
......