Commit 988ad8ba authored by Simon Marlow

Fix to thread migration

Summary:
If we had 2 threads on the run queue, say [A,B], and B is bound to the
current Task, then we would fail to migrate any threads.  This fixes it
so that we would migrate A in that case.

This will help parallelism a bit in programs that have lots of bound
threads.

Test Plan:
Test program in #12419, which is actually not a great program but it
does behave a bit better after this change.

Reviewers: ezyang, niteria, bgamari, austin, erikd

Subscribers: thomie

Differential Revision: https://phabricator.haskell.org/D2430

GHC Trac Issues: #12419
parent 55f5aed7
......@@ -702,13 +702,16 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
Capability *free_caps[n_capabilities], *cap0;
uint32_t i, n_wanted_caps, n_free_caps;
uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
// migration can be turned off with +RTS -qm
if (!RtsFlags.ParFlags.migrate) return;
if (!RtsFlags.ParFlags.migrate) {
spare_threads = 0;
}
// Figure out how many capabilities we want to wake up. We need at least
// sparkPoolSize(cap) plus the number of spare threads we have.
n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1;
n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
if (n_wanted_caps == 0) return;
// First grab as many free Capabilities as we can. ToDo: we should use
......@@ -730,10 +733,22 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
}
}
// we now have n_free_caps free capabilities stashed in
// free_caps[]. Share our run queue equally with them. This is
// probably the simplest thing we could do; improvements we might
// want to do include:
// We now have n_free_caps free capabilities stashed in
// free_caps[]. Attempt to share our run queue equally with them.
// This is complicated slightly by the fact that we can't move
// some threads:
//
// - threads that have TSO_LOCKED cannot migrate
// - a thread that is bound to the current Task cannot be migrated
//
// So we walk through the run queue, migrating threads to
// free_caps[] round-robin, skipping over immovable threads. Each
// time through free_caps[] we keep one thread for ourselves,
// provided we haven't encountered one or more immovable threads
// in this pass.
//
// This is about the simplest thing we could do; improvements we
// might want to do include:
//
// - giving high priority to moving relatively new threads, on
// the grounds that they haven't had time to build up a
......@@ -748,10 +763,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
#endif
debugTrace(DEBUG_sched,
"cap %d: %s and %d free capabilities, sharing...",
cap->no,
(cap->n_run_queue > 1)?
"excess threads on run queue":"sparks to share (>=2)",
"cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
n_free_caps);
i = 0;
......@@ -759,27 +772,56 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
pushed_to_all = rtsFalse;
#endif
if (cap->run_queue_hd != END_TSO_QUEUE) {
prev = cap->run_queue_hd;
t = prev->_link;
prev->_link = END_TSO_QUEUE;
// We want to share threads equally amongst free_caps[] and the
// current capability, but sometimes we encounter immovable
// threads. This counter tracks the number of threads we have kept
// for the current capability minus the number of passes over
// free_caps[]. If it is greater than zero (due to immovable
// threads), we should try to bring it back to zero again by not
// keeping any threads for the current capability.
uint32_t imbalance = 0;
// n_free_caps may be larger than the number of spare threads we have,
// if there were sparks in the spark pool. To avoid giving away all our
// threads in this case, we limit the number of caps that we give
// threads to, to the number of spare threads (n_run_queue-1).
uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
if (thread_recipients > 0) {
prev = END_TSO_QUEUE;
t = cap->run_queue_hd;
for (; t != END_TSO_QUEUE; t = next) {
next = t->_link;
t->_link = END_TSO_QUEUE;
if (t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
if (prev == END_TSO_QUEUE) {
cap->run_queue_hd = t;
} else {
setTSOLink(cap, prev, t);
}
setTSOPrev(cap, t, prev);
prev = t;
} else if (i == n_free_caps) {
imbalance++;
} else if (i == thread_recipients) {
#ifdef SPARK_PUSHING
pushed_to_all = rtsTrue;
#endif
// If we have not already kept any threads for this
// capability during the current pass over free_caps[],
// keep one now.
if (imbalance == 0) {
if (prev == END_TSO_QUEUE) {
cap->run_queue_hd = t;
} else {
setTSOLink(cap, prev, t);
}
setTSOPrev(cap, t, prev);
prev = t;
} else {
imbalance--;
}
i = 0;
// keep one for us
setTSOLink(cap, prev, t);
setTSOPrev(cap, t, prev);
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
cap->n_run_queue--;
......@@ -2194,9 +2236,6 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
n_capabilities = enabled_capabilities = new_n_capabilities;
}
// Start worker tasks on the new Capabilities
startWorkerTasks(old_n_capabilities, new_n_capabilities);
// We're done: release the original Capabilities
releaseAllCapabilities(old_n_capabilities, cap,task);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment