Commit cf9650f2 authored by berthold@mathematik.uni-marburg.de's avatar berthold@mathematik.uni-marburg.de
Browse files

Work stealing for sparks

   Spark stealing support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  
  Spark pools are per capability, separately allocated and held in the Capability 
  structure. The implementation uses Double-Ended Queues (deque) and cas-protected 
  access.
  
  The write end of the queue (position bottom) can only be used with
  mutual exclusion, i.e. by exactly one caller at a time.
  Multiple readers can steal()/findSpark() from the read end
  (position top), and are synchronised without a lock, based on a cas
  of the top position. One reader wins, the others return NULL for a
  failure.
  
  Work stealing is called when Capabilities find no other work (inside yieldCapability),
  and tries all capabilities 0..n-1 twice, unless a theft succeeds.
  
  Inside schedulePushWork, all considered capabilities (those which were idle and
  could be grabbed) are woken up. Future versions should wake up capabilities
  immediately when putting a new spark into the local pool, from newSpark().

The patch has been re-recorded due to conflicting bugfixes in Sparks.c; it also
fixes a (strange) conflict in the scheduler.
parent 7eeac4d1
......@@ -22,31 +22,6 @@
#ifndef REGS_H
#define REGS_H
/*
* Spark pools: used to store pending sparks
* (THREADED_RTS & PARALLEL_HASKELL only)
* This is a circular buffer. Invariants:
* - base <= hd < lim
* - base <= tl < lim
* - if hd==tl, then the pool is empty.
* - if hd == tl+1, then the pool is full.
* Adding to the pool is done by assigning to *tl++ (wrapping round as
* necessary). When adding to a full pool, we have the option of
* throwing away either the oldest (hd++) or the most recent (tl--) entry.
*/
/* Per-task spark pool: a circular buffer of closure pointers.
   See the invariants documented above: the pool is empty when hd == tl
   and full when hd == tl+1; writers append at tl, readers consume at hd. */
typedef struct StgSparkPool_ {
StgClosure **base;  /* start of the buffer */
StgClosure **lim;   /* one past the end of the buffer */
StgClosure **hd;    /* read position: oldest pending spark */
StgClosure **tl;    /* write position: next free slot (wraps at lim) */
} StgSparkPool;
/* Sanity checks for the circular-buffer invariants documented above:
   both the read pointer (hd) and the write pointer (tl) must lie
   within the buffer, i.e. in [base, lim). */
