Commit 829a7d02 authored by Simon Marlow's avatar Simon Marlow
Browse files

Refactor the spark queue implementation into a generic work-stealing deque

So we can use this abstraction elsewhere in the RTS
parent 4fc80ef6
......@@ -4,35 +4,6 @@
*
* Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
*
* The implementation uses Double-Ended Queues with lock-free access
* (thereby often called "deque") as described in
*
* D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
* SPAA'05, July 2005, Las Vegas, USA.
* ACM 1-58113-986-1/05/0007
*
* Author: Jost Berthold MSRC 07-09/2008
*
* The DeQue is held as a circular array with known length. Positions
* of top (read-end) and bottom (write-end) always increase, and the
* array is accessed with indices modulo array-size. While this bears
* the risk of overflow, we assume that (with 64 bit indices), a
* program must run very long to reach that point.
*
* The write end of the queue (position bottom) can only be used with
* mutual exclusion, i.e. by exactly one caller at a time. At this
* end, new items can be enqueued using pushBottom()/newSpark(), and
* removed using popBottom()/reclaimSpark() (the latter implying a cas
* synchronisation with potential concurrent readers for the case of
* just one element).
*
* Multiple readers can steal()/findSpark() from the read end
* (position top), and are synchronised without a lock, based on a cas
* of the top position. One reader wins, the others return NULL for a
* failure.
*
* Both popBottom and steal also return NULL when the queue is empty.
*
-------------------------------------------------------------------------*/
#include "PosixSource.h"
......@@ -52,57 +23,6 @@
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
/* internal helpers ... */
static StgWord
roundUp2(StgWord val)
{
StgWord rounded = 1;
/* StgWord is unsigned anyway, only catch 0 */
if (val == 0) {
barf("DeQue,roundUp2: invalid size 0 requested");
}
/* at least 1 bit set, shift up to its place */
do {
rounded = rounded << 1;
} while (0 != (val = val>>1));
return rounded;
}
#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
/* -----------------------------------------------------------------------------
*
* Initialising spark pools.
*
* -------------------------------------------------------------------------- */
/* constructor */
static SparkPool*
initPool(StgWord size)
{
StgWord realsize;
SparkPool *q;
realsize = roundUp2(size); /* to compute modulo as a bitwise & */
q = (SparkPool*) stgMallocBytes(sizeof(SparkPool), /* admin fields */
"newSparkPool");
q->elements = (StgClosurePtr*)
stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
"newSparkPool:data space");
q->top=0;
q->bottom=0;
q->topBound=0; /* read by writer, updated each time top is read */
q->size = realsize; /* power of 2 */
q->moduloSize = realsize - 1; /* n % size == n & moduloSize */
ASSERT_SPARK_POOL_INVARIANTS(q);
return q;
}
void
initSparkPools( void )
{
......@@ -110,159 +30,18 @@ initSparkPools( void )
/* walk over the capabilities, allocating a spark pool for each one */
nat i;
for (i = 0; i < n_capabilities; i++) {
capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
}
#else
/* allocate a single spark pool */
MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
#endif
}
void
freeSparkPool (SparkPool *pool)
{
/* should not interfere with concurrent findSpark() calls! And
nobody should use the pointer any more. We cross our fingers...*/
stgFree(pool->elements);
stgFree(pool);
}
/* -----------------------------------------------------------------------------
*
* reclaimSpark: remove a spark from the write end of the queue.
* Returns the removed spark, and NULL if a race is lost or the pool
* empty.
*
* If only one spark is left in the pool, we synchronise with
* concurrently stealing threads by using cas to modify the top field.
* This routine should NEVER be called by a task which does not own
* the capability. Can this be checked here?
*
* -------------------------------------------------------------------------- */
StgClosure *
reclaimSpark (SparkPool *deque)
{
/* also a bit tricky, has to avoid concurrent steal() calls by
accessing top with cas, when there is only one element left */
StgWord t, b;
StgClosurePtr* pos;
long currSize;
StgClosurePtr removed;
ASSERT_SPARK_POOL_INVARIANTS(deque);
b = deque->bottom;
/* "decrement b as a test, see what happens" */
deque->bottom = --b;
pos = (deque->elements) + (b & (deque->moduloSize));
t = deque->top; /* using topBound would give an *upper* bound, we
need a lower bound. We use the real top here, but
can update the topBound value */
deque->topBound = t;
currSize = b - t;
if (currSize < 0) { /* was empty before decrementing b, set b
consistently and abort */
deque->bottom = t;
return NULL;
}
removed = *pos;
if (currSize > 0) { /* no danger, still elements in buffer after b-- */
return removed;
}
/* otherwise, has someone meanwhile stolen the same (last) element?
Check and increment top value to know */
if ( !(CASTOP(&(deque->top),t,t+1)) ) {
removed = NULL; /* no success, but continue adjusting bottom */
}
deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
deque->topBound = t+1; /* ...and cached top value as well */
ASSERT_SPARK_POOL_INVARIANTS(deque);
return removed;
}
/* -----------------------------------------------------------------------------
*
* tryStealSpark: try to steal a spark from a Capability.
*
* Returns a valid spark, or NULL if the pool was empty, and can
* occasionally return NULL if there was a race with another thread
* stealing from the same pool. In this case, try again later.
*
-------------------------------------------------------------------------- */
static StgClosurePtr
steal(SparkPool *deque)
{
StgClosurePtr* pos;
StgClosurePtr* arraybase;
StgWord sz;
StgClosurePtr stolen;
StgWord b,t;
// Can't do this on someone else's spark pool:
// ASSERT_SPARK_POOL_INVARIANTS(deque);
b = deque->bottom;
t = deque->top;
// NB. b and t are unsigned; we need a signed value for the test
// below.
if ((long)b - (long)t <= 0 ) {
return NULL; /* already looks empty, abort */
}
/* now access array, see pushBottom() */
arraybase = deque->elements;
sz = deque->moduloSize;
pos = arraybase + (t & sz);
stolen = *pos;
/* now decide whether we have won */
if ( !(CASTOP(&(deque->top),t,t+1)) ) {
/* lost the race, someon else has changed top in the meantime */
return NULL;
} /* else: OK, top has been incremented by the cas call */
// Can't do this on someone else's spark pool:
// ASSERT_SPARK_POOL_INVARIANTS(deque);
/* return stolen element */
return stolen;
}
StgClosure *
tryStealSpark (Capability *cap)
{
SparkPool *pool = cap->sparks;
StgClosure *stolen;
do {
stolen = steal(pool);
} while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
return stolen;
}
/* -----------------------------------------------------------------------------
*
* "guesses" whether a deque is empty. Can return false negatives in
* presence of concurrent steal() calls, and false positives in
* presence of a concurrent pushBottom().
*
* -------------------------------------------------------------------------- */
rtsBool
looksEmpty(SparkPool* deque)
{
StgWord t = deque->top;
StgWord b = deque->bottom;
/* try to prefer false negatives by reading top first */
return ((long)b - (long)t <= 0);
/* => array is *never* completely filled, always 1 place free! */
freeWSDeque(pool);
}
/* -----------------------------------------------------------------------------
......@@ -281,69 +60,6 @@ createSparkThread (Capability *cap)
appendToRunQueue(cap,tso);
}
/* -----------------------------------------------------------------------------
*
* Create a new spark
*
* -------------------------------------------------------------------------- */
#define DISCARD_NEW
/* enqueue an element. Should always succeed by resizing the array
(not implemented yet, silently fails in that case). */
static void
pushBottom (SparkPool* deque, StgClosurePtr elem)
{
StgWord t;
StgClosurePtr* pos;
StgWord sz = deque->moduloSize;
StgWord b = deque->bottom;
ASSERT_SPARK_POOL_INVARIANTS(deque);
/* we try to avoid reading deque->top (accessed by all) and use
deque->topBound (accessed only by writer) instead.
This is why we do not just call empty(deque) here.
*/
t = deque->topBound;
if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) {
/* NB. 1. sz == deque->size - 1, thus ">="
2. signed comparison, it is possible that t > b
*/
/* could be full, check the real top value in this case */
t = deque->top;
deque->topBound = t;
if (b - t >= sz) { /* really no space left :-( */
/* reallocate the array, copying the values. Concurrent steal()s
will in the meantime use the old one and modify only top.
This means: we cannot safely free the old space! Can keep it
on a free list internally here...
Potential bug in combination with steal(): if array is
replaced, it is unclear which one concurrent steal operations
use. Must read the array base address in advance in steal().
*/
#if defined(DISCARD_NEW)
ASSERT_SPARK_POOL_INVARIANTS(deque);
return; /* for now, silently fail */
#else
/* could make room by incrementing the top position here. In
* this case, should use CASTOP. If this fails, someone else has
* removed something, and new room will be available.
*/
ASSERT_SPARK_POOL_INVARIANTS(deque);
#endif
}
}
pos = (deque->elements) + (b & sz);
*pos = elem;
(deque->bottom)++;
ASSERT_SPARK_POOL_INVARIANTS(deque);
return;
}
/* --------------------------------------------------------------------------
* newSpark: create a new spark, as a result of calling "par"
* Called directly from STG.
......@@ -361,19 +77,40 @@ newSpark (StgRegTable *reg, StgClosure *p)
*/
p = UNTAG_CLOSURE(p);
ASSERT_SPARK_POOL_INVARIANTS(pool);
if (closure_SHOULD_SPARK(p)) {
pushBottom(pool,p);
pushWSDeque(pool,p);
}
cap->sparks_created++;
ASSERT_SPARK_POOL_INVARIANTS(pool);
return 1;
}
/* -----------------------------------------------------------------------------
*
* tryStealSpark: try to steal a spark from a Capability.
*
* Returns a valid spark, or NULL if the pool was empty, and can
* occasionally return NULL if there was a race with another thread
* stealing from the same pool. In this case, try again later.
*
-------------------------------------------------------------------------- */
StgClosure *
tryStealSpark (Capability *cap)
{
SparkPool *pool = cap->sparks;
StgClosure *stolen;
do {
stolen = stealWSDeque_(pool);
// use the no-loopy version, stealWSDeque_(), since if we get a
// spurious NULL here the caller may want to try stealing from
// other pools before trying again.
} while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
return stolen;
}
/* --------------------------------------------------------------------------
* Remove all sparks from the spark queues which should not spark any
......@@ -412,11 +149,12 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
pool->topBound = pool->top;
debugTrace(DEBUG_sched,
"markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
"markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
sparkPoolSize(pool), pool->bottom, pool->top);
ASSERT_SPARK_POOL_INVARIANTS(pool);
elements = pool->elements;
ASSERT_WSDEQUE_INVARIANTS(pool);
elements = (StgClosurePtr *)pool->elements;
/* We have exclusive access to the structure here, so we can reset
bottom and top counters, and prune invalid sparks. Contents are
......@@ -513,10 +251,10 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
debugTrace(DEBUG_sched,
"new spark queue len=%d; (hd=%ld; tl=%ld)",
"new spark queue len=%ld; (hd=%ld; tl=%ld)",
sparkPoolSize(pool), pool->bottom, pool->top);
ASSERT_SPARK_POOL_INVARIANTS(pool);
ASSERT_WSDEQUE_INVARIANTS(pool);
}
/* GC for the spark pool, called inside Capability.c for all
......@@ -530,11 +268,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
pool = cap->sparks;
ASSERT_SPARK_POOL_INVARIANTS(pool);
ASSERT_WSDEQUE_INVARIANTS(pool);
top = pool->top;
bottom = pool->bottom;
sparkp = pool->elements;
sparkp = (StgClosurePtr*)pool->elements;
modMask = pool->moduloSize;
while (top < bottom) {
......@@ -547,7 +285,7 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
}
debugTrace(DEBUG_sched,
"traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
"traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
sparkPoolSize(pool), pool->bottom, pool->top);
}
......
/* -----------------------------------------------------------------------------
*
* (c) The GHC Team, 2000-2006
* (c) The GHC Team, 2000-2009
*
* Sparking support for GRAN, PAR and THREADED_RTS versions of the RTS.
*
......@@ -9,6 +9,8 @@
#ifndef SPARKS_H
#define SPARKS_H
#include "WSDeque.h"
#if defined(PARALLEL_HASKELL)
#error Sparks.c using new internal structure, needs major overhaul!
#endif
......@@ -17,68 +19,18 @@
#if defined(THREADED_RTS)
/* Spark pools: used to store pending sparks
* (THREADED_RTS & PARALLEL_HASKELL only)
* Implementation uses a DeQue to enable concurrent read accesses at
* the top end.
*/
typedef struct SparkPool_ {
/* Size of elements array. Used for modulo calculation: we round up
to powers of 2 and use the dyadic log (modulo == bitwise &) */
StgWord size;
StgWord moduloSize; /* bitmask for modulo */
/* top, index where multiple readers steal() (protected by a cas) */
volatile StgWord top;
/* bottom, index of next free place where one writer can push
elements. This happens unsynchronised. */
volatile StgWord bottom;
/* both position indices are continuously incremented, and used as
an index modulo the current array size. */
/* lower bound on the current top value. This is an internal
optimisation to avoid unnecessarily accessing the top field
inside pushBottom */
volatile StgWord topBound;
/* The elements array */
StgClosurePtr* elements;
/* Please note: the dataspace cannot follow the admin fields
immediately, as it should be possible to enlarge it without
disposing the old one automatically (as realloc would)! */
} SparkPool;
/* INVARIANTS, in this order: reasonable size,
topBound consistent, space pointer, space accessible to us.
NB. This is safe to use only (a) on a spark pool owned by the
current thread, or (b) when there's only one thread running, or no
stealing going on (e.g. during GC).
*/
#define ASSERT_SPARK_POOL_INVARIANTS(p) \
ASSERT((p)->size > 0); \
ASSERT((p)->topBound <= (p)->top); \
ASSERT((p)->elements != NULL); \
ASSERT(*((p)->elements) || 1); \
ASSERT(*((p)->elements - 1 + ((p)->size)) || 1);
// No: it is possible that top > bottom when using reclaimSpark()
// ASSERT((p)->bottom >= (p)->top);
// ASSERT((p)->size > (p)->bottom - (p)->top);
typedef WSDeque SparkPool;
// Initialisation
void initSparkPools (void);
// Take a spark from the "write" end of the pool. Can be called
// by the pool owner only.
StgClosure* reclaimSpark(SparkPool *pool);
INLINE_HEADER StgClosure* reclaimSpark(SparkPool *pool);
// Returns True if the spark pool is empty (can give a false positive
// if the pool is almost empty).
rtsBool looksEmpty(SparkPool* deque);
INLINE_HEADER rtsBool looksEmpty(SparkPool* deque);
StgClosure * tryStealSpark (Capability *cap);
void freeSparkPool (SparkPool *pool);
......@@ -87,30 +39,32 @@ void traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
void pruneSparkQueue (evac_fn evac, void *user, Capability *cap);
INLINE_HEADER void discardSparks (SparkPool *pool);
INLINE_HEADER nat sparkPoolSize (SparkPool *pool);
#endif
INLINE_HEADER long sparkPoolSize (SparkPool *pool);
/* -----------------------------------------------------------------------------
* PRIVATE below here
* -------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
INLINE_HEADER StgClosure* reclaimSpark(SparkPool *pool)
{
return popWSDeque(pool);
}
INLINE_HEADER rtsBool
emptySparkPool (SparkPool *pool)
{ return looksEmpty(pool); }
INLINE_HEADER rtsBool looksEmpty(SparkPool* deque)
{
return looksEmptyWSDeque(deque);
}
INLINE_HEADER nat
sparkPoolSize (SparkPool *pool)
{ return (pool->bottom - pool->top); }
INLINE_HEADER long sparkPoolSize (SparkPool *pool)
{
return dequeElements(pool);
}
INLINE_HEADER void
discardSparks (SparkPool *pool)
INLINE_HEADER void discardSparks (SparkPool *pool)
{
pool->top = pool->bottom;
// pool->topBound = pool->top;
discardElements(pool);
}
#endif
#endif // THREADED_RTS
#endif /* SPARKS_H */
/* -----------------------------------------------------------------------------
*
* (c) The GHC Team, 2009
*
* Work-stealing Deque data structure
*
* The implementation uses Double-Ended Queues with lock-free access
* (thereby often called "deque") as described in
*
* D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
* SPAA'05, July 2005, Las Vegas, USA.
* ACM 1-58113-986-1/05/0007
*
* Author: Jost Berthold MSRC 07-09/2008
*
* The DeQue is held as a circular array with known length. Positions
* of top (read-end) and bottom (write-end) always increase, and the
* array is accessed with indices modulo array-size. While this bears
* the risk of overflow, we assume that (with 64 bit indices), a
* program must run very long to reach that point.
*
* The write end of the queue (position bottom) can only be used with
* mutual exclusion, i.e. by exactly one caller at a time. At this
* end, new items can be enqueued using pushBottom()/newSpark(), and
* removed using popBottom()/reclaimSpark() (the latter implying a cas
* synchronisation with potential concurrent readers for the case of
* just one element).
*
* Multiple readers can steal from the read end (position top), and
* are synchronised without a lock, based on a cas of the top
* position. One reader wins, the others return NULL for a failure.
*
* Both popBottom and steal also return NULL when the queue is empty.
*
* ---------------------------------------------------------------------------*/
#include "Rts.h"
#include "RtsUtils.h"
#include "WSDeque.h"
#include "SMP.h" // for cas
#if defined(THREADED_RTS)
#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
/* -----------------------------------------------------------------------------
* newWSDeque
* -------------------------------------------------------------------------- */
/* internal helpers ... */
static StgWord
roundUp2(StgWord val)
{
StgWord rounded = 1;
/* StgWord is unsigned anyway, only catch 0 */
if (val == 0) {
barf("DeQue,roundUp2: invalid size 0 requested");
}
/* at least 1 bit set, shift up to its place */
do {
rounded = rounded << 1;
} while (0 != (val = val>>1));
return rounded;
}
WSDeque *
newWSDeque (nat size)
{
StgWord realsize;
WSDeque *q;
realsize = roundUp2(size); /* to compute modulo as a bitwise & */
q = (WSDeque*) stgMallocBytes(sizeof(WSDeque), /* admin fields */
"newWSDeque");
q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
"newWSDeque:data space");
q->top=0;
q->bottom=0;
q->topBound=0; /* read by writer, updated each time top is read */
q->size = realsize; /* power of 2 */
q->moduloSize = realsize - 1; /* n % size == n & moduloSize */
ASSERT_WSDEQUE_INVARIANTS(q);
return q;
}
/* -----------------------------------------------------------------------------
* freeWSDeque