Commit f2ca6dee authored by Simon Marlow's avatar Simon Marlow

Initial parallel GC support

e.g. use +RTS -g2 -RTS for 2 threads.  Only major GCs are parallelised,
minor GCs are still sequential. Don't use more threads than you
have CPUs.

It works most of the time, although you won't see much speedup yet.
Tuning and more work on stability still required.
parent d5bd3e82
......@@ -93,6 +93,7 @@
#define ATOMICALLY_FRAME 69
#define CATCH_RETRY_FRAME 70
#define CATCH_STM_FRAME 71
#define N_CLOSURE_TYPES 72
#define WHITEHOLE 72
#define N_CLOSURE_TYPES 73
#endif /* CLOSURETYPES_H */
......@@ -97,9 +97,10 @@ StgWord16 closure_flags[] = {
/* TREC_HEADER = */ ( _NS| _MUT|_UPT ),
/* ATOMICALLY_FRAME = */ ( _BTM ),
/* CATCH_RETRY_FRAME = */ ( _BTM ),
/* CATCH_STM_FRAME = */ ( _BTM )
/* CATCH_STM_FRAME = */ ( _BTM ),
/* WHITEHOLE = */ ( 0 )
};
#if N_CLOSURE_TYPES != 72
#if N_CLOSURE_TYPES != 73
#error Closure types changed: update ClosureFlags.c!
#endif
......@@ -394,12 +394,9 @@ INFO_TABLE(stg_SE_CAF_BLACKHOLE,0,1,SE_CAF_BLACKHOLE,"SE_CAF_BLACKHOLE","SE_CAF_
/* ----------------------------------------------------------------------------
Whiteholes are used for the "locked" state of a closure (see lockClosure())
The closure type is BLACKHOLE, just because we need a valid closure type
for sanity checking.
------------------------------------------------------------------------- */
INFO_TABLE(stg_WHITEHOLE, 0,0, BLACKHOLE, "WHITEHOLE", "WHITEHOLE")
INFO_TABLE(stg_WHITEHOLE, 0,0, WHITEHOLE, "WHITEHOLE", "WHITEHOLE")
{ foreign "C" barf("WHITEHOLE object entered!") never returns; }
/* ----------------------------------------------------------------------------
......
......@@ -310,7 +310,8 @@ allocGroup (nat n)
{
bdescr *bd, *rem;
ASSERT_SM_LOCK();
// Todo: not true in multithreaded GC, where we use allocBlock_sync().
// ASSERT_SM_LOCK();
if (n == 0) barf("allocGroup: requested zero blocks");
......@@ -439,7 +440,8 @@ freeGroup(bdescr *p)
{
nat p_on_free_list = 0;
ASSERT_SM_LOCK();
// Todo: not true in multithreaded GC
// ASSERT_SM_LOCK();
ASSERT(p->free != (P_)-1);
......
This diff is collapsed.
......@@ -47,6 +47,7 @@
#include "Scav.h"
#include "GCUtils.h"
#include "MarkWeak.h"
#include "Sparks.h"
#include <string.h> // for memset()
......@@ -117,12 +118,16 @@ nat mutlist_MUTVARS,
/* Thread-local data for each GC thread
*/
gc_thread *gc_threads = NULL;
gc_thread *gct = NULL; // this thread's gct TODO: make thread-local
// gc_thread *gct = NULL; // this thread's gct TODO: make thread-local
// For stats:
long copied; // *words* copied & scavenged during this GC
long scavd_copied; // *words* copied only during this GC
#ifdef THREADED_RTS
SpinLock recordMutableGen_sync;
#endif
/* -----------------------------------------------------------------------------
Static function declarations
-------------------------------------------------------------------------- */
......@@ -137,6 +142,11 @@ static void init_gc_thread (gc_thread *t);
static void update_task_list (void);
static void resize_generations (void);
static void resize_nursery (void);
static void start_gc_threads (void);
static void gc_thread_work (void);
static nat inc_running (void);
static nat dec_running (void);
static void wakeup_gc_threads (nat n_threads);
#if 0 && defined(DEBUG)
static void gcCAFs (void);
......@@ -227,6 +237,10 @@ GarbageCollect ( rtsBool force_major_gc )
*/
alloc_gc_threads();
/* Start threads, so they can be spinning up while we finish initialisation.
*/
start_gc_threads();
/* How many threads will be participating in this GC?
* We don't try to parallelise minor GC.
*/
......@@ -253,8 +267,11 @@ GarbageCollect ( rtsBool force_major_gc )
*/
static_objects = END_OF_STATIC_LIST;
scavenged_static_objects = END_OF_STATIC_LIST;
#ifdef THREADED_RTS
initSpinLock(&static_objects_sync);
initSpinLock(&recordMutableGen_sync);
initSpinLock(&gc_alloc_block_sync);
#endif
// Initialise all the generations/steps that we're collecting.
......@@ -283,12 +300,16 @@ GarbageCollect ( rtsBool force_major_gc )
init_gc_thread(&gc_threads[t]);
}
// the main thread is running: this prevents any other threads from
// exiting prematurely, so we can start them now.
inc_running();
wakeup_gc_threads(n_threads);
// Initialise stats
copied = 0;
scavd_copied = 0;
// start threads etc.
// For now, we just have one thread, and set gct to gc_threads[0]
// this is the main thread
gct = &gc_threads[0];
/* -----------------------------------------------------------------------
......@@ -329,25 +350,28 @@ GarbageCollect ( rtsBool force_major_gc )
* Repeatedly scavenge all the areas we know about until there's no
* more scavenging to be done.
*/
{
rtsBool flag;
loop:
flag = rtsFalse;
scavenge_loop();
// if any blackholes are alive, make the threads that wait on
// them alive too.
if (traverseBlackholeQueue())
flag = rtsTrue;
if (flag) { goto loop; }
for (;;)
{
gc_thread_work();
// The other threads are now stopped. We might recurse back to
// here, but from now on this is the only thread.
// if any blackholes are alive, make the threads that wait on
// them alive too.
if (traverseBlackholeQueue()) {
inc_running();
continue;
}
// must be last... invariant is that everything is fully
// scavenged at this point.
if (traverseWeakPtrList()) { // returns rtsTrue if evaced something
inc_running();
continue;
}
// must be last... invariant is that everything is fully
// scavenged at this point.
if (traverseWeakPtrList()) { // returns rtsTrue if evaced something
goto loop;
}
// If we get to here, there's really nothing left to do.
break;
}
// Update pointers from the Task list
......@@ -400,7 +424,8 @@ GarbageCollect ( rtsBool force_major_gc )
ws = &thr->steps[g][s];
if (g==0 && s==0) continue;
ASSERT( ws->scan_bd == ws->todo_bd );
// Not true?
// ASSERT( ws->scan_bd == ws->todo_bd );
ASSERT( ws->scan_bd ? ws->scan == ws->scan_bd->free : 1 );
// Push the final block
......@@ -679,25 +704,6 @@ GetRoots( evac_fn evac )
Capability *cap;
Task *task;
#if defined(GRAN)
for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
evac((StgClosure **)&run_queue_hds[i]);
if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
evac((StgClosure **)&run_queue_tls[i]);
if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
evac((StgClosure **)&blocked_queue_hds[i]);
if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
evac((StgClosure **)&blocked_queue_tls[i]);
if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
evac((StgClosure **)&ccalling_threads[i]);
}
markEventQueue();
#else /* !GRAN */
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
evac((StgClosure **)(void *)&cap->run_queue_hd);
......@@ -715,17 +721,15 @@ GetRoots( evac_fn evac )
}
#if !defined(THREADED_RTS)
evac((StgClosure **)(void *)&blocked_queue_hd);
evac((StgClosure **)(void *)&blocked_queue_tl);
evac((StgClosure **)(void *)&sleeping_queue);
#endif
#endif
// evac((StgClosure **)&blackhole_queue);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
#if defined(THREADED_RTS)
markSparkQueue(evac);
#endif
......@@ -856,6 +860,14 @@ alloc_gc_thread (gc_thread *t, int n)
nat g, s;
step_workspace *ws;
#ifdef THREADED_RTS
t->id = 0;
initCondition(&t->wake_cond);
initMutex(&t->wake_mutex);
t->wakeup = rtsFalse;
t->exit = rtsFalse;
#endif
t->thread_index = n;
t->free_blocks = NULL;
t->gc_count = 0;
......@@ -897,7 +909,6 @@ alloc_gc_threads (void)
if (gc_threads == NULL) {
#if defined(THREADED_RTS)
nat i;
gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads *
sizeof(gc_thread),
"alloc_gc_threads");
......@@ -914,6 +925,146 @@ alloc_gc_threads (void)
}
}
/* ----------------------------------------------------------------------------
Start GC threads
------------------------------------------------------------------------- */
// Number of GC threads currently scavenging: a thread decrements this
// when it runs out of work (dec_running) and increments it again when
// new work appears (inc_running).  The GC terminates when it reaches 0.
static nat gc_running_threads;

#if defined(THREADED_RTS)
// Protects updates to gc_running_threads.
static Mutex gc_running_mutex;
#endif
/* Atomically bump the count of GC threads actively scavenging.
 * Returns the new count.  Serialised on gc_running_mutex. */
static nat
inc_running (void)
{
    nat running;

    ACQUIRE_LOCK(&gc_running_mutex);
    running = ++gc_running_threads;
    RELEASE_LOCK(&gc_running_mutex);

    return running;
}
/* Atomically drop the count of GC threads actively scavenging.
 * Returns the new count; a result of 0 means every thread is idle.
 * Serialised on gc_running_mutex. */
static nat
dec_running (void)
{
    nat running;

    ACQUIRE_LOCK(&gc_running_mutex);
    running = --gc_running_threads;
    RELEASE_LOCK(&gc_running_mutex);

    return running;
}
//
// gc_thread_work(): Scavenge until there's no work left to do and all
// the running threads are idle.
//
// One GC thread's share of a collection: scavenge until no work is
// left anywhere and every participating thread has gone idle.
static void
gc_thread_work (void)
{
nat r;
debugTrace(DEBUG_gc, "GC thread %d working", gct->thread_index);
// gc_running_threads has already been incremented for us; either
// this is the main thread and we incremented it inside
// GarbageCollect(), or this is a worker thread and the main
// thread bumped gc_running_threads before waking us up.
loop:
scavenge_loop();
// scavenge_loop() only exits when there's no work to do
r = dec_running();
debugTrace(DEBUG_gc, "GC thread %d idle (%d still running)",
gct->thread_index, r);
// Termination detection: spin until every thread is idle, picking
// up any global work that other threads push in the meantime.
// NOTE(review): gc_running_threads is read here without holding
// gc_running_mutex — presumably the race is benign (any_work()'s
// write_barrier() plus re-checking via inc_running); confirm.
while (gc_running_threads != 0) {
if (any_work()) {
inc_running();
goto loop;
}
// any_work() does not remove the work from the queue, it
// just checks for the presence of work. If we find any,
// then we increment gc_running_threads and go back to
// scavenge_loop() to perform any pending work.
}
// All threads are now stopped
debugTrace(DEBUG_gc, "GC thread %d finished.", gct->thread_index);
}
#if defined(THREADED_RTS)
// Main loop of a worker GC thread: sleep on wake_cond until the main
// thread signals the start of a GC, perform one round of
// gc_thread_work(), and repeat until asked to exit.
static void
gc_thread_mainloop (void)
{
while (!gct->exit) {
// Wait until we're told to wake up
ACQUIRE_LOCK(&gct->wake_mutex);
while (!gct->wakeup) {
debugTrace(DEBUG_gc, "GC thread %d standing by...",
gct->thread_index);
waitCondition(&gct->wake_cond, &gct->wake_mutex);
}
RELEASE_LOCK(&gct->wake_mutex);
// NOTE(review): wakeup is cleared *after* releasing wake_mutex;
// a signal arriving in that window could be absorbed by this
// iteration — confirm the wakeup protocol tolerates that.
gct->wakeup = rtsFalse;
if (gct->exit) break;
gc_thread_work();
}
}
#endif
#if defined(THREADED_RTS)
/* OS-thread entry point for a worker GC thread: publish this thread's
 * gc_thread struct through the global gct, record the OS thread id,
 * and enter the worker main loop. */
static void
gc_thread_entry (gc_thread *thr)
{
    gct = thr;
    debugTrace(DEBUG_gc, "GC thread %d starting...", thr->thread_index);
    thr->id = osThreadId();
    gc_thread_mainloop();
}
#endif
// Spawn the worker GC threads (once, on the first collection) and
// reset the running-thread count for this GC cycle.  A no-op in the
// non-threaded RTS.
static void
start_gc_threads (void)
{
#if defined(THREADED_RTS)
nat i;
OSThreadId id;
static rtsBool done = rtsFalse;  // workers are created only once
gc_running_threads = 0;
// NOTE(review): gc_running_mutex is re-initialised on every GC;
// harmless only if no thread can hold it at this point — confirm.
initMutex(&gc_running_mutex);
if (!done) {
// Start from 1: the main thread is 0
for (i = 1; i < RtsFlags.ParFlags.gcThreads; i++) {
createOSThread(&id, (OSThreadProc*)&gc_thread_entry,
&gc_threads[i]);
}
done = rtsTrue;
}
#endif
}
/* Wake the worker GC threads (indices 1..n_threads-1; index 0 is the
 * main thread, already running).  gc_running_threads is bumped for
 * each worker *before* it is signalled, so no worker can observe a
 * zero running-count and terminate the GC prematurely. */
static void
wakeup_gc_threads (nat n_threads USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat t;
    gc_thread *thr;

    for (t = 1; t < n_threads; t++) {
        inc_running();
        thr = &gc_threads[t];
        ACQUIRE_LOCK(&thr->wake_mutex);
        thr->wakeup = rtsTrue;
        signalCondition(&thr->wake_cond);
        RELEASE_LOCK(&thr->wake_mutex);
    }
#endif
}
/* ----------------------------------------------------------------------------
Initialise a generation that is to be collected
------------------------------------------------------------------------- */
......
......@@ -110,6 +110,10 @@ typedef struct step_workspace_ {
typedef struct gc_thread_ {
#ifdef THREADED_RTS
OSThreadId id; // The OS thread that this struct belongs to
Mutex wake_mutex;
Condition wake_cond; // So we can go to sleep between GCs
rtsBool wakeup;
rtsBool exit;
#endif
nat thread_index; // a zero based index identifying the thread
......@@ -148,7 +152,8 @@ extern nat N;
extern rtsBool major_gc;
extern gc_thread *gc_threads;
extern gc_thread *gct; // this thread's gct TODO: make thread-local
register gc_thread *gct __asm__("%rbx");
// extern gc_thread *gct; // this thread's gct TODO: make thread-local
extern StgClosure* static_objects;
extern StgClosure* scavenged_static_objects;
......@@ -165,6 +170,10 @@ extern StgPtr oldgen_scan;
extern long copied;
extern long scavd_copied;
#ifdef THREADED_RTS
extern SpinLock static_objects_sync;
#endif
#ifdef DEBUG
extern nat mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS, mutlist_OTHERS;
#endif
......
......@@ -1494,7 +1494,7 @@ scavenge_mutable_list(generation *gen)
static void
scavenge_static(void)
{
StgClosure* p = static_objects;
StgClosure* p;
const StgInfoTable *info;
/* Always evacuate straight to the oldest generation for static
......@@ -1503,13 +1503,26 @@ scavenge_static(void)
/* keep going until we've scavenged all the objects on the linked
list... */
while (p != END_OF_STATIC_LIST) {
while (1) {
ACQUIRE_SPIN_LOCK(&static_objects_sync);
/* get the next static object from the list. Remember, there might
* be more stuff on this list after each evacuation...
* (static_objects is a global)
*/
p = static_objects;
if (p == END_OF_STATIC_LIST) {
RELEASE_SPIN_LOCK(&static_objects_sync);
break;
}
ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));
info = get_itbl(p);
/*
if (info->type==RBH)
info = REVERT_INFOPTR(info); // if it's an RBH, look at the orig closure
if (info->type==RBH)
info = REVERT_INFOPTR(info); // if it's an RBH, look at the orig closure
*/
// make sure the info pointer is into text space
......@@ -1520,6 +1533,8 @@ scavenge_static(void)
*STATIC_LINK(info,p) = scavenged_static_objects;
scavenged_static_objects = p;
RELEASE_SPIN_LOCK(&static_objects_sync);
switch (info -> type) {
case IND_STATIC:
......@@ -1564,12 +1579,6 @@ scavenge_static(void)
}
ASSERT(gct->failed_to_evac == rtsFalse);
/* get the next static object from the list. Remember, there might
* be more stuff on this list now that we've done some evacuating!
* (static_objects is a global)
*/
p = static_objects;
}
}
......@@ -1947,3 +1956,39 @@ loop:
if (work_to_do) goto loop;
}
/* Is there any scavenging work left anywhere?  Checks (in order) the
 * static-object list, the compacted-generation mark state, and the
 * global todo lists of every step.  Purely a probe: no work is
 * claimed or removed.  Local work needn't be checked — callers only
 * get here after scavenge_loop() has drained it. */
rtsBool
any_work (void)
{
    int g, s;
    step_workspace *ws;

    write_barrier();

    // unscavenged static objects?
    if (major_gc && static_objects != END_OF_STATIC_LIST) {
        return rtsTrue;
    }

    // outstanding work in the compacted generation?
    if (mark_stack_overflowed || oldgen_scan_bd != NULL ||
        (mark_stack_bdescr != NULL && !mark_stack_empty())) {
        return rtsTrue;
    }

    // global work in any step?
    for (g = RtsFlags.GcFlags.generations - 1; g >= 0; g--) {
        for (s = generations[g].n_steps - 1; s >= 0; s--) {
            // g0s0 is skipped unless running single-generation
            if (g == 0 && s == 0 && RtsFlags.GcFlags.generations > 1) {
                continue;
            }
            ws = &gct->steps[g][s];
            if (ws->todo_large_objects) return rtsTrue;
            if (ws->stp->todos) return rtsTrue;
        }
    }

    return rtsFalse;
}
......@@ -11,5 +11,6 @@
*
* ---------------------------------------------------------------------------*/
void scavenge_loop (void);
void scavenge_mutable_list (generation *g);
void scavenge_loop (void);
rtsBool any_work (void);
void scavenge_mutable_list (generation *g);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment