Commit 55f5aed7 authored by Simon Marlow

Track the lengths of the thread queues

Summary:
Knowing the length of the run queue in O(1) time is useful: for example,
we don't have to traverse the run queue to know how many threads we have
to migrate in schedulePushWork().
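
The technique in isolation: keep a counter next to each linked queue and
bump it on every enqueue/dequeue, so the length is always available without
a traversal. A minimal standalone sketch with hypothetical names (in the
patch itself the counters live in struct Capability_ as cap->n_run_queue,
cap->n_returning_tasks and cap->n_suspended_ccalls, as the diff below shows):

    #include <assert.h>
    #include <stddef.h>

    typedef struct Node { struct Node *next; } Node;

    typedef struct {
        Node *hd, *tl;     /* singly-linked queue, head and tail */
        unsigned n;        /* length, maintained incrementally */
    } Queue;

    static void enqueue(Queue *q, Node *t)
    {
        t->next = NULL;
        if (q->tl == NULL) q->hd = t; else q->tl->next = t;
        q->tl = t;
        q->n++;            /* O(1): length known without traversal */
    }

    static Node *dequeue(Queue *q)
    {
        Node *t = q->hd;
        assert(t != NULL); /* caller must check q->n != 0 first */
        q->hd = t->next;
        if (q->hd == NULL) q->tl = NULL;
        q->n--;
        return t;
    }

The invariant to preserve is that every operation which links or unlinks a
node also adjusts the counter; the diff below does exactly that for the run
queue, the returning-tasks queue, and the suspended-ccalls list.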

Test Plan: validate

Reviewers: ezyang, erikd, bgamari, austin

Subscribers: thomie

Differential Revision: https://phabricator.haskell.org/D2437
parent 36565a9b
@@ -99,7 +99,7 @@ findSpark (Capability *cap)
rtsBool retry;
uint32_t i = 0;
if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
// If there are other threads, don't try to run any new
// sparks: sparks might be speculative, we don't want to take
// resources away from the main computation.
@@ -212,6 +212,7 @@ newReturningTask (Capability *cap, Task *task)
cap->returning_tasks_hd = task;
}
cap->returning_tasks_tl = task;
cap->n_returning_tasks++;
}
STATIC_INLINE Task *
@@ -226,6 +227,7 @@ popReturningTask (Capability *cap)
cap->returning_tasks_tl = NULL;
}
task->next = NULL;
cap->n_returning_tasks--;
return task;
}
#endif
@@ -249,6 +251,7 @@ initCapability (Capability *cap, uint32_t i)
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
cap->n_run_queue = 0;
#if defined(THREADED_RTS)
initMutex(&cap->lock);
@@ -256,8 +259,10 @@ initCapability (Capability *cap, uint32_t i)
cap->spare_workers = NULL;
cap->n_spare_workers = 0;
cap->suspended_ccalls = NULL;
cap->n_suspended_ccalls = 0;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
cap->n_returning_tasks = 0;
cap->inbox = (Message*)END_TSO_QUEUE;
cap->sparks = allocSparkPool();
cap->spark_stats.created = 0;
@@ -507,7 +512,7 @@ releaseCapability_ (Capability* cap,
// Check to see whether a worker thread can be given
// the go-ahead to return the result of an external call..
if (cap->returning_tasks_hd != NULL) {
if (cap->n_returning_tasks != 0) {
giveCapabilityToTask(cap,cap->returning_tasks_hd);
// The Task pops itself from the queue (see waitForCapability())
return;
...
@@ -66,6 +66,7 @@ struct Capability_ {
// also lock-free.
StgTSO *run_queue_hd;
StgTSO *run_queue_tl;
uint32_t n_run_queue;
// Tasks currently making safe foreign calls. Doubly-linked.
// When returning, a task first acquires the Capability before
@@ -74,6 +75,7 @@ struct Capability_ {
// the returning_tasks list, we must also migrate its entry from
// this list.
InCall *suspended_ccalls;
uint32_t n_suspended_ccalls;
// One mutable list per generation, so we don't need to take any
// locks when updating an old-generation thunk. This also lets us
@@ -130,6 +132,7 @@ struct Capability_ {
// check whether it is NULL without taking the lock, however.
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;
uint32_t n_returning_tasks;
// Messages, or END_TSO_QUEUE.
// Locks required: cap->lock
@@ -171,15 +174,27 @@ struct Capability_ {
ASSERT(task->cap == cap); \
ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task)
#if defined(THREADED_RTS)
#define ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task) \
ASSERT(cap->returning_tasks_hd == NULL ? \
cap->returning_tasks_tl == NULL && cap->n_returning_tasks == 0 \
: 1);
#else
#define ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task) /* nothing */
#endif
// Sometimes a Task holds a Capability, but the Task is not associated
// with that Capability (ie. task->cap != cap). This happens when
// (a) a Task holds multiple Capabilities, and (b) when the current
// Task is bound, its thread has just blocked, and it may have been
// moved to another Capability.
#define ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) \
ASSERT(cap->run_queue_hd == END_TSO_QUEUE ? \
cap->run_queue_tl == END_TSO_QUEUE : 1); \
ASSERT(myTask() == task); \
#define ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) \
ASSERT(cap->run_queue_hd == END_TSO_QUEUE ? \
cap->run_queue_tl == END_TSO_QUEUE && cap->n_run_queue == 0 \
: 1); \
ASSERT(cap->suspended_ccalls == NULL ? cap->n_suspended_ccalls == 0 : 1); \
ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task); \
ASSERT(myTask() == task); \
ASSERT_TASK_ID(task);
#if defined(THREADED_RTS)
...
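
The "cond ? x : 1" pattern inside these ASSERTs encodes logical implication:
if the head pointer is NULL, then the tail must be NULL and the counter must
be zero (an empty list with a non-zero count would mean a missed decrement
somewhere). A minimal standalone illustration, with hypothetical names:

    #include <assert.h>
    #include <stddef.h>

    struct tasks { void *hd, *tl; unsigned n; };

    static void check_tasks(const struct tasks *q)
    {
        /* hd == NULL  implies  (tl == NULL && n == 0) */
        assert(q->hd == NULL ? (q->tl == NULL && q->n == 0) : 1);
    }
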
@@ -574,6 +574,7 @@ removeFromRunQueue (Capability *cap, StgTSO *tso)
setTSOPrev(cap, tso->_link, tso->block_info.prev);
}
tso->_link = tso->block_info.prev = END_TSO_QUEUE;
cap->n_run_queue--;
IF_DEBUG(sanity, checkRunQueue(cap));
}
@@ -639,7 +640,7 @@ shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
// progress at all.
return ((pending_sync && !didGcLast) ||
cap->returning_tasks_hd != NULL ||
cap->n_returning_tasks != 0 ||
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? peekRunQueue(cap)->bound != NULL
: peekRunQueue(cap)->bound != task->incall)));
@@ -700,31 +701,15 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
Capability *free_caps[n_capabilities], *cap0;
uint32_t i, n_wanted_caps, n_free_caps;
StgTSO *t;
// migration can be turned off with +RTS -qm
if (!RtsFlags.ParFlags.migrate) return;
// Check whether we have more threads on our run queue, or sparks
// in our pool, that we could hand to another Capability.
if (emptyRunQueue(cap)) {
if (sparkPoolSizeCap(cap) < 2) return;
} else {
if (singletonRunQueue(cap) &&
sparkPoolSizeCap(cap) < 1) return;
}
// Figure out how many capabilities we want to wake up. We need at least
// sparkPoolSize(cap) plus the number of spare threads we have.
t = cap->run_queue_hd;
n_wanted_caps = sparkPoolSizeCap(cap);
if (t != END_TSO_QUEUE) {
do {
t = t->_link;
if (t == END_TSO_QUEUE) break;
n_wanted_caps++;
} while (n_wanted_caps < n_capabilities-1);
}
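// Added note (not part of the original patch): n_run_queue is maintained
// incrementally, so the counting loop above is no longer needed; the "- 1"
// keeps one thread back to run on this capability.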
n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1;
if (n_wanted_caps == 0) return;
// First grab as many free Capabilities as we can. ToDo: we should use
// capabilities on the same NUMA node preferably, but not exclusively.
@@ -734,7 +719,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
cap0 = capabilities[i];
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap0->returning_tasks_hd != NULL
|| cap0->n_returning_tasks != 0
|| cap0->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
@@ -765,7 +750,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
debugTrace(DEBUG_sched,
"cap %d: %s and %d free capabilities, sharing...",
cap->no,
(!emptyRunQueue(cap) && !singletonRunQueue(cap))?
(cap->n_run_queue > 1)?
"excess threads on run queue":"sparks to share (>=2)",
n_free_caps);
@@ -797,6 +782,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
cap->n_run_queue--;
traceEventMigrateThread (cap, t, free_caps[i]->no);
@@ -2039,10 +2025,12 @@ forkProcess(HsStablePtr *entry
// bound threads for which the corresponding Task does not
// exist.
truncateRunQueue(cap);
cap->n_run_queue = 0;
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
cap->suspended_ccalls = NULL;
cap->n_suspended_ccalls = 0;
#if defined(THREADED_RTS)
// Wipe our spare workers list, they no longer exist. New
@@ -2051,6 +2039,7 @@ forkProcess(HsStablePtr *entry
cap->n_spare_workers = 0;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
cap->n_returning_tasks = 0;
#endif
// Release all caps except 0, we'll use that for starting
@@ -2277,6 +2266,7 @@ suspendTask (Capability *cap, Task *task)
cap->suspended_ccalls->prev = incall;
}
cap->suspended_ccalls = incall;
cap->n_suspended_ccalls++;
}
STATIC_INLINE void
@@ -2295,6 +2285,7 @@ recoverSuspendedTask (Capability *cap, Task *task)
incall->next->prev = incall->prev;
}
incall->next = incall->prev = NULL;
cap->n_suspended_ccalls--;
}
/* ---------------------------------------------------------------------------
...
@@ -139,6 +139,7 @@ appendToRunQueue (Capability *cap, StgTSO *tso)
setTSOPrev(cap, tso, cap->run_queue_tl);
}
cap->run_queue_tl = tso;
cap->n_run_queue++;
}
/* Push a thread on the beginning of the run queue.
@@ -159,6 +160,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso)
if (cap->run_queue_tl == END_TSO_QUEUE) {
cap->run_queue_tl = tso;
}
cap->n_run_queue++;
}
/* Pop the first thread off the runnable queue.
@@ -176,6 +178,7 @@ popRunQueue (Capability *cap)
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_tl = END_TSO_QUEUE;
}
cap->n_run_queue--;
return t;
}
@@ -214,16 +217,7 @@ emptyQueue (StgTSO *q)
INLINE_HEADER rtsBool
emptyRunQueue(Capability *cap)
{
return emptyQueue(cap->run_queue_hd);
}
/* assumes that the queue is not empty; so combine this with
* an emptyRunQueue check! */
INLINE_HEADER rtsBool
singletonRunQueue(Capability *cap)
{
ASSERT(!emptyRunQueue(cap));
return cap->run_queue_hd->_link == END_TSO_QUEUE;
return cap->n_run_queue == 0;
}
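
With the counter in place, emptyRunQueue no longer inspects the list at all,
and singletonRunQueue can be dropped entirely: its remaining use becomes a
direct comparison against the counter (cap->n_run_queue > 1 in
schedulePushWork, above).
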
INLINE_HEADER void
@@ -231,6 +225,7 @@ truncateRunQueue(Capability *cap)
{
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
cap->n_run_queue = 0;
}
#if !defined(THREADED_RTS)
...
@@ -839,12 +839,14 @@ checkRunQueue(Capability *cap)
{
StgTSO *prev, *tso;
prev = END_TSO_QUEUE;
for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE;
prev = tso, tso = tso->_link) {
uint32_t n;
for (n = 0, tso = cap->run_queue_hd; tso != END_TSO_QUEUE;
prev = tso, tso = tso->_link, n++) {
ASSERT(prev == END_TSO_QUEUE || prev->_link == tso);
ASSERT(tso->block_info.prev == prev);
}
ASSERT(cap->run_queue_tl == prev);
ASSERT(cap->n_run_queue == n);
}
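
This is the standard way to validate an incrementally-maintained length in
debug builds: re-derive the count by walking the list and compare it with
the cached value. A standalone sketch of the same pattern, with hypothetical
names:

    #include <assert.h>
    #include <stddef.h>

    struct node { struct node *next; };
    struct queue { struct node *hd; unsigned n; };

    static void check_length(const struct queue *q)
    {
        unsigned n = 0;
        for (struct node *p = q->hd; p != NULL; p = p->next)
            n++;
        assert(q->n == n);  /* cached length must match the real length */
    }

Because the walk is O(n), the RTS only runs checkRunQueue under
IF_DEBUG(sanity, ...), so non-debug builds pay nothing for the check.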
/* -----------------------------------------------------------------------------
...