Commit 4368121d authored by Simon Marlow's avatar Simon Marlow

Add some more flexibility to the multiproc scheduler

There are two new options in the -threaded RTS:
 
  -qm       Don't automatically migrate threads between CPUs
  -qw       Migrate a thread to the current CPU when it is woken up

Previously both of these were effectively off, i.e. threads were
migrated between CPUs willy-nilly, and threads were always migrated to
the current CPU when woken up.  This is the first step in tweaking the
scheduling for more effective work balancing; there will no doubt be
more to come.
parent 354cefe7
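
In essence, the patch adds two decision points to the scheduler. The following is a rough sketch (not part of the commit) of how the new ParFlags fields gate them; the names RtsFlags.ParFlags.migrate, RtsFlags.ParFlags.wakeupMigrate, tso->cap, appendToRunQueue() and wakeupThreadOnCapability() are taken from the hunks below, while the simplified bodies, the _sketch suffixes, and the omission of locking, assertions and bound-thread bookkeeping are illustrative assumptions.

/* Sketch only: the flag-gated scheduling decisions introduced by this patch. */

static void schedulePushWork_sketch (Capability *cap)
{
    // +RTS -qm clears 'migrate': never push work to other Capabilities.
    if (!RtsFlags.ParFlags.migrate) return;
    // ...otherwise hand surplus run-queue threads and sparks to idle caps...
}

static void unblockOne_sketch (Capability *cap, StgTSO *tso)
{
    if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
        // +RTS -qw (or the thread last ran here anyway): run it on this CPU.
        tso->cap = cap;
        appendToRunQueue(cap, tso);
    } else {
        // Default: hand the thread back to the Capability it last ran on,
        // via that Capability's new wakeup queue.
        wakeupThreadOnCapability(tso->cap, tso);
    }
}
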
......@@ -164,6 +164,8 @@ struct PAR_FLAGS {
#ifdef THREADED_RTS
struct PAR_FLAGS {
nat nNodes; /* number of threads to run simultaneously */
rtsBool migrate; /* migrate threads between capabilities */
rtsBool wakeupMigrate; /* migrate a thread on wakeup */
unsigned int maxLocalSparks;
};
#endif /* THREADED_RTS */
......
......@@ -93,6 +93,8 @@ typedef StgWord32 StgThreadID;
*/
#define TSO_DIRTY 1
#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
/*
* Type returned after running a thread. Values of this type
* include HeapOverflow, StackOverflow etc. See Constants.h for the
......@@ -134,43 +136,44 @@ typedef union {
*/
typedef struct StgTSO_ {
StgHeader header;
struct StgTSO_* link; /* Links threads onto blocking queues */
struct StgTSO_* global_link; /* Links all threads together */
StgWord16 what_next; /* Values defined in Constants.h */
StgWord16 why_blocked; /* Values defined in Constants.h */
StgWord32 flags;
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
struct Task_* bound; // non-NULL for a bound thread
struct StgTRecHeader_ *trec; /* STM transaction record */
StgHeader header;
struct StgTSO_* link; /* Links threads onto blocking queues */
struct StgTSO_* global_link; /* Links all threads together */
StgWord16 what_next; /* Values defined in Constants.h */
StgWord16 why_blocked; /* Values defined in Constants.h */
StgWord32 flags;
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
struct Task_* bound;
struct Capability_* cap;
struct StgTRecHeader_ * trec; /* STM transaction record */
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
/* TICKY-specific stuff would go here. */
#endif
#ifdef PROFILING
StgTSOProfInfo prof;
StgTSOProfInfo prof;
#endif
#ifdef PAR
StgTSOParInfo par;
StgTSOParInfo par;
#endif
#ifdef GRAN
StgTSOGranInfo gran;
StgTSOGranInfo gran;
#endif
#ifdef DIST
StgTSODistInfo dist;
StgTSODistInfo dist;
#endif
/* The thread stack... */
StgWord stack_size; /* stack size in *words* */
StgWord max_stack_size; /* maximum stack size in *words* */
StgPtr sp;
StgWord stack[FLEXIBLE_ARRAY];
/* The thread stack... */
StgWord32 stack_size; /* stack size in *words* */
StgWord32 max_stack_size; /* maximum stack size in *words* */
StgPtr sp;
StgWord stack[FLEXIBLE_ARRAY];
} StgTSO;
/* -----------------------------------------------------------------------------
......
......@@ -32,7 +32,7 @@ stg_ap_0_fast
IF_DEBUG(sanity,
foreign "C" checkStackChunk(Sp "ptr",
CurrentTSO + TSO_OFFSET_StgTSO_stack +
WDS(StgTSO_stack_size(CurrentTSO)) "ptr") [R1]);
WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) "ptr") [R1]);
ENTER();
}
......
......@@ -67,7 +67,9 @@ anyWorkForMe( Capability *cap, Task *task )
// other global condition to check, such as threads blocked on
// blackholes).
if (emptyRunQueue(cap)) {
return !emptySparkPoolCap(cap) || globalWorkToDo();
return !emptySparkPoolCap(cap)
|| !emptyWakeupQueue(cap)
|| globalWorkToDo();
} else
return cap->run_queue_hd->bound == NULL;
}
......@@ -135,6 +137,8 @@ initCapability( Capability *cap, nat i )
cap->suspended_ccalling_tasks = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
#endif
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
......@@ -296,7 +300,8 @@ releaseCapability_ (Capability* cap)
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
|| !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
......@@ -501,6 +506,37 @@ yieldCapability (Capability** pCap, Task *task)
return;
}
/* ----------------------------------------------------------------------------
* Wake up a thread on a Capability.
*
* This is used when the current Task is running on a Capability and
* wishes to wake up a thread on a different Capability.
* ------------------------------------------------------------------------- */
void
wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
{
ASSERT(tso->cap == cap);
ASSERT(tso->bound ? tso->bound->cap == cap : 1);
ACQUIRE_LOCK(&cap->lock);
if (cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
appendToRunQueue(cap,tso);
// start it up
cap->running_task = myTask(); // precond for releaseCapability_()
releaseCapability_(cap);
} else {
appendToWakeupQueue(cap,tso);
// someone is running on this Capability, so it cannot be
// freed without first checking the wakeup queue (see
// releaseCapability_).
}
RELEASE_LOCK(&cap->lock);
}
/* ----------------------------------------------------------------------------
* prodCapabilities
*
......
......@@ -70,7 +70,7 @@ struct Capability_ {
// Worker Tasks waiting in the wings. Singly-linked.
Task *spare_workers;
// This lock protects running_task and returning_tasks_{hd,tl}.
// This lock protects running_task, returning_tasks_{hd,tl}, wakeup_queue.
Mutex lock;
// Tasks waiting to return from a foreign call, or waiting to make
......@@ -80,6 +80,12 @@ struct Capability_ {
// check whether it is NULL without taking the lock, however.
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;
// A list of threads to append to this Capability's run queue at
// the earliest opportunity. These are threads that have been
// woken up by another Capability.
StgTSO *wakeup_queue_hd;
StgTSO *wakeup_queue_tl;
#endif
// Per-capability STM-related data
......@@ -189,6 +195,11 @@ void yieldCapability (Capability** pCap, Task *task);
//
void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
//
void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
......
......@@ -380,7 +380,7 @@ retry_pop_stack:
* entry code in StgStartup.cmm.
*/
Sp = CurrentTSO + TSO_OFFSET_StgTSO_stack
+ WDS(StgTSO_stack_size(CurrentTSO)) - WDS(2);
+ WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) - WDS(2);
Sp(1) = R1; /* save the exception */
Sp(0) = stg_enter_info; /* so that GC can traverse this stack */
StgTSO_what_next(CurrentTSO) = ThreadKilled::I16;
......
......@@ -219,6 +219,8 @@ void initRtsFlagsDefaults(void)
#ifdef THREADED_RTS
RtsFlags.ParFlags.nNodes = 1;
RtsFlags.ParFlags.migrate = rtsTrue;
RtsFlags.ParFlags.wakeupMigrate = rtsFalse;
#endif
#ifdef PAR
......@@ -437,6 +439,8 @@ usage_text[] = {
#endif /* DEBUG */
#if defined(THREADED_RTS)
" -N<n> Use <n> OS threads (default: 1)",
" -qm Don't automatically migrate threads between CPUs",
" -qw Migrate a thread to the current CPU when it is woken up",
#endif
#if defined(THREADED_RTS) || defined(PAR)
" -e<size> Size of spark pools (default 100)",
......@@ -1049,6 +1053,25 @@ error = rtsTrue;
}
}
) break;
case 'q':
switch (rts_argv[arg][2]) {
case '\0':
errorBelch("incomplete RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
case 'm':
RtsFlags.ParFlags.migrate = rtsFalse;
break;
case 'w':
RtsFlags.ParFlags.wakeupMigrate = rtsTrue;
break;
default:
errorBelch("unknown RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
}
break;
#endif
/* =========== PARALLEL =========================== */
case 'e':
......@@ -1063,10 +1086,12 @@ error = rtsTrue;
}
) break;
#ifdef PAR
case 'q':
PAR_BUILD_ONLY(
process_par_option(arg, rts_argc, rts_argv, &error);
) break;
#endif
/* =========== GRAN =============================== */
......
......@@ -482,6 +482,21 @@ schedule (Capability *initialCapability, Task *task)
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
// Any threads that were woken up by other Capabilities get
// appended to our run queue.
if (!emptyWakeupQueue(cap)) {
ACQUIRE_LOCK(&cap->lock);
if (emptyRunQueue(cap)) {
cap->run_queue_hd = cap->wakeup_queue_hd;
cap->run_queue_tl = cap->wakeup_queue_tl;
} else {
cap->run_queue_tl->link = cap->wakeup_queue_hd;
cap->run_queue_tl = cap->wakeup_queue_tl;
}
cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
RELEASE_LOCK(&cap->lock);
}
scheduleCheckBlockedThreads(cap);
scheduleDetectDeadlock(cap,task);
......@@ -604,6 +619,7 @@ run_thread:
// Run the current thread
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
prev_what_next = t->what_next;
......@@ -674,6 +690,7 @@ run_thread:
#endif
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
// ----------------------------------------------------------------------
......@@ -772,6 +789,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
// migration can be turned off with +RTS -qm
if (!RtsFlags.ParFlags.migrate) return;
// Check whether we have more threads on our run queue, or sparks
// in our pool, that we could hand to another Capability.
if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
......@@ -834,6 +854,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
appendToRunQueue(free_caps[i],t);
if (t->bound) { t->bound->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
}
}
......@@ -2491,6 +2512,7 @@ createThread(Capability *cap, nat size)
tso->saved_errno = 0;
tso->bound = NULL;
tso->cap = cap;
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
......@@ -2698,6 +2720,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
tso->bound = task;
tso->cap = cap;
task->tso = tso;
task->ret = ret;
......@@ -2905,16 +2928,21 @@ GetRoots( evac_fn evac )
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
evac((StgClosure **)&cap->run_queue_hd);
evac((StgClosure **)&cap->run_queue_tl);
evac((StgClosure **)(void *)&cap->run_queue_hd);
evac((StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
for (task = cap->suspended_ccalling_tasks; task != NULL;
task=task->next) {
IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
evac((StgClosure **)&task->suspended_tso);
evac((StgClosure **)(void *)&task->suspended_tso);
}
}
#if !defined(THREADED_RTS)
evac((StgClosure **)(void *)&blocked_queue_hd);
evac((StgClosure **)(void *)&blocked_queue_tl);
......@@ -3211,21 +3239,29 @@ unblockOne(Capability *cap, StgTSO *tso)
ASSERT(get_itbl(tso)->type == TSO);
ASSERT(tso->why_blocked != NotBlocked);
tso->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
// We might have just migrated this TSO to our Capability:
if (tso->bound) {
tso->bound->cap = cap;
if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
// We are waking up this thread on the current Capability, which
// might involve migrating it from the Capability it was last on.
if (tso->bound) {
ASSERT(tso->bound->cap == tso->cap);
tso->bound->cap = cap;
}
tso->cap = cap;
appendToRunQueue(cap,tso);
// we're holding a newly woken thread, make sure we context switch
// quickly so we can migrate it if necessary.
context_switch = 1;
} else {
// we'll try to wake it up on the Capability it was last on.
wakeupThreadOnCapability(tso->cap, tso);
}
appendToRunQueue(cap,tso);
// we're holding a newly woken thread, make sure we context switch
// quickly so we can migrate it if necessary.
context_switch = 1;
IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
return next;
}
......@@ -3675,6 +3711,7 @@ unblockThread(Capability *cap, StgTSO *tso)
if (tso->bound) {
tso->bound->cap = cap;
}
tso->cap = cap;
}
#endif
......@@ -4171,13 +4208,8 @@ resurrectThreads (StgTSO *threads)
all_threads = tso;
IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
// Wake up the thread on the Capability it was last on for a
// bound thread, or last_free_capability otherwise.
if (tso->bound) {
cap = tso->bound->cap;
} else {
cap = last_free_capability;
}
// Wake up the thread on the Capability it was last on
cap = tso->cap;
switch (tso->why_blocked) {
case BlockedOnMVar:
......
......@@ -259,6 +259,20 @@ appendToBlockedQueue(StgTSO *tso)
}
#endif
#if defined(THREADED_RTS)
STATIC_INLINE void
appendToWakeupQueue (Capability *cap, StgTSO *tso)
{
ASSERT(tso->link == END_TSO_QUEUE);
if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
cap->wakeup_queue_hd = tso;
} else {
cap->wakeup_queue_tl->link = tso;
}
cap->wakeup_queue_tl = tso;
}
#endif
/* Check whether various thread queues are empty
*/
STATIC_INLINE rtsBool
......@@ -273,6 +287,14 @@ emptyRunQueue(Capability *cap)
return emptyQueue(cap->run_queue_hd);
}
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
emptyWakeupQueue(Capability *cap)
{
return emptyQueue(cap->wakeup_queue_hd);
}
#endif
#if !defined(THREADED_RTS)
#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd))
#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
......