#define ASSERT_SPARK_POOL_INVARIANTS(p) \
ASSERT((p)->base <= (p)->hd); \
ASSERT((p)->hd < (p)->lim); \
ASSERT((p)->base <= (p)->tl); \
ASSERT((p)->tl < (p)->lim);
typedef struct {
StgFunPtr stgGCEnter1;
StgFunPtr stgGCFun;
......@@ -120,7 +95,6 @@ typedef struct StgRegTable_ {
StgWord rmp_result1[MP_INT_WORDS];
StgWord rmp_result2[MP_INT_WORDS];
StgWord rRet; // holds the return code of the thread
StgSparkPool rSparks; /* per-task spark pool */
} StgRegTable;
#if IN_STG_CODE
......@@ -163,10 +137,6 @@ typedef struct StgRegTable_ {
#define SAVE_CurrentTSO (BaseReg->rCurrentTSO)
#define SAVE_CurrentNursery (BaseReg->rCurrentNursery)
#define SAVE_HpAlloc (BaseReg->rHpAlloc)
#define SAVE_SparkHd (BaseReg->rSparks.hd)
#define SAVE_SparkTl (BaseReg->rSparks.tl)
#define SAVE_SparkBase (BaseReg->rSparks.base)
#define SAVE_SparkLim (BaseReg->rSparks.lim)
/* We sometimes need to save registers across a C-call, eg. if they
* are clobbered in the standard calling convention. We define the
......@@ -401,30 +371,6 @@ GLOBAL_REG_DECL(bdescr *,HpAlloc,REG_HpAlloc)
#define HpAlloc (BaseReg->rHpAlloc)
#endif
#if defined(REG_SparkHd) && !defined(NO_GLOBAL_REG_DECLS)
GLOBAL_REG_DECL(bdescr *,SparkHd,REG_SparkHd)
#else
#define SparkHd (BaseReg->rSparks.hd)
#endif
#if defined(REG_SparkTl) && !defined(NO_GLOBAL_REG_DECLS)
GLOBAL_REG_DECL(bdescr *,SparkTl,REG_SparkTl)
#else
#define SparkTl (BaseReg->rSparks.tl)
#endif
#if defined(REG_SparkBase) && !defined(NO_GLOBAL_REG_DECLS)
GLOBAL_REG_DECL(bdescr *,SparkBase,REG_SparkBase)
#else
#define SparkBase (BaseReg->rSparks.base)
#endif
#if defined(REG_SparkLim) && !defined(NO_GLOBAL_REG_DECLS)
GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
#else
#define SparkLim (BaseReg->rSparks.lim)
#endif
/* -----------------------------------------------------------------------------
Get absolute function pointers from the register table, to save
code space. On x86,
......@@ -665,38 +611,6 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
#define CALLER_RESTORE_HpAlloc /* nothing */
#endif
#ifdef CALLER_SAVES_SparkHd
#define CALLER_SAVE_SparkHd SAVE_SparkHd = SparkHd;
#define CALLER_RESTORE_SparkHd SparkHd = SAVE_SparkHd;
#else
#define CALLER_SAVE_SparkHd /* nothing */
#define CALLER_RESTORE_SparkHd /* nothing */
#endif
#ifdef CALLER_SAVES_SparkTl
#define CALLER_SAVE_SparkTl SAVE_SparkTl = SparkTl;
#define CALLER_RESTORE_SparkTl SparkTl = SAVE_SparkTl;
#else
#define CALLER_SAVE_SparkTl /* nothing */
#define CALLER_RESTORE_SparkTl /* nothing */
#endif
#ifdef CALLER_SAVES_SparkBase
#define CALLER_SAVE_SparkBase SAVE_SparkBase = SparkBase;
#define CALLER_RESTORE_SparkBase SparkBase = SAVE_SparkBase;
#else
#define CALLER_SAVE_SparkBase /* nothing */
#define CALLER_RESTORE_SparkBase /* nothing */
#endif
#ifdef CALLER_SAVES_SparkLim
#define CALLER_SAVE_SparkLim SAVE_SparkLim = SparkLim;
#define CALLER_RESTORE_SparkLim SparkLim = SAVE_SparkLim;
#else
#define CALLER_SAVE_SparkLim /* nothing */
#define CALLER_RESTORE_SparkLim /* nothing */
#endif
#endif /* IN_STG_CODE */
/* ----------------------------------------------------------------------------
......@@ -731,10 +645,6 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
CALLER_SAVE_HpLim \
CALLER_SAVE_CurrentTSO \
CALLER_SAVE_CurrentNursery \
CALLER_SAVE_SparkHd \
CALLER_SAVE_SparkTl \
CALLER_SAVE_SparkBase \
CALLER_SAVE_SparkLim \
CALLER_SAVE_Base
#define CALLER_RESTORE_USER \
......@@ -763,11 +673,7 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
CALLER_RESTORE_Hp \
CALLER_RESTORE_HpLim \
CALLER_RESTORE_CurrentTSO \
CALLER_RESTORE_CurrentNursery \
CALLER_RESTORE_SparkHd \
CALLER_RESTORE_SparkTl \
CALLER_RESTORE_SparkBase \
CALLER_RESTORE_SparkLim
CALLER_RESTORE_CurrentNursery
#else /* not IN_STG_CODE */
......
......@@ -37,6 +37,40 @@ typedef enum {
Types specific to the parallel runtime system.
*/
/* Spark pools: used to store pending sparks
* (THREADED_RTS & PARALLEL_HASKELL only)
* Implementation uses a DeQue to enable concurrent read accesses at
* the top end.
*/
/* Work-stealing deque of pending sparks, one per Capability.
 * A single owner pushes and pops unsynchronised at the bottom end;
 * multiple concurrent thieves steal at the top end, synchronised by a
 * cas on the top index (implementation in Sparks.c). */
typedef struct SparkPool_ {
/* Size of elements array. Used for modulo calculation: we round up
to powers of 2 and use the dyadic log (modulo == bitwise &) */
StgWord size;
StgWord moduloSize; /* bitmask for modulo: size - 1, valid as size is a power of 2 */
/* top, index where multiple readers steal() (protected by a cas) */
StgWord top;
/* bottom, index of next free place where one writer can push
elements. This happens unsynchronised. */
StgWord bottom;
/* both position indices are continuously incremented, and used as
an index modulo the current array size. */
/* lower bound on the current top value. This is an internal
optimisation to avoid unnecessarily accessing the top field
inside pushBottom */
StgWord topBound;
/* The elements array */
StgClosurePtr* elements;
/* Please note: the dataspace cannot follow the admin fields
immediately, as it should be possible to enlarge it without
disposing the old one automatically (as realloc would)! */
} SparkPool;
typedef ullong rtsTime;
#if defined(PAR)
......
......@@ -54,6 +54,55 @@ globalWorkToDo (void)
#endif
#if defined(THREADED_RTS)
/* stealWork: called when a Capability has found no local work.
 *
 * Uses the normal Sparks.h interface (internally modified to enable
 * concurrent stealing) to visit the pools of capabilities 0..n-1 in
 * sequence until a theft succeeds; a stolen spark is immediately
 * turned into a thread on our own Capability.
 *
 * cap:     the Capability looking for work.
 * Returns: rtsTrue iff a spark was stolen and a thread created.
 */
rtsBool stealWork( Capability *cap) {
  Capability *robbed;
  StgClosurePtr spark;
  rtsBool success = rtsFalse;
  nat i;

  debugTrace(DEBUG_sched,
	     "cap %d: Trying to steal work from other capabilities",
	     cap->no);

  if (n_capabilities == 1) { return rtsFalse; } // makes no sense...

  /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
     start at a random place instead of 0 as well. */
  for ( i=0 ; i < n_capabilities ; i++ ) {
    robbed = &capabilities[i];
    if (cap == robbed)  // ourselves...
      continue;

    if (emptySparkPoolCap(robbed)) // nothing to steal here
      continue;

    spark = findSpark(robbed);

    if (spark == NULL && !emptySparkPoolCap(robbed)) {
      /* lost a race in the concurrent access: the pool still looks
         non-empty, so it is worth one more attempt */
      spark = findSpark(robbed);
    }

    if (spark != NULL) {
      debugTrace(DEBUG_sched,
		 "cap %d: Stole a spark from capability %d",
		 cap->no, robbed->no);

      createSparkThread(cap,spark);
      success = rtsTrue;
      break; // got one, leave the loop
    }
    // otherwise: no success, try next one
  }

  debugTrace(DEBUG_sched,
	     "Leaving work stealing routine (%s)",
	     success?"one spark stolen":"thefts did not succeed");
  return success;
}
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
......@@ -73,9 +122,11 @@ anyWorkForMe( Capability *cap, Task *task )
if (emptyRunQueue(cap)) {
return !emptySparkPoolCap(cap)
|| !emptyWakeupQueue(cap)
|| globalWorkToDo();
} else
|| globalWorkToDo()
|| stealWork(cap); /* if all false: try to steal work */
} else {
return cap->run_queue_hd->bound == NULL;
}
}
}
#endif
......@@ -778,7 +829,7 @@ void
freeCapability (Capability *cap) {
stgFree(cap->mut_lists);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
freeSparkPool(&cap->r.rSparks);
freeSparkPool(cap->sparks);
#endif
}
......
......@@ -23,6 +23,7 @@
#ifndef CAPABILITY_H
#define CAPABILITY_H
#include "RtsTypes.h"
#include "RtsFlags.h"
#include "Task.h"
......@@ -98,6 +99,9 @@ struct Capability_ {
StgTRecChunk *free_trec_chunks;
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
SparkPool *sparks;
}; // typedef Capability, defined in RtsAPI.h
......
......@@ -137,17 +137,17 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
#if defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(PARALLEL_HASKELL)
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
static rtsBool scheduleGetRemoteWork(Capability *cap);
#if defined(PARALLEL_HASKELL)
static void scheduleSendPendingMessages(void);
#endif
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
......@@ -291,13 +291,15 @@ schedule (Capability *initialCapability, Task *task)
} else {
// Yield the capability to higher-priority tasks if necessary.
yieldCapability(&cap, task);
/* inside yieldCapability, attempts to steal work from other
capabilities, unless the capability has own work.
See (REMARK) below.
*/
}
#endif
#if defined(THREADED_RTS)
schedulePushWork(cap,task);
#endif
/* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
......@@ -365,21 +367,7 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
#if defined(THREADED_RTS)
// If the run queue is empty, take a spark and turn it into a thread.
{
if (emptyRunQueue(cap)) {
StgClosure *spark;
spark = findSpark(cap);
if (spark != NULL) {
debugTrace(DEBUG_sched,
"turning spark of closure %p into a thread",
(StgClosure *)spark);
createSparkThread(cap,spark);
}
}
}
#endif // THREADED_RTS
/* this was the place to activate a spark, now below... */
scheduleStartSignalHandlers(cap);
......@@ -393,11 +381,19 @@ schedule (Capability *initialCapability, Task *task)
scheduleCheckBlockedThreads(cap);
#if defined(PARALLEL_HASKELL)
/* message processing and work distribution goes here */
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
/* work distribution in multithreaded and parallel systems
REMARK: IMHO best location for work-stealing as well.
tests above might yield some new jobs, so no need to steal a
spark in some cases. I believe the yieldCapability.. above
should be moved here.
*/
#if defined(PARALLEL_HASKELL)
/* if messages have been buffered... a NOOP in THREADED_RTS */
scheduleSendPendingMessages();
#endif
/* If the run queue is empty,...*/
if (emptyRunQueue(cap)) {
......@@ -406,6 +402,7 @@ schedule (Capability *initialCapability, Task *task)
/* if this did not work, try to steal a spark from someone else */
if (emptyRunQueue(cap)) {
#if defined(PARALLEL_HASKELL)
receivedFinish = scheduleGetRemoteWork(cap);
continue; // a new round, (hopefully) with new work
/*
......@@ -414,10 +411,20 @@ schedule (Capability *initialCapability, Task *task)
b) (blocking) awaits and receives messages
in Eden, this is only the blocking receive, as b) in GUM.
in Threaded-RTS, this does plain nothing. Stealing routine
is inside Capability.c and called from
yieldCapability() at the very beginning, see REMARK.
*/
#endif
}
}
} else { /* i.e. run queue was (initially) not empty */
schedulePushWork(cap,task);
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
}
#if defined(PARALLEL_HASKELL)
/* since we perform a blocking receive and continue otherwise,
either we never reach here or we definitely have work! */
// from here: non-empty run queue
......@@ -430,7 +437,9 @@ schedule (Capability *initialCapability, Task *task)
above, waits for messages as well! */
processMessages(cap, &receivedFinish);
}
#endif // PARALLEL_HASKELL
#endif // PARALLEL_HASKELL: non-empty run queue!
#endif /* THREADED_RTS || PARALLEL_HASKELL */
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
......@@ -679,11 +688,15 @@ schedulePreLoop(void)
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
{
/* following code not for PARALLEL_HASKELL. I kept the call general,
future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
......@@ -726,7 +739,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
StgTSO *prev, *t, *next;
rtsBool pushed_to_all;
debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
debugTrace(DEBUG_sched,
"cap %d: %s and %d free capabilities, sharing...",
cap->no,
(!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
"excess threads on run queue":"sparks to share (>=2)",
n_free_caps);
i = 0;
pushed_to_all = rtsFalse;
......@@ -760,6 +778,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
cap->run_queue_tl = prev;
}
#ifdef SPARK_PUSHING
/* JB I left this code in place, it would work but is not necessary */
// If there are some free capabilities that we didn't push any
// threads to, then try to push a spark to each one.
if (!pushed_to_all) {
......@@ -775,16 +796,23 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
}
}
}
#endif /* SPARK_PUSHING */
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
task->cap = free_caps[i];
releaseCapability(free_caps[i]);
}
// now wake them all up; they might steal sparks if
// they did not get a thread
prodAllCapabilities();
}
task->cap = cap; // reset to point to our Capability.
#endif /* THREADED_RTS */
}
#endif
#endif /* THREADED_RTS || PARALLEL_HASKELL */
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
......@@ -965,7 +993,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
* ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
static StgTSO *
static void
scheduleSendPendingMessages(void)
{
......@@ -984,10 +1012,10 @@ scheduleSendPendingMessages(void)
#endif
/* ----------------------------------------------------------------------------
* Activate spark threads (PARALLEL_HASKELL only)
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
......@@ -1012,14 +1040,14 @@ scheduleActivateSpark(Capability *cap)
createSparkThread(cap,spark); // defined in Sparks.c
}
}
#endif // PARALLEL_HASKELL
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
* Get work from a remote node (PARALLEL_HASKELL only)
* ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
static rtsBool
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static rtsBool /* return value used in PARALLEL_HASKELL only */
scheduleGetRemoteWork(Capability *cap)
{
#if defined(PARALLEL_HASKELL)
......@@ -1057,7 +1085,7 @@ scheduleGetRemoteWork(Capability *cap)
#endif /* PARALLEL_HASKELL */
}
#endif // PARALLEL_HASKELL
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
* After running a thread...
......@@ -1483,6 +1511,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
performHeapProfile = rtsFalse;
}
#ifdef SPARKBALANCE
/* JB
Once we are all together... this would be the place to balance all
spark pools. No concurrent stealing or adding of new sparks can
occur. Should be defined in Sparks.c. */
balanceSparkPoolsCaps(n_capabilities, capabilities);
#endif
#if defined(THREADED_RTS)
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
......
This diff is collapsed.
......@@ -9,17 +9,45 @@
#ifndef SPARKS_H
#define SPARKS_H
#if defined(PARALLEL_HASKELL)
#error Sparks.c using new internal structure, needs major overhaul!
#endif
/* typedef for SparkPool in RtsTypes.h */
#if defined(THREADED_RTS)
/* INVARIANTS, in this order: bottom/top consistent, reasonable size,
topBound consistent, space pointer, space accessible to us.
The last two asserts only dereference the first and the last element
of the array; the "|| 1" makes the conditions always true, so they
merely probe that the memory is accessible. */
#define ASSERT_SPARK_POOL_INVARIANTS(p) \
ASSERT((p)->bottom >= (p)->top); \
ASSERT((p)->size > 0); \
ASSERT((p)->size > (p)->bottom - (p)->top); \
ASSERT((p)->topBound <= (p)->top); \
ASSERT((p)->elements != NULL); \
ASSERT(*((p)->elements) || 1); \
ASSERT(*((p)->elements - 1 + ((p)->size)) || 1);
// missing in old interface. Currently called by initSparkPools
// internally.
SparkPool* initPool(StgWord size);
// special case: accessing our own pool, at the write end
// otherwise, we can always steal from our pool as the others do...
StgClosure* reclaimSpark(Capability *cap);
rtsBool looksEmpty(SparkPool* deque);
// rest: same as old interface
StgClosure * findSpark (Capability *cap);
void initSparkPools (void);
void freeSparkPool (StgSparkPool *pool);
void freeSparkPool (SparkPool *pool);
void createSparkThread (Capability *cap, StgClosure *p);
void pruneSparkQueues (void);
void traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
INLINE_HEADER void discardSparks (StgSparkPool *pool);
INLINE_HEADER nat sparkPoolSize (StgSparkPool *pool);
INLINE_HEADER rtsBool emptySparkPool (StgSparkPool *pool);
INLINE_HEADER void discardSparks (SparkPool *pool);
INLINE_HEADER nat sparkPoolSize (SparkPool *pool);
INLINE_HEADER void discardSparksCap (Capability *cap);
INLINE_HEADER nat sparkPoolSizeCap (Capability *cap);
......@@ -32,46 +60,33 @@ INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
INLINE_HEADER rtsBool
emptySparkPool (StgSparkPool *pool)
{
return (pool->hd == pool->tl);
}
INLINE_HEADER rtsBool
emptySparkPool (SparkPool *pool)
{ return looksEmpty(pool); }
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap)
{ return emptySparkPool(&cap->r.rSparks); }
{ return looksEmpty(cap->sparks); }
INLINE_HEADER nat
sparkPoolSize (StgSparkPool *pool)
sparkPoolSize (SparkPool *pool)
{
if (pool->hd <= pool->tl) {
return (pool->tl - pool->hd);
} else {
return (pool->lim - pool->hd + pool->tl - pool->base);
}
return (pool->bottom - pool->top);
}
INLINE_HEADER nat
sparkPoolSizeCap (Capability *cap)
{ return sparkPoolSize(&cap->r.rSparks); }
{ return sparkPoolSize(cap->sparks); }
INLINE_HEADER void
discardSparks (StgSparkPool *pool)
discardSparks (SparkPool *pool)
{
pool->hd = pool->tl;
pool->top = pool->bottom = 0;
}
INLINE_HEADER void
discardSparksCap (Capability *cap)
{ return discardSparks(&cap->r.rSparks); }
#elif defined(THREADED_RTS)
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap STG_UNUSED)
{ return rtsTrue; }
{ return discardSparks(cap->sparks); }
#endif
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment