Commit 2726a2f1 authored by Simon Marlow

Move a thread to the front of the run queue when another thread blocks on it

This fixes #3838, and was made possible by the new BLACKHOLE
infrastructure.  To allow reordering of the run queue I had to make it
doubly-linked, which entails some extra trickiness with regard to
GC write barriers and suchlike.
parent 4b7fdaa8
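
The heart of the change is easiest to see in isolation. Here is a minimal, self-contained sketch (illustrative C, not the RTS code) of the doubly-linked run queue described above: with only a forward link, unlinking a thread from the middle of the queue costs O(n), but a back-link (block_info.prev in the RTS; NULL stands in for END_TSO_QUEUE below) makes both the unlink and the bump-to-front O(1).

#include <stdio.h>
#include <stddef.h>

typedef struct Tso_ {
    int id;
    struct Tso_ *link;     /* next thread, cf. tso->_link          */
    struct Tso_ *prev;     /* back-link, cf. tso->block_info.prev  */
} Tso;

static Tso *hd = NULL, *tl = NULL;     /* cf. run_queue_hd/_tl */

static void append(Tso *t)             /* cf. appendToRunQueue */
{
    t->link = NULL; t->prev = tl;
    if (tl != NULL) tl->link = t; else hd = t;
    tl = t;
}

static void push(Tso *t)               /* cf. pushOnRunQueue */
{
    t->link = hd; t->prev = NULL;
    if (hd != NULL) hd->prev = t; else tl = t;
    hd = t;
}

static void unlink_tso(Tso *t)         /* cf. removeFromRunQueue: O(1) */
{
    if (t->prev != NULL) t->prev->link = t->link; else hd = t->link;
    if (t->link != NULL) t->link->prev = t->prev; else tl = t->prev;
    t->link = t->prev = NULL;
}

int main(void)
{
    Tso a = {1, NULL, NULL}, b = {2, NULL, NULL}, c = {3, NULL, NULL};
    append(&a); append(&b); append(&c);   /* run queue: 1 2 3         */
    unlink_tso(&b);                       /* owner found mid-queue... */
    push(&b);                             /* ...bumped to the front   */
    for (Tso *t = hd; t != NULL; t = t->link) printf("%d ", t->id);
    printf("\n");                         /* prints: 2 1 3            */
    return 0;
}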
......@@ -46,6 +46,7 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
StgTSO *prev; // a back-link when the TSO is on the run queue (NotBlocked)
struct MessageBlackHole_ *bh;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
......@@ -163,6 +164,7 @@ typedef struct StgTSO_ {
void dirty_TSO (Capability *cap, StgTSO *tso);
void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
void setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target);
// Apply to a TSO before looking at it if you are not sure whether it
// might be ThreadRelocated or not (basically, that's most of the time
......
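
A note on the new field: block_info is a union, so why_blocked acts as the tag saying which member is live, and prev is meaningful only while the TSO sits on a run queue (NotBlocked). A cut-down model of that discipline, with an illustrative subset of variants (the RTS has more, and BlockedOnDelay's plain-word payload is an assumption for the example):

#include <stdint.h>

typedef enum { NotBlocked, BlockedOnBlackHole, BlockedOnDelay } WhyBlocked;

typedef struct Tso_ {
    WhyBlocked why_blocked;                /* the tag                      */
    union {
        struct Tso_ *prev;                 /* live only when NotBlocked    */
        void        *bh;                   /* live when BlockedOnBlackHole */
        uint64_t     target;               /* a plain word, not a pointer  */
    } block_info;
} Tso;

/* Anything that traverses a TSO (the GC, say) must consult the tag
 * before touching the union, because only some variants hold a
 * traceable pointer: */
void trace_block_info(Tso *t, void (*mark)(void **))
{
    switch (t->why_blocked) {
    case NotBlocked:         mark((void **)&t->block_info.prev); break;
    case BlockedOnBlackHole: mark(&t->block_info.bh);            break;
    case BlockedOnDelay:     /* nothing to trace */              break;
    }
}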
......@@ -244,7 +244,21 @@ loop:
bq->link = owner->bq;
owner->bq = bq;
dirty_TSO(cap, owner); // we modified owner->bq
// If the owner of the blackhole is currently runnable, then
// bump it to the front of the run queue. This gives the
// blocked-on thread a little boost which should help unblock
// this thread, and may avoid a pile-up of other threads
// becoming blocked on the same BLACKHOLE (#3838).
//
// NB. we check to make sure that the owner is not the same as
// the current thread, since in that case it will not be on
// the run queue.
if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
removeFromRunQueue(cap, owner);
pushOnRunQueue(cap,owner);
}
// point to the BLOCKING_QUEUE from the BLACKHOLE
write_barrier(); // make the BQ visible
((StgInd*)bh)->indirectee = (StgClosure *)bq;
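
The write_barrier() above is a publication fence: the BLOCKING_QUEUE's fields must become visible to other capabilities before the pointer to it is stored into the BLACKHOLE's indirectee. A standalone analogue using C11 release/acquire atomics (an approximation; the RTS primitive is defined per platform and the types here are toys):

#include <stdatomic.h>
#include <stddef.h>

typedef struct { int owner_id; void *queue; } ToyBQ;

static ToyBQ bq;
static _Atomic(ToyBQ *) indirectee = NULL;  /* cf. ((StgInd*)bh)->indirectee */

void publish_bq(int owner)
{
    bq.owner_id = owner;                    /* initialise the BQ first...    */
    bq.queue    = NULL;
    atomic_store_explicit(&indirectee, &bq, /* ...then publish it; release   */
                          memory_order_release); /* plays write_barrier()'s  */
}                                                /* role here                */

ToyBQ *read_bq(void)
{
    /* readers pair with an acquire load, so they can never observe the
       pointer without also observing the initialised fields */
    return atomic_load_explicit(&indirectee, memory_order_acquire);
}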
......@@ -280,12 +294,18 @@ loop:
if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
-recordClosureMutated(cap,bq);
+recordClosureMutated(cap,(StgClosure*)bq);
}
debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
(lnat)msg->tso->id, (lnat)owner->id);
+// See above, #3838
+if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+removeFromRunQueue(cap, owner);
+pushOnRunQueue(cap,owner);
+}
return 1; // blocked
}
......
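A side note on the CLEAN→DIRTY switch above: dirtiness is encoded in the closure's info pointer itself, so the test-and-mark is one compare and one store, and the closure lands on the capability's mutable list at most once per GC cycle. The idiom in toy form (illustrative names only):

#include <stdbool.h>

typedef struct { const char *name; } InfoTable;

static const InfoTable CLEAN = { "BLOCKING_QUEUE_CLEAN" };
static const InfoTable DIRTY = { "BLOCKING_QUEUE_DIRTY" };

typedef struct { const InfoTable *info; } Closure;

static Closure *mut_list[16];          /* cf. the capability's mut_list */
static int      mut_len = 0;

void mark_dirty(Closure *c)
{
    if (c->info == &CLEAN) {           /* cf. info == ..._CLEAN_info    */
        c->info = &DIRTY;
        mut_list[mut_len++] = c;       /* cf. recordClosureMutated:     */
    }                                  /* remembered exactly once       */
}
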
......@@ -158,17 +158,6 @@ static void deleteAllThreads (Capability *cap);
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
-/* -----------------------------------------------------------------------------
- * Putting a thread on the run queue: different scheduling policies
- * -------------------------------------------------------------------------- */
-STATIC_INLINE void
-addToRunQueue( Capability *cap, StgTSO *t )
-{
-// this does round-robin scheduling; good for concurrency
-appendToRunQueue(cap,t);
-}
/* ---------------------------------------------------------------------------
Main scheduling loop.
......@@ -568,6 +557,30 @@ run_thread:
} /* end of while() */
}
/* -----------------------------------------------------------------------------
* Run queue operations
* -------------------------------------------------------------------------- */
void
removeFromRunQueue (Capability *cap, StgTSO *tso)
{
if (tso->block_info.prev == END_TSO_QUEUE) {
ASSERT(cap->run_queue_hd == tso);
cap->run_queue_hd = tso->_link;
} else {
setTSOLink(cap, tso->block_info.prev, tso->_link);
}
if (tso->_link == END_TSO_QUEUE) {
ASSERT(cap->run_queue_tl == tso);
cap->run_queue_tl = tso->block_info.prev;
} else {
setTSOPrev(cap, tso->_link, tso->block_info.prev);
}
tso->_link = tso->block_info.prev = END_TSO_QUEUE;
IF_DEBUG(sanity, checkRunQueue(cap));
}
/* ----------------------------------------------------------------------------
* Setting up the scheduler loop
* ------------------------------------------------------------------------- */
......@@ -743,12 +756,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
|| t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
setTSOPrev(cap, t, prev);
prev = t;
} else if (i == n_free_caps) {
pushed_to_all = rtsTrue;
i = 0;
// keep one for us
setTSOLink(cap, prev, t);
setTSOPrev(cap, t, prev);
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
......@@ -761,6 +776,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
}
}
cap->run_queue_tl = prev;
IF_DEBUG(sanity, checkRunQueue(cap));
}
#ifdef SPARK_PUSHING
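
The paired setTSOLink/setTSOPrev calls in the schedulePushWork hunk above are the recurring cost of the doubly-linked queue: every splice must now rewire both directions, and forgetting one half is exactly the bug checkRunQueue (below) exists to catch. In toy form:

typedef struct Tso_ { struct Tso_ *link, *prev; } Tso;

void relink_after(Tso *prev, Tso *t)
{
    prev->link = t;     /* forward link, cf. setTSOLink(cap, prev, t) */
    t->prev    = prev;  /* back link,    cf. setTSOPrev(cap, t, prev) */
}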
......@@ -1093,7 +1110,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
cap->context_switch = 0;
-addToRunQueue(cap,t);
+appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
}
......@@ -1162,7 +1179,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
//debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
checkTSO(t));
-addToRunQueue(cap,t);
+appendToRunQueue(cap,t);
return rtsFalse;
}
......
......@@ -118,8 +118,10 @@ appendToRunQueue (Capability *cap, StgTSO *tso)
ASSERT(tso->_link == END_TSO_QUEUE);
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_hd = tso;
tso->block_info.prev = END_TSO_QUEUE;
} else {
setTSOLink(cap, cap->run_queue_tl, tso);
setTSOPrev(cap, tso, cap->run_queue_tl);
}
cap->run_queue_tl = tso;
traceEventThreadRunnable (cap, tso);
......@@ -135,6 +137,10 @@ EXTERN_INLINE void
pushOnRunQueue (Capability *cap, StgTSO *tso)
{
setTSOLink(cap, tso, cap->run_queue_hd);
tso->block_info.prev = END_TSO_QUEUE;
if (cap->run_queue_hd != END_TSO_QUEUE) {
setTSOPrev(cap, cap->run_queue_hd, tso);
}
cap->run_queue_hd = tso;
if (cap->run_queue_tl == END_TSO_QUEUE) {
cap->run_queue_tl = tso;
......@@ -149,6 +155,7 @@ popRunQueue (Capability *cap)
StgTSO *t = cap->run_queue_hd;
ASSERT(t != END_TSO_QUEUE);
cap->run_queue_hd = t->_link;
cap->run_queue_hd->block_info.prev = END_TSO_QUEUE;
t->_link = END_TSO_QUEUE; // no write barrier req'd
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_tl = END_TSO_QUEUE;
......@@ -156,6 +163,8 @@ popRunQueue (Capability *cap)
return t;
}
extern void removeFromRunQueue (Capability *cap, StgTSO *tso);
/* Add a thread to the end of the blocked queue.
*/
#if !defined(THREADED_RTS)
......
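One subtlety in the popRunQueue change above: once the head is removed, the new head's back-link must be reset, or a later removeFromRunQueue would follow a stale prev into an already-dequeued TSO. A toy pop (NULL again stands in for END_TSO_QUEUE, and the empty case is guarded explicitly here):

#include <stddef.h>

typedef struct Tso_ { struct Tso_ *link, *prev; } Tso;

Tso *toy_pop(Tso **hd, Tso **tl)
{
    Tso *t = *hd;              /* caller guarantees a non-empty queue  */
    *hd = t->link;
    if (*hd != NULL) {
        (*hd)->prev = NULL;    /* clear the stale back-link, cf. the   */
    } else {                   /* new block_info.prev assignment above */
        *tl = NULL;            /* queue is now empty                   */
    }
    t->link = NULL;
    return t;
}
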
......@@ -119,7 +119,8 @@ revertCAFs( void )
{
StgIndStatic *c;
-for (c = (StgIndStatic *)revertible_caf_list; c != NULL;
+for (c = (StgIndStatic *)revertible_caf_list;
+c != (StgIndStatic *)END_OF_STATIC_LIST;
c = (StgIndStatic *)c->static_link)
{
SET_INFO(c, c->saved_info);
......
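The revertCAFs hunk above fixes the loop's termination test: revertible_caf_list ends with the sentinel END_OF_STATIC_LIST, not NULL, so the old comparison ran off the end of the list. The sentinel idiom in miniature (toy types):

#include <stdio.h>

typedef struct Node_ { int v; struct Node_ *next; } Node;

static Node sentinel;               /* stands in for END_OF_STATIC_LIST */
#define END_OF_LIST (&sentinel)

void walk(Node *head)
{
    /* terminate on the sentinel, never on NULL */
    for (Node *n = head; n != END_OF_LIST; n = n->next) {
        printf("%d\n", n->v);       /* cf. SET_INFO(c, c->saved_info)   */
    }
}
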
......@@ -331,7 +331,8 @@ checkClosure( StgClosure* p )
ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh));
ASSERT(get_itbl(bq->owner)->type == TSO);
-ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO);
+ASSERT(bq->queue == (MessageBlackHole*)END_TSO_QUEUE
+|| get_itbl(bq->queue)->type == TSO);
ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE ||
get_itbl(bq->link)->type == IND ||
get_itbl(bq->link)->type == BLOCKING_QUEUE);
......@@ -745,6 +746,18 @@ findMemoryLeak (void)
reportUnmarkedBlocks();
}
void
checkRunQueue(Capability *cap)
{
StgTSO *prev, *tso;
prev = END_TSO_QUEUE;
for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE;
prev = tso, tso = tso->_link) {
ASSERT(prev == END_TSO_QUEUE || prev->_link == tso);
ASSERT(tso->block_info.prev == prev);
}
ASSERT(cap->run_queue_tl == prev);
}
/* -----------------------------------------------------------------------------
Memory leak detection
......
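checkRunQueue above pins down the invariant every other change in this patch must preserve: walking forward along _link, each TSO's block_info.prev names its predecessor, and run_queue_tl is the final node. A toy harness for the same check (NULL for END_TSO_QUEUE):

#include <assert.h>
#include <stddef.h>

typedef struct Tso_ { struct Tso_ *link, *prev; } Tso;

void check_queue(Tso *hd, Tso *tl)
{
    Tso *prev = NULL, *t;
    for (t = hd; t != NULL; prev = t, t = t->link) {
        assert(t->prev == prev);   /* back-link matches the walk      */
    }
    assert(tl == prev);            /* recorded tail is the last node  */
}

int main(void)
{
    Tso a = {0}, b = {0};
    a.link = &b; b.prev = &a;      /* well-formed two-element queue   */
    check_queue(&a, &b);           /* omitting b.prev = &a would trip */
    return 0;                      /* the first assert                */
}
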
......@@ -36,6 +36,8 @@ StgOffset checkClosure ( StgClosure* p );
void checkMutableList ( bdescr *bd, nat gen );
void checkMutableLists ( rtsBool checkTSOs );
void checkRunQueue (Capability *cap);
void memInventory (rtsBool show);
void checkBQ (StgTSO *bqe, StgClosure *closure);
......
......@@ -69,10 +69,24 @@ scavengeTSO (StgTSO *tso)
saved_eager = gct->eager_promotion;
gct->eager_promotion = rtsFalse;
evacuate((StgClosure **)&tso->blocked_exceptions);
evacuate((StgClosure **)&tso->bq);
// scavange current transaction record
evacuate((StgClosure **)&tso->trec);
// scavenge this thread's stack
scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
tso->dirty = gct->failed_to_evac;
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
) {
evacuate(&tso->block_info.closure);
}
......@@ -86,26 +100,10 @@ scavengeTSO (StgTSO *tso)
}
#endif
-evacuate((StgClosure **)&tso->blocked_exceptions);
-evacuate((StgClosure **)&tso->bq);
-// scavange current transaction record
-evacuate((StgClosure **)&tso->trec);
-// scavenge this thread's stack
-scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
-if (gct->failed_to_evac) {
-tso->dirty = 1;
-evacuate((StgClosure **)&tso->_link);
+if (tso->dirty == 0 && gct->failed_to_evac) {
tso->flags |= TSO_LINK_DIRTY;
} else {
-tso->dirty = 0;
-evacuate((StgClosure **)&tso->_link);
-if (gct->failed_to_evac) {
-tso->flags |= TSO_LINK_DIRTY;
-} else {
-tso->flags &= ~TSO_LINK_DIRTY;
-}
+tso->flags &= ~TSO_LINK_DIRTY;
}
gct->eager_promotion = saved_eager;
......@@ -1407,6 +1405,14 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
// ASSERT(tso->flags & TSO_LINK_DIRTY);
evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
|| tso->why_blocked == NotBlocked
) {
evacuate((StgClosure **)&tso->block_info.prev);
}
if (gct->failed_to_evac) {
recordMutableGen_GC((StgClosure *)p,gen->no);
gct->failed_to_evac = rtsFalse;
......
......@@ -720,6 +720,16 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
tso->_link = target;
}
void
setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target)
{
if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
tso->flags |= TSO_LINK_DIRTY;
recordClosureMutated(cap,(StgClosure*)tso);
}
tso->block_info.prev = target;
}
void
dirty_TSO (Capability *cap, StgTSO *tso)
{
......
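setTSOPrev deliberately mirrors setTSOLink: overwriting a pointer field of a possibly old-generation TSO must record that TSO with recordClosureMutated, so the next minor GC rescans it and cannot miss a younger target reachable only through it; the TSO_LINK_DIRTY flag keeps the recording to once per GC. The barrier in toy form:

#include <stdbool.h>
#include <stddef.h>

typedef struct Obj_ {
    bool         link_dirty;       /* cf. TSO_LINK_DIRTY                  */
    struct Obj_ *link, *prev;      /* cf. tso->_link, block_info.prev     */
} Obj;

static Obj *remembered[32];        /* cf. the capability's mutable list   */
static int  n_remembered = 0;

static void barrier(Obj *o)
{
    if (!o->link_dirty) {          /* cf. dirty == 0 && !TSO_LINK_DIRTY   */
        o->link_dirty = true;
        remembered[n_remembered++] = o;  /* cf. recordClosureMutated      */
    }
}

void set_link(Obj *o, Obj *target) { barrier(o); o->link = target; }
void set_prev(Obj *o, Obj *target) { barrier(o); o->prev = target; }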