Commit 99df892c authored by Simon Marlow

Refactoring and reorganisation of the scheduler

Change the way we look for work in the scheduler.  Previously,
checking to see whether there was anything to do was a
non-side-effecting operation, but this has changed now that we do
work-stealing.  This led to a refactoring of the inner loop of the
scheduler.

Also, lots of cleanup in the new work-stealing code, but no functional
changes.

One new statistic is added to the +RTS -s output:

  SPARKS: 1430 (2 converted, 1427 pruned)

lets you know something about the use of `par` in the program.
parent cf9650f2
/*
Time-stamp: <2005-03-30 12:02:33 simonmar>
RTS specific types.
*/
/* -----------------------------------------------------------------------------
*
* (c) The GHC Team, 1998-2008
*
* RTS-specific types.
*
* ---------------------------------------------------------------------------*/
/* -------------------------------------------------------------------------
Generally useful typedefs
......@@ -37,40 +39,6 @@ typedef enum {
Types specific to the parallel runtime system.
*/
/* Spark pools: used to store pending sparks
* (THREADED_RTS & PARALLEL_HASKELL only)
* Implementation uses a DeQue to enable concurrent read accesses at
* the top end.
*/
/* Administrative record for one spark pool.  The sparks themselves
   live in a separately referenced elements array (see the note at the
   end of the struct). */
typedef struct SparkPool_ {
/* Size of the elements array.  Used for the modulo calculation: sizes
   are rounded up to powers of 2 so that the modulo can be computed
   with a bitwise & (see moduloSize). */
StgWord size;
StgWord moduloSize; /* bitmask used for the modulo calculation */
/* top: index where multiple readers steal() (protected by a CAS) */
StgWord top;
/* bottom: index of the next free place where the single writer can
   push elements.  This happens unsynchronised. */
StgWord bottom;
/* Both position indices (top and bottom) are continuously
   incremented, and are used as an index modulo the current array
   size. */
/* Lower bound on the current value of top.  This is an internal
   optimisation to avoid unnecessarily accessing the top field
   inside pushBottom. */
StgWord topBound;
/* The elements array */
StgClosurePtr* elements;
/* Please note: the data space cannot follow the admin fields
   immediately, as it should be possible to enlarge it without
   automatically disposing of the old one (as realloc would)! */
} SparkPool;
typedef ullong rtsTime;
#if defined(PAR)
......
......@@ -54,15 +54,17 @@ globalWorkToDo (void)
#endif
#if defined(THREADED_RTS)
rtsBool stealWork( Capability *cap) {
rtsBool
stealWork (Capability *cap)
{
/* use the normal Sparks.h interface (internally modified to enable
concurrent stealing)
and immediately turn the spark into a thread when successful
*/
Capability *robbed;
SparkPool *pool;
StgClosurePtr spark;
rtsBool success = rtsFalse;
rtsBool retry;
nat i = 0;
debugTrace(DEBUG_sched,
......@@ -71,63 +73,40 @@ rtsBool stealWork( Capability *cap) {
if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
/* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
start at a random place instead of 0 as well. */
for ( i=0 ; i < n_capabilities ; i++ ) {
robbed = &capabilities[i];
if (cap == robbed) // ourselves...
continue;
do {
retry = rtsFalse;
if (emptySparkPoolCap(robbed)) // nothing to steal here
continue;
spark = findSpark(robbed);
/* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
start at a random place instead of 0 as well. */
for ( i=0 ; i < n_capabilities ; i++ ) {
robbed = &capabilities[i];
if (cap == robbed) // ourselves...
continue;
if (spark == NULL && !emptySparkPoolCap(robbed)) {
spark = findSpark(robbed); // lost race in concurrent access, try again
}
if (spark != NULL) {
debugTrace(DEBUG_sched,
if (emptySparkPoolCap(robbed)) // nothing to steal here
continue;
spark = tryStealSpark(robbed->sparks);
if (spark == NULL && !emptySparkPoolCap(robbed)) {
// we conflicted with another thread while trying to steal;
// try again later.
retry = rtsTrue;
}
if (spark != NULL) {
debugTrace(DEBUG_sched,
"cap %d: Stole a spark from capability %d",
cap->no, robbed->no);
cap->no, robbed->no);
createSparkThread(cap,spark);
success = rtsTrue;
break; // got one, leave the loop
}
// otherwise: no success, try next one
}
debugTrace(DEBUG_sched,
"Leaving work stealing routine (%s)",
success?"one spark stolen":"thefts did not succeed");
return success;
}
createSparkThread(cap,spark);
return rtsTrue;
}
// otherwise: no success, try next one
}
} while (retry);
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
if (task->tso != NULL) {
// A bound task only runs if its thread is on the run queue of
// the capability on which it was woken up. Otherwise, we
// can't be sure that we have the right capability: the thread
// might be woken up on some other capability, and task->cap
// could change under our feet.
return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
} else {
// A vanilla worker task runs if either there is a lightweight
// thread at the head of the run queue, or the run queue is
// empty and (there are sparks to execute, or there is some
// other global condition to check, such as threads blocked on
// blackholes).
if (emptyRunQueue(cap)) {
return !emptySparkPoolCap(cap)
|| !emptyWakeupQueue(cap)
|| globalWorkToDo()
|| stealWork(cap); /* if all false: try to steal work */
} else {
return cap->run_queue_hd->bound == NULL;
}
}
debugTrace(DEBUG_sched, "No sparks stolen");
return rtsFalse;
}
#endif
......@@ -194,6 +173,9 @@ initCapability( Capability *cap, nat i )
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
cap->sparks_created = 0;
cap->sparks_converted = 0;
cap->sparks_pruned = 0;
#endif
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
......@@ -326,7 +308,8 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap)
releaseCapability_ (Capability* cap,
rtsBool always_wakeup)
{
Task *task;
......@@ -384,8 +367,9 @@ releaseCapability_ (Capability* cap)
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
|| !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (always_wakeup ||
!emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
......@@ -401,7 +385,15 @@ void
releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
releaseCapability_(cap);
releaseCapability_(cap, rtsFalse);
RELEASE_LOCK(&cap->lock);
}
// Locked wrapper around releaseCapability_(): takes cap->lock, calls
// releaseCapability_() with always_wakeup set to rtsTrue, and then
// drops the lock again.
void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
// rtsTrue == always_wakeup
releaseCapability_(cap, rtsTrue);
RELEASE_LOCK(&cap->lock);
}
......@@ -427,7 +419,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
}
// Bound tasks just float around attached to their TSOs.
releaseCapability_(cap);
releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);
}
......@@ -534,16 +526,6 @@ yieldCapability (Capability** pCap, Task *task)
{
Capability *cap = *pCap;
// The fast path has no locking, if we don't enter this while loop
while ( waiting_for_gc
/* i.e. another capability triggered HeapOverflow, is busy
getting capabilities (stopping their owning tasks) */
|| cap->returning_tasks_hd != NULL
/* cap reserved for another task */
|| !anyWorkForMe(cap,task)
/* cap/task have no work */
) {
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
......@@ -588,7 +570,6 @@ yieldCapability (Capability** pCap, Task *task)
trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
}
*pCap = cap;
......@@ -630,7 +611,7 @@ wakeupThreadOnCapability (Capability *my_cap,
appendToRunQueue(other_cap,tso);
trace(TRACE_sched, "resuming capability %d", other_cap->no);
releaseCapability_(other_cap);
releaseCapability_(other_cap,rtsFalse);
} else {
appendToWakeupQueue(my_cap,other_cap,tso);
other_cap->context_switch = 1;
......@@ -765,7 +746,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
if (!emptyRunQueue(cap) || cap->spare_workers) {
debugTrace(DEBUG_sched,
"runnable threads or workers still alive, yielding");
releaseCapability_(cap); // this will wake up a worker
releaseCapability_(cap,rtsFalse); // this will wake up a worker
RELEASE_LOCK(&cap->lock);
yieldThread();
continue;
......
......@@ -23,9 +23,9 @@
#ifndef CAPABILITY_H
#define CAPABILITY_H
#include "RtsTypes.h"
#include "RtsFlags.h"
#include "Task.h"
#include "Sparks.h"
struct Capability_ {
// State required by the STG virtual machine when running Haskell
......@@ -91,6 +91,13 @@ struct Capability_ {
// woken up by another Capability.
StgTSO *wakeup_queue_hd;
StgTSO *wakeup_queue_tl;
SparkPool *sparks;
// Stats on spark creation/conversion
nat sparks_created;
nat sparks_converted;
nat sparks_pruned;
#endif
// Per-capability STM-related data
......@@ -100,8 +107,6 @@ struct Capability_ {
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
SparkPool *sparks;
}; // typedef Capability, defined in RtsAPI.h
......@@ -147,12 +152,16 @@ void initCapabilities (void);
// ASSUMES: cap->running_task is the current Task.
//
#if defined(THREADED_RTS)
void releaseCapability (Capability* cap);
void releaseCapability_ (Capability* cap); // assumes cap->lock is held
void releaseCapability (Capability* cap);
void releaseAndWakeupCapability (Capability* cap);
void releaseCapability_ (Capability* cap, rtsBool always_wakeup);
// assumes cap->lock is held
#else
// releaseCapability() is empty in non-threaded RTS
INLINE_HEADER void releaseCapability (Capability* cap STG_UNUSED) {};
INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED) {};
INLINE_HEADER void releaseAndWakeupCapability (Capability* cap STG_UNUSED) {};
INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
rtsBool always_wakeup STG_UNUSED) {};
#endif
#if !IN_STG_CODE
......@@ -231,6 +240,14 @@ void shutdownCapability (Capability *cap, Task *task, rtsBool wait_foreign);
//
rtsBool tryGrabCapability (Capability *cap, Task *task);
// Try to steal a spark from other Capabilities
//
rtsBool stealWork (Capability *cap);
INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
INLINE_HEADER nat sparkPoolSizeCap (Capability *cap);
INLINE_HEADER void discardSparksCap (Capability *cap);
#else // !THREADED_RTS
// Grab a capability. (Only in the non-threaded RTS; in the threaded
......@@ -273,4 +290,18 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
#if defined(THREADED_RTS)
// Convenience wrapper: applies looksEmpty() to the spark pool that
// belongs to the given Capability and returns its verdict.
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap)
{
    return looksEmpty(cap->sparks);
}
// Convenience wrapper: returns whatever sparkPoolSize() reports for
// the spark pool that belongs to the given Capability.
INLINE_HEADER nat
sparkPoolSizeCap (Capability *cap)
{
    return sparkPoolSize(cap->sparks);
}
// Convenience wrapper: calls discardSparks() on the spark pool that
// belongs to the given Capability.
//
// This function is declared void, so it must not use
// "return <expression>;" (a constraint violation in ISO C, even when
// the expression itself has type void).  The call is therefore made
// as a plain statement.
INLINE_HEADER void
discardSparksCap (Capability *cap)
{
    discardSparks(cap->sparks);
}
#endif
#endif /* CAPABILITY_H */
......@@ -137,17 +137,21 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
static rtsBool scheduleGetRemoteWork(Capability *cap);
#if defined(PARALLEL_HASKELL)
static rtsBool scheduleGetRemoteWork(Capability *cap);
static void scheduleSendPendingMessages(void);
#endif
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
......@@ -281,25 +285,6 @@ schedule (Capability *initialCapability, Task *task)
while (TERMINATION_CONDITION) {
#if defined(THREADED_RTS)
if (first) {
// don't yield the first time, we want a chance to run this
// thread for a bit, even if there are others banging at the
// door.
first = rtsFalse;
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
} else {
// Yield the capability to higher-priority tasks if necessary.
yieldCapability(&cap, task);
/* inside yieldCapability, attempts to steal work from other
capabilities, unless the capability has own work.
See (REMARK) below.
*/
}
#endif
/* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
......@@ -367,62 +352,11 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
/* this was the place to activate a spark, now below... */
scheduleStartSignalHandlers(cap);
scheduleFindWork(cap);
// Only check the black holes here if we've nothing else to do.
// During normal execution, the black hole list only gets checked
// at GC time, to avoid repeatedly traversing this possibly long
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
scheduleCheckWakeupThreads(cap);
scheduleCheckBlockedThreads(cap);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
/* work distribution in multithreaded and parallel systems
REMARK: IMHO best location for work-stealing as well.
tests above might yield some new jobs, so no need to steal a
spark in some cases. I believe the yieldCapability.. above
should be moved here.
*/
#if defined(PARALLEL_HASKELL)
/* if messages have been buffered... a NOOP in THREADED_RTS */
scheduleSendPendingMessages();
#endif
/* If the run queue is empty,...*/
if (emptyRunQueue(cap)) {
/* ...take one of our own sparks and turn it into a thread */
scheduleActivateSpark(cap);
/* if this did not work, try to steal a spark from someone else */
if (emptyRunQueue(cap)) {
#if defined(PARALLEL_HASKELL)
receivedFinish = scheduleGetRemoteWork(cap);
continue; // a new round, (hopefully) with new work
/*
in GUM, this a) sends out a FISH and returns IF no fish is
out already
b) (blocking) awaits and receives messages
in Eden, this is only the blocking receive, as b) in GUM.
in Threaded-RTS, this does plain nothing. Stealing routine
is inside Capability.c and called from
yieldCapability() at the very beginning, see REMARK.
*/
#endif
}
} else { /* i.e. run queue was (initially) not empty */
schedulePushWork(cap,task);
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
}
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
#if defined(PARALLEL_HASKELL)
/* since we perform a blocking receive and continue otherwise,
......@@ -439,9 +373,8 @@ schedule (Capability *initialCapability, Task *task)
}
#endif // PARALLEL_HASKELL: non-empty run queue!
#endif /* THREADED_RTS || PARALLEL_HASKELL */
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
cap = task->cap; // reload cap, it might have changed
#endif
......@@ -454,12 +387,27 @@ schedule (Capability *initialCapability, Task *task)
//
// win32: might be here due to awaitEvent() being abandoned
// as a result of a console event having been delivered.
if ( emptyRunQueue(cap) ) {
#if defined(THREADED_RTS)
if (first)
{
// XXX: ToDo
// // don't yield the first time, we want a chance to run this
// // thread for a bit, even if there are others banging at the
// // door.
// first = rtsFalse;
// ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
}
scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
#endif
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
if ( emptyRunQueue(cap) ) {
ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
continue; // nothing to do
}
#endif
//
// Get a thread to run
......@@ -682,13 +630,111 @@ schedulePreLoop(void)
// initialisation for scheduler - what cannot go into initScheduler()
}
/* -----------------------------------------------------------------------------
* scheduleFindWork()
*
* Search for work to do, and handle messages from elsewhere.
* -------------------------------------------------------------------------- */
static void
scheduleFindWork (Capability *cap)
{
// Each step below may put threads on cap's run queue; the later,
// spark-related steps are guarded by emptyRunQueue(cap) so they are
// only attempted when the earlier steps produced nothing to run.
scheduleStartSignalHandlers(cap);
// Only check the black holes here if we've nothing else to do.
// During normal execution, the black hole list only gets checked
// at GC time, to avoid repeatedly traversing this possibly long
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
scheduleCheckWakeupThreads(cap);
scheduleCheckBlockedThreads(cap);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
// Try to activate one of our own sparks
if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
#if defined(THREADED_RTS)
// Try to steal work if we don't have any.  The rtsBool result of
// stealWork() is ignored here.
if (emptyRunQueue(cap)) { stealWork(cap); }
#endif
#if defined(PARALLEL_HASKELL)
// if messages have been buffered...
scheduleSendPendingMessages();
#endif
#if defined(PARALLEL_HASKELL)
// NOTE(review): the `continue` below is not inside any loop in this
// function, and receivedFinish is not declared here.  This section
// looks as if it was lifted out of the scheduler's main loop
// unchanged -- confirm that it still compiles when PARALLEL_HASKELL
// is defined.
if (emptyRunQueue(cap)) {
receivedFinish = scheduleGetRemoteWork(cap);
continue; // a new round, (hopefully) with new work
/*
in GUM, this a) sends out a FISH and returns IF no fish is
out already
b) (blocking) awaits and receives messages
in Eden, this is only the blocking receive, as b) in GUM.
*/
}
#endif
}
#if defined(THREADED_RTS)
// Decide whether the given Task has to give up the Capability it is
// holding.  That is the case when any of the following holds:
//   - another thread is initiating a GC (waiting_for_gc is set);
//   - another Task is returning from a foreign call and is queued on
//     cap->returning_tasks_hd;
//   - the run queue is non-empty and the thread at its head cannot be
//     run by this Task: either this Task is unbound (task->tso == NULL)
//     and the thread is bound to some Task, or this Task is bound and
//     the thread is not bound to this Task.
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    rtsBool must_yield =
        (waiting_for_gc || cap->returning_tasks_hd != NULL);

    // Only inspect the head of the run queue if nothing above already
    // forces a yield and there actually is a head to look at.
    if (!must_yield && !emptyRunQueue(cap)) {
        if (task->tso == NULL) {
            must_yield = (cap->run_queue_hd->bound != NULL);
        } else {
            must_yield = (cap->run_queue_hd->bound != task);
        }
    }

    return must_yield;
}
// scheduleYield() is the one place where a Task goes to sleep.  A Task
// sleeps for one of two reasons:
//   - its run queue is empty, so there is nothing to run;
//   - the Capability has to be handed to someone else
//     (see shouldYieldCapability()).
//
// Nothing is returned; the Capability the Task holds afterwards is
// passed back through *pcap.
static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *held = *pcap;

    // Only yield when there is no work, or when the Capability must
    // be given up; otherwise leave *pcap untouched and carry on.
    if (emptyRunQueue(held) || shouldYieldCapability(held,task)) {
        // Yield (sleep) at least once, and go on yielding for as long
        // as the Capability we get back has to be handed over again.
        for (;;) {
            yieldCapability(&held,task);
            if (!shouldYieldCapability(held,task)) {
                break;
            }
        }

        // The run queue may still be empty at this point; it is up
        // to the caller to check.
        *pcap = held;
    }
}
#endif
/* -----------------------------------------------------------------------------
* schedulePushWork()
*
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
......@@ -788,7 +834,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// i is the next free capability to push to
for (; i < n_free_caps; i++) {
if (emptySparkPoolCap(free_caps[i])) {
spark = findSpark(cap);
spark = tryStealSpark(cap->sparks);
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
newSpark(&(free_caps[i]->r), spark);
......@@ -801,18 +847,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
task->cap = free_caps[i];
releaseCapability(free_caps[i]);
releaseAndWakeupCapability(free_caps[i]);
}
// now wake them all up, and they might steal sparks if
// they did not get a thread
prodAllCapabilities();
}
task->cap = cap; // reset to point to our Capability.
#endif /* THREADED_RTS */
}
#endif /* THREADED_RTS || PARALLEL_HASKELL */
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
......@@ -1031,7 +1073,12 @@ scheduleActivateSpark(Capability *cap)
on our run queue in the meantime ? But would need a lock.. */
return;
spark = findSpark(cap); // defined in Sparks.c
// Really we should be using reclaimSpark() here, but
// experimentally it doesn't seem to perform as well as just
// stealing from our own spark pool:
// spark = reclaimSpark(cap->sparks);
spark = tryStealSpark(cap->sparks); // defined in Sparks.c
if (spark != NULL) {
debugTrace(DEBUG_sched,
......@@ -1046,9 +1093,9 @@ scheduleActivateSpark(Capability *cap)
* Get work from a remote node (PARALLEL_HASKELL only)
* ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
#if defined(PARALLEL_HASKELL)
static rtsBool /* return value used in PARALLEL_HASKELL only */