Commit c5cd2343 authored by simonmar

[project @ 2005-11-18 15:24:12 by simonmar]

Two improvements to the SMP runtime:

  - support for 'par', aka sparks.  Load balancing is still very
    primitive, but I have seen programs go faster using par.  (A
    sketch of the spark-pool push follows this list.)

  - support for backing off when a thread is found to be duplicating
    a computation currently underway in another thread.  This also
    fixes some instability in SMP: when a thunk is under evaluation in
    more than one thread, an update frame can end up pointing to an
    indirection, and once GC has shorted out that indirection the
    update would trash the value.  We now suspend the duplicate
    computation to the heap before this can happen.

Additionally:

  - stack squeezing is separate from lazy blackholing, and now only
    happens if there's a reasonable amount of squeezing to be done
    relative to the number of words of stack that have to be moved.
    This means we won't try to shift 10MB of stack just to save 2
    words at the bottom (it probably never happened, but still).  (The
    check is sketched just after this list.)

  - update frames are now marked when they have been visited by lazy
    blackholing, as per the SMP paper.

  - cleaned up raiseAsync() a bit.
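
The squeeze-or-not decision above comes down to comparing the words
that would have to be shifted against the words freed by squeezing.
A sketch of the intended check, using the counters accumulated by the
new threadPaused() in GC.c below (note that in this patch the test is
still hard-wired on, with the real condition left in a comment):

    // Sketch of the intended heuristic, names as in threadPaused below.
    // words_to_squeeze: words freed by squeezing adjacent update frames
    // weight:           words of other frames that would have to move
    if (RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue
        && weight < words_to_squeeze) {
        stackSqueeze(tso, (StgPtr)frame);
    }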
parent 6c17d627
......@@ -339,6 +339,11 @@ typedef struct {
* - In StgTRecHeader, it might be worthwhile having separate chunks
* of read-only and read-write locations. This would save a
* new_value field in the read-only locations.
*
* - In StgAtomicallyFrame, we could combine the waiting bit into
* the header (maybe a different info tbl for a waiting transaction).
* This means we can specialise the code for the atomically frame
* (it immediately switches on frame->waiting anyway).
*/
typedef struct StgTVarWaitQueue_ {
......
......@@ -24,15 +24,17 @@
#include "gmp.h" // Needs MP_INT definition
/*
* This is the table that holds shadow-locations for all the STG
* registers. The shadow locations are used when:
*
* 1) the particular register isn't mapped to a real machine
* register, probably because there's a shortage of real registers.
* 2) caller-saves registers are saved across a CCall
/*
* Spark pools: used to store pending sparks (SMP & PARALLEL_HASKELL only)
* This is a circular buffer. Invariants:
* - base <= hd < lim
* - base <= tl < lim
* - if hd==tl, then the pool is empty.
* - if hd == tl+1, then the pool is full.
* Adding to the pool is done by assigning to *tl++ (wrapping round as
* necessary). When adding to a full pool, we have the option of
* throwing away either the oldest (hd++) or the most recent (tl--) entry.
*/
typedef struct StgSparkPool_ {
StgClosure **base;
StgClosure **lim;
......@@ -40,6 +42,12 @@ typedef struct StgSparkPool_ {
StgClosure **tl;
} StgSparkPool;
#define ASSERT_SPARK_POOL_INVARIANTS(p) \
ASSERT((p)->base <= (p)->hd); \
ASSERT((p)->hd < (p)->lim); \
ASSERT((p)->base <= (p)->tl); \
ASSERT((p)->tl < (p)->lim);
typedef struct {
StgFunPtr stgGCEnter1;
StgFunPtr stgGCFun;
......@@ -64,6 +72,14 @@ typedef union {
StgTSOPtr t;
} StgUnion;
/*
* This is the table that holds shadow-locations for all the STG
* registers. The shadow locations are used when:
*
* 1) the particular register isn't mapped to a real machine
* register, probably because there's a shortage of real registers.
* 2) caller-saves registers are saved across a CCall
*/
typedef struct StgRegTable_ {
StgUnion rR1;
StgUnion rR2;
......
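For the consumer side of the spark pool declared above: sparks come
off the head of the circular buffer, wrapping at lim.  A minimal
sketch with a hypothetical popSpark (the real findSpark lives in
Sparks.c and is not part of this patch):

    // Sketch only: fields follow the StgSparkPool declaration above.
    static StgClosure *
    popSpark (StgSparkPool *pool)
    {
        StgClosure *spark;

        if (pool->hd == pool->tl) {
            return NULL;                  // pool is empty
        }
        spark = *pool->hd;
        pool->hd++;
        if (pool->hd == pool->lim) {
            pool->hd = pool->base;        // wrap around
        }
        return spark;
    }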
......@@ -62,6 +62,7 @@ struct DEBUG_FLAGS {
rtsBool linker; /* 'l' the object linker */
rtsBool apply; /* 'a' */
rtsBool stm; /* 'm' */
rtsBool squeeze; /* 'z' stack squeezing & lazy blackholing */
};
struct COST_CENTRE_FLAGS {
......
......@@ -37,6 +37,7 @@
/* Stack frames */
RTS_RET_INFO(stg_upd_frame_info);
RTS_RET_INFO(stg_marked_upd_frame_info);
RTS_RET_INFO(stg_noupd_frame_info);
RTS_RET_INFO(stg_seq_frame_info);
RTS_RET_INFO(stg_catch_frame_info);
......@@ -45,6 +46,7 @@ RTS_RET_INFO(stg_atomically_frame_info);
RTS_RET_INFO(stg_catch_stm_frame_info);
RTS_ENTRY(stg_upd_frame_ret);
RTS_ENTRY(stg_marked_upd_frame_ret);
RTS_ENTRY(stg_seq_frame_ret);
/* Entry code for constructors created by the bytecode interpreter */
......
......@@ -392,7 +392,7 @@ extern lnat countNurseryBlocks ( void );
Functions from GC.c
-------------------------------------------------------------------------- */
extern void threadPaused ( StgTSO * );
extern void threadPaused ( Capability *cap, StgTSO * );
extern StgClosure * isAlive ( StgClosure *p );
extern void markCAFs ( evac_fn evac );
......
......@@ -23,6 +23,7 @@
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#if !defined(SMP)
Capability MainCapability; // for non-SMP, we have one global capability
......@@ -74,6 +75,8 @@ anyWorkForMe( Capability *cap, Task *task )
} else {
return (cap->run_queue_hd->bound == task);
}
} else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
return rtsTrue;
}
return globalWorkToDo();
}
......@@ -263,7 +266,7 @@ releaseCapability_ (Capability* cap)
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (!emptyRunQueue(cap) || globalWorkToDo()) {
if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
......
......@@ -4249,74 +4249,6 @@ gcCAFs(void)
#endif
/* -----------------------------------------------------------------------------
Lazy black holing.
Whenever a thread returns to the scheduler after possibly doing
some work, we have to run down the stack and black-hole all the
closures referred to by update frames.
-------------------------------------------------------------------------- */
static void
threadLazyBlackHole(StgTSO *tso)
{
StgClosure *frame;
StgRetInfoTable *info;
StgClosure *bh;
StgPtr stack_end;
stack_end = &tso->stack[tso->stack_size];
frame = (StgClosure *)tso->sp;
while (1) {
info = get_ret_itbl(frame);
switch (info->i.type) {
case UPDATE_FRAME:
bh = ((StgUpdateFrame *)frame)->updatee;
/* if the thunk is already blackholed, it means we've also
* already blackholed the rest of the thunks on this stack,
* so we can stop early.
*
* The blackhole made for a CAF is a CAF_BLACKHOLE, so they
* don't interfere with this optimisation.
*/
if (bh->header.info == &stg_BLACKHOLE_info) {
return;
}
if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh);
#endif
#ifdef PROFILING
// @LDV profiling
// We pretend that bh is now dead.
LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
#endif
SET_INFO(bh,&stg_BLACKHOLE_info);
// We pretend that bh has just been created.
LDV_RECORD_CREATE(bh);
}
frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
break;
case STOP_FRAME:
return;
// normal stack frames; do nothing except advance the pointer
default:
frame = (StgClosure *)((StgPtr)frame + stack_frame_sizeW(frame));
}
}
}
/* -----------------------------------------------------------------------------
* Stack squeezing
*
......@@ -4328,12 +4260,11 @@ threadLazyBlackHole(StgTSO *tso)
struct stack_gap { StgWord gap_size; struct stack_gap *next_gap; };
static void
threadSqueezeStack(StgTSO *tso)
stackSqueeze(StgTSO *tso, StgPtr bottom)
{
StgPtr frame;
rtsBool prev_was_update_frame;
StgClosure *updatee = NULL;
StgPtr bottom;
StgRetInfoTable *info;
StgWord current_gap_size;
struct stack_gap *gap;
......@@ -4344,8 +4275,6 @@ threadSqueezeStack(StgTSO *tso)
// contains two values: the size of the gap, and the distance
// to the next gap (or the stack top).
bottom = &(tso->stack[tso->stack_size]);
frame = tso->sp;
ASSERT(frame < bottom);
......@@ -4363,20 +4292,6 @@ threadSqueezeStack(StgTSO *tso)
{
StgUpdateFrame *upd = (StgUpdateFrame *)frame;
if (upd->updatee->header.info == &stg_BLACKHOLE_info) {
// found a BLACKHOLE'd update frame; we've been here
// before, in a previous GC, so just break out.
// Mark the end of the gap, if we're in one.
if (current_gap_size != 0) {
gap = (struct stack_gap *)(frame-sizeofW(StgUpdateFrame));
}
frame += sizeofW(StgUpdateFrame);
goto done_traversing;
}
if (prev_was_update_frame) {
TICK_UPD_SQUEEZED();
......@@ -4409,31 +4324,6 @@ threadSqueezeStack(StgTSO *tso)
// single update frame, or the topmost update frame in a series
else {
StgClosure *bh = upd->updatee;
// Do lazy black-holing
if (bh->header.info != &stg_BLACKHOLE_info &&
bh->header.info != &stg_CAF_BLACKHOLE_info) {
#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
debugBelch("Unexpected lazy BHing required at 0x%04lx",(long)bh);
#endif
#ifdef DEBUG
// zero out the slop so that the sanity checker can tell
// where the next closure is.
DEBUG_FILL_SLOP(bh);
#endif
#ifdef PROFILING
// We pretend that bh is now dead.
// ToDo: is the slop filling the same as DEBUG_FILL_SLOP?
LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
#endif
// Todo: maybe use SET_HDR() and remove LDV_RECORD_CREATE()?
SET_INFO(bh,&stg_BLACKHOLE_info);
// We pretend that bh has just been created.
LDV_RECORD_CREATE(bh);
}
prev_was_update_frame = rtsTrue;
updatee = upd->updatee;
frame += sizeofW(StgUpdateFrame);
......@@ -4456,8 +4346,10 @@ threadSqueezeStack(StgTSO *tso)
}
}
done_traversing:
if (current_gap_size != 0) {
gap = (struct stack_gap *) (frame - sizeofW(StgUpdateFrame));
}
// Now we have a stack with gaps in it, and we have to walk down
// shoving the stack up to fill in the gaps. A diagram might
// help:
......@@ -4515,12 +4407,110 @@ done_traversing:
* turned on.
* -------------------------------------------------------------------------- */
void
threadPaused(StgTSO *tso)
threadPaused(Capability *cap, StgTSO *tso)
{
if ( RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue )
threadSqueezeStack(tso); // does black holing too
else
threadLazyBlackHole(tso);
StgClosure *frame;
StgRetInfoTable *info;
StgClosure *bh;
StgPtr stack_end;
nat words_to_squeeze = 0;
nat weight = 0;
nat weight_pending = 0;
rtsBool prev_was_update_frame = rtsFalse; // must be initialised: read in the UPDATE_FRAME case below
stack_end = &tso->stack[tso->stack_size];
frame = (StgClosure *)tso->sp;
while (1) {
// If we've already marked this frame, then stop here.
if (frame->header.info == (StgInfoTable *)&stg_marked_upd_frame_info) {
goto end;
}
info = get_ret_itbl(frame);
switch (info->i.type) {
case UPDATE_FRAME:
SET_INFO(frame, (StgInfoTable *)&stg_marked_upd_frame_info);
bh = ((StgUpdateFrame *)frame)->updatee;
if (closure_IND(bh) || bh->header.info == &stg_BLACKHOLE_info) {
IF_DEBUG(squeeze, debugBelch("suspending duplicate work: %d words of stack\n", (StgPtr)frame - tso->sp));
// If this closure is already an indirection, then
// suspend the computation up to this point:
suspendComputation(cap,tso,(StgPtr)frame);
// Now drop the update frame, and arrange to return
// the value to the frame underneath:
tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2;
tso->sp[1] = (StgWord)bh;
tso->sp[0] = (W_)&stg_enter_info;
// And continue with threadPaused; there might be
// yet more computation to suspend.
threadPaused(cap,tso);
return;
}
if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh);
#endif
// zero out the slop so that the sanity checker can tell
// where the next closure is.
DEBUG_FILL_SLOP(bh);
#ifdef PROFILING
// @LDV profiling
// We pretend that bh is now dead.
LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
#endif
SET_INFO(bh,&stg_BLACKHOLE_info);
// We pretend that bh has just been created.
LDV_RECORD_CREATE(bh);
}
frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
if (prev_was_update_frame) {
words_to_squeeze += sizeofW(StgUpdateFrame);
weight += weight_pending;
weight_pending = 0;
}
prev_was_update_frame = rtsTrue;
break;
case STOP_FRAME:
goto end;
// normal stack frames; do nothing except advance the pointer
default:
{
nat frame_size = stack_frame_sizeW(frame);
weight_pending += frame_size;
frame = (StgClosure *)((StgPtr)frame + frame_size);
prev_was_update_frame = rtsFalse;
}
}
}
end:
IF_DEBUG(squeeze,
debugBelch("words_to_squeeze: %d, weight: %d, squeeze: %s\n",
words_to_squeeze, weight,
weight < words_to_squeeze ? "YES" : "NO"));
// Should we squeeze or not? Arbitrary heuristic: we squeeze if
// the number of words we have to shift down is less than the
// number of stack words we squeeze away by doing so.
if (1 /*RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue &&
weight < words_to_squeeze*/) {
stackSqueeze(tso, (StgPtr)frame);
}
}
/* -----------------------------------------------------------------------------
......
......@@ -57,7 +57,7 @@
#define RETURN_TO_SCHEDULER(todo,retcode) \
SAVE_STACK_POINTERS; \
cap->r.rCurrentTSO->what_next = (todo); \
threadPaused(cap->r.rCurrentTSO); \
threadPaused(cap,cap->r.rCurrentTSO); \
cap->r.rRet = (retcode); \
return cap;
......
......@@ -190,6 +190,7 @@ void initRtsFlagsDefaults(void)
RtsFlags.DebugFlags.gran = rtsFalse;
RtsFlags.DebugFlags.par = rtsFalse;
RtsFlags.DebugFlags.linker = rtsFalse;
RtsFlags.DebugFlags.squeeze = rtsFalse;
#endif
#if defined(PROFILING) || defined(PAR)
......@@ -431,6 +432,7 @@ usage_text[] = {
" -DP DEBUG: par",
" -Dl DEBUG: linker",
" -Dm DEBUG: stm",
" -Dz DEBUG: stack squezing",
"",
#endif /* DEBUG */
#if defined(SMP)
......@@ -726,6 +728,9 @@ error = rtsTrue;
case 'm':
RtsFlags.DebugFlags.stm = rtsTrue;
break;
case 'z':
RtsFlags.DebugFlags.squeeze = rtsTrue;
break;
default:
bad_option( rts_argv[arg] );
}
......
......@@ -250,7 +250,7 @@ static void AllRoots(evac_fn evac);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically);
rtsBool stop_at_atomically, StgPtr stop_here);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);
......@@ -396,7 +396,7 @@ schedule (Capability *initialCapability, Task *task)
#ifdef SMP
schedulePushWork(cap,task);
#endif
#endif
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
......@@ -415,6 +415,9 @@ schedule (Capability *initialCapability, Task *task)
//
if (interrupted) {
deleteRunQueue(cap);
#if defined(SMP)
discardSparksCap(cap);
#endif
if (shutting_down_scheduler) {
IF_DEBUG(scheduler, sched_belch("shutting down"));
// If we are a worker, just exit. If we're a bound thread
......@@ -428,23 +431,17 @@ schedule (Capability *initialCapability, Task *task)
}
}
#if defined(not_yet) && defined(SMP)
//
// Top up the run queue from our spark pool. We try to make the
// number of threads in the run queue equal to the number of
// free capabilities.
//
#if defined(SMP)
// If the run queue is empty, take a spark and turn it into a thread.
{
StgClosure *spark;
if (emptyRunQueue()) {
spark = findSpark(rtsFalse);
if (spark == NULL) {
break; /* no more sparks in the pool */
} else {
createSparkThread(spark);
if (emptyRunQueue(cap)) {
StgClosure *spark;
spark = findSpark(cap);
if (spark != NULL) {
IF_DEBUG(scheduler,
sched_belch("==^^ turning spark of closure %p into a thread",
sched_belch("turning spark of closure %p into a thread",
(StgClosure *)spark));
createSparkThread(cap,spark);
}
}
}
......@@ -739,9 +736,10 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
// Check whether we have more threads on our run queue that we
// could hand to another Capability.
if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
// Check whether we have more threads on our run queue, or sparks
// in our pool, that we could hand to another Capability.
if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
&& sparkPoolSizeCap(cap) < 2) {
return;
}
......@@ -772,31 +770,54 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
if (n_free_caps > 0) {
StgTSO *prev, *t, *next;
rtsBool pushed_to_all;
IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
prev = cap->run_queue_hd;
t = prev->link;
prev->link = END_TSO_QUEUE;
i = 0;
for (; t != END_TSO_QUEUE; t = next) {
next = t->link;
t->link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
|| t->bound == task) { // don't move my bound thread
prev->link = t;
prev = t;
} else if (i == n_free_caps) {
i = 0;
// keep one for us
prev->link = t;
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
if (t->bound) { t->bound->cap = free_caps[i]; }
i++;
pushed_to_all = rtsFalse;
if (cap->run_queue_hd != END_TSO_QUEUE) {
prev = cap->run_queue_hd;
t = prev->link;
prev->link = END_TSO_QUEUE;
for (; t != END_TSO_QUEUE; t = next) {
next = t->link;
t->link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
|| t->bound == task) { // don't move my bound thread
prev->link = t;
prev = t;
} else if (i == n_free_caps) {
pushed_to_all = rtsTrue;
i = 0;
// keep one for us
prev->link = t;
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
if (t->bound) { t->bound->cap = free_caps[i]; }
i++;
}
}
cap->run_queue_tl = prev;
}
// If there are some free capabilities that we didn't push any
// threads to, then try to push a spark to each one.
if (!pushed_to_all) {
StgClosure *spark;
// i is the next free capability to push to
for (; i < n_free_caps; i++) {
if (emptySparkPoolCap(free_caps[i])) {
spark = findSpark(cap);
if (spark != NULL) {
IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
newSpark(&(free_caps[i]->r), spark);
}
}
}
}
cap->run_queue_tl = prev;
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
......@@ -812,15 +833,20 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
* Start any pending signal handlers
* ------------------------------------------------------------------------- */
#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
scheduleStartSignalHandlers(Capability *cap)
{
#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
if (signals_pending()) { // safe outside the lock
startSignalHandlers(cap);
}
#endif
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif
/* ----------------------------------------------------------------------------
* Check for blocked threads that can be woken up.
......@@ -1926,7 +1952,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
// ATOMICALLY_FRAME, aborting the (nested)
// transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
raiseAsync_(cap, t, NULL, rtsTrue);
raiseAsync_(cap, t, NULL, rtsTrue, NULL);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
......@@ -2165,7 +2191,7 @@ suspendThread (StgRegTable *reg)
// XXX this might not be necessary --SDM
tso->what_next = ThreadRunGHC;
threadPaused(tso);
threadPaused(cap,tso);
if(tso->blocked_exceptions == NULL) {
tso->why_blocked = BlockedOnCCall;
......@@ -2660,6 +2686,10 @@ initScheduler(void)
initTaskManager();
#if defined(SMP) || defined(PARALLEL_HASKELL)