Commit 0aae1e17 authored by Edward Z. Yang

Better abstraction over run queues.

This adds four new functions: peekRunQueue, promoteInRunQueue,
singletonRunQueue and truncateRunQueue. They help abstract away
manual linked-list manipulation, making it easier to swap in a
new run queue implementation.
Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
parent 8f731f2b
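
For orientation, the sketch below models the four helpers over a toy head/tail
linked queue. This is illustrative code only, not the RTS source: Cap and TSO
are simplified stand-ins for Capability and StgTSO, NULL stands in for
END_TSO_QUEUE, and the helper bodies mirror the simplified logic visible in
the diff below.

/* sketch.c -- toy model of the run queue helpers; NOT GHC code */
#include <assert.h>
#include <stdio.h>
#include <stddef.h>

typedef struct TSO_ {
    int id;
    struct TSO_ *link;               /* simplified _link; NULL plays END_TSO_QUEUE */
} TSO;

typedef struct {
    TSO *run_queue_hd;
    TSO *run_queue_tl;
} Cap;

static int emptyRunQueue(Cap *cap) {
    return cap->run_queue_hd == NULL;
}

/* pushOnRunQueue adds at the FRONT of the queue */
static void pushOnRunQueue(Cap *cap, TSO *tso) {
    tso->link = cap->run_queue_hd;
    cap->run_queue_hd = tso;
    if (cap->run_queue_tl == NULL)
        cap->run_queue_tl = tso;
}

static void removeFromRunQueue(Cap *cap, TSO *tso) {
    TSO **p = &cap->run_queue_hd;
    while (*p != tso)                /* find the link pointing at tso */
        p = &(*p)->link;
    *p = tso->link;                  /* unlink it */
    if (cap->run_queue_tl == tso) {  /* removed the tail: rescan for the new one */
        TSO *t = cap->run_queue_hd;
        while (t != NULL && t->link != NULL)
            t = t->link;
        cap->run_queue_tl = t;
    }
    tso->link = NULL;
}

/* ---- the four helpers this commit introduces, in simplified form ---- */

static TSO *peekRunQueue(Cap *cap) {     /* head of the queue, not popped */
    return cap->run_queue_hd;
}

static void promoteInRunQueue(Cap *cap, TSO *tso) {  /* move tso to the head */
    removeFromRunQueue(cap, tso);
    pushOnRunQueue(cap, tso);
}

static int singletonRunQueue(Cap *cap) { /* exactly one thread queued */
    assert(!emptyRunQueue(cap));         /* combine with an emptyRunQueue check! */
    return cap->run_queue_hd->link == NULL;
}

static void truncateRunQueue(Cap *cap) { /* drop every queued thread */
    cap->run_queue_hd = NULL;
    cap->run_queue_tl = NULL;
}

int main(void) {
    Cap cap = { NULL, NULL };
    TSO a = {1, NULL}, b = {2, NULL}, c = {3, NULL};
    pushOnRunQueue(&cap, &a);            /* queue: 1     */
    pushOnRunQueue(&cap, &b);            /* queue: 2 1   */
    pushOnRunQueue(&cap, &c);            /* queue: 3 2 1 */
    promoteInRunQueue(&cap, &a);         /* queue: 1 3 2 */
    printf("head=%d singleton=%d\n", peekRunQueue(&cap)->id,
           singletonRunQueue(&cap));     /* prints: head=1 singleton=0 */
    truncateRunQueue(&cap);
    printf("empty=%d\n", emptyRunQueue(&cap));  /* prints: empty=1 */
    return 0;
}

Since pushOnRunQueue adds at the front, promoteInRunQueue amounts to moving an
already-queued thread to the head, which is exactly the remove-then-push
pairing this commit replaces at the two call sites below.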
@@ -472,13 +472,13 @@ releaseCapability_ (Capability* cap,
     // If the next thread on the run queue is a bound thread,
     // give this Capability to the appropriate Task.
-    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
+    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
         // Make sure we're not about to try to wake ourselves up
         // ASSERT(task != cap->run_queue_hd->bound);
         // assertion is false: in schedule() we force a yield after
         // ThreadBlocked, but the thread may be back on the run queue
         // by now.
-        task = cap->run_queue_hd->bound->task;
+        task = peekRunQueue(cap)->bound->task;
         giveCapabilityToTask(cap,task);
         return;
     }
@@ -246,8 +246,7 @@ loop:
         // the current thread, since in that case it will not be on
         // the run queue.
         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
-            removeFromRunQueue(cap, owner);
-            pushOnRunQueue(cap,owner);
+            promoteInRunQueue(cap, owner);
         }
         // point to the BLOCKING_QUEUE from the BLACKHOLE
@@ -293,8 +292,7 @@ loop:
         // See above, #3838
         if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
-            removeFromRunQueue(cap, owner);
-            pushOnRunQueue(cap,owner);
+            promoteInRunQueue(cap, owner);
         }
         return 1; // blocked
@@ -579,6 +579,13 @@ removeFromRunQueue (Capability *cap, StgTSO *tso)
     IF_DEBUG(sanity, checkRunQueue(cap));
 }
 
+void
+promoteInRunQueue (Capability *cap, StgTSO *tso)
+{
+    removeFromRunQueue(cap, tso);
+    pushOnRunQueue(cap, tso);
+}
+
 /* ----------------------------------------------------------------------------
  * Setting up the scheduler loop
  * ------------------------------------------------------------------------- */
@@ -635,8 +642,8 @@ shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
     return ((pending_sync && !didGcLast) ||
             cap->returning_tasks_hd != NULL ||
             (!emptyRunQueue(cap) && (task->incall->tso == NULL
-                                     ? cap->run_queue_hd->bound != NULL
-                                     : cap->run_queue_hd->bound != task->incall)));
+                                     ? peekRunQueue(cap)->bound != NULL
+                                     : peekRunQueue(cap)->bound != task->incall)));
 }
 
 // This is the single place where a Task goes to sleep.  There are
@@ -700,10 +707,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
-    if (cap->run_queue_hd == END_TSO_QUEUE) {
+    if (emptyRunQueue(cap)) {
         if (sparkPoolSizeCap(cap) < 2) return;
     } else {
-        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
+        if (singletonRunQueue(cap) &&
             sparkPoolSizeCap(cap) < 1) return;
     }
@@ -743,7 +750,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         debugTrace(DEBUG_sched,
                    "cap %d: %s and %d free capabilities, sharing...",
                    cap->no,
-                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                   (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
                    "excess threads on run queue":"sparks to share (>=2)",
                    n_free_caps);
@@ -1860,8 +1867,7 @@ forkProcess(HsStablePtr *entry
         // cleaned up later, but some of them may correspond to
         // bound threads for which the corresponding Task does not
         // exist.
-        cap->run_queue_hd = END_TSO_QUEUE;
-        cap->run_queue_tl = END_TSO_QUEUE;
+        truncateRunQueue(cap);
 
         // Any suspended C-calling Tasks are no more, their OS threads
         // don't exist now:
@@ -183,7 +183,14 @@ popRunQueue (Capability *cap)
     return t;
 }
 
-extern void removeFromRunQueue (Capability *cap, StgTSO *tso);
+INLINE_HEADER StgTSO *
+peekRunQueue (Capability *cap)
+{
+    return cap->run_queue_hd;
+}
+
+void removeFromRunQueue (Capability *cap, StgTSO *tso);
+extern void promoteInRunQueue (Capability *cap, StgTSO *tso);
 
 /* Add a thread to the end of the blocked queue.
  */
@@ -215,6 +222,22 @@ emptyRunQueue(Capability *cap)
     return emptyQueue(cap->run_queue_hd);
 }
 
+/* assumes that the queue is not empty; so combine this with
+ * an emptyRunQueue check! */
+INLINE_HEADER rtsBool
+singletonRunQueue(Capability *cap)
+{
+    ASSERT(!emptyRunQueue(cap));
+    return cap->run_queue_hd->_link == END_TSO_QUEUE;
+}
+
+INLINE_HEADER void
+truncateRunQueue(Capability *cap)
+{
+    cap->run_queue_hd = END_TSO_QUEUE;
+    cap->run_queue_tl = END_TSO_QUEUE;
+}
+
 #if !defined(THREADED_RTS)
 #define EMPTY_BLOCKED_QUEUE()  (emptyQueue(blocked_queue_hd))
 #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
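
As a closing illustration of the "easier to swap in a new queue implementation"
point: because call sites now go through the helpers rather than touching
run_queue_hd and run_queue_tl directly, the representation could in principle
change behind the same API. The sketch below is purely hypothetical (not part
of this commit, and not how the RTS stores its run queue); it re-backs the same
helper names with a fixed-size ring buffer to show that caller code is
unchanged.

/* ring.c -- hypothetical alternative backing for the same helper API */
#include <assert.h>
#include <stdio.h>

typedef struct { int id; } TSO;

typedef struct {
    TSO *ring[64];                   /* toy ring buffer replaces hd/tl pointers */
    int  hd;                         /* index of the queue's head */
    int  size;
} Cap;

static int  emptyRunQueue(Cap *cap)  { return cap->size == 0; }
static TSO *peekRunQueue(Cap *cap)   { return cap->ring[cap->hd]; }

static int singletonRunQueue(Cap *cap) {
    assert(!emptyRunQueue(cap));     /* same contract as the list version */
    return cap->size == 1;
}

static void truncateRunQueue(Cap *cap) {
    cap->hd = 0;
    cap->size = 0;
}

static void pushOnRunQueue(Cap *cap, TSO *tso) {
    cap->hd = (cap->hd + 63) % 64;   /* step the head back: push at the front */
    cap->ring[cap->hd] = tso;
    cap->size++;
}

int main(void) {
    Cap cap = { {0}, 0, 0 };
    TSO t = { 42 };
    pushOnRunQueue(&cap, &t);
    /* a caller written against the helpers, like the shouldYieldCapability()
     * or schedulePushWork() tests above, reads identically under either
     * representation: */
    if (!emptyRunQueue(&cap) && singletonRunQueue(&cap))
        printf("one runnable thread: %d\n", peekRunQueue(&cap)->id);
    truncateRunQueue(&cap);
    printf("empty=%d\n", emptyRunQueue(&cap));
    return 0;
}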