Commit d572aed6 authored by Simon Marlow's avatar Simon Marlow
Browse files

Fix race condition in wakeupThreadOnCapability() (#2574)

wakeupThreadOnCapbility() is used to signal another capability that
there is a thread waiting to be added to its run queue.  It adds the
thread to the (locked) wakeup queue on the remote capability.  In
order to do this, it has to modify the TSO's link field, which has a
write barrier.  The write barrier might put the TSO on the mutable
list, and the bug was that it was using the mutable list of the
*target* capability, which we do not have exclusive access to.  We
should be using the current Capabilty's mutable list in this case.
parent 4318aa60
......@@ -540,57 +540,40 @@ yieldCapability (Capability** pCap, Task *task)
* ------------------------------------------------------------------------- */
void
wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
wakeupThreadOnCapability (Capability *my_cap,
Capability *other_cap,
StgTSO *tso)
{
ASSERT(tso->cap == cap);
ASSERT(tso->bound ? tso->bound->cap == cap : 1);
ASSERT_LOCK_HELD(&cap->lock);
ACQUIRE_LOCK(&other_cap->lock);
tso->cap = cap;
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
ASSERT(tso->bound->cap == tso->cap);
tso->bound->cap = other_cap;
}
tso->cap = other_cap;
ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
if (cap->running_task == NULL) {
if (other_cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
appendToRunQueue(cap,tso);
// start it up
cap->running_task = myTask(); // precond for releaseCapability_()
trace(TRACE_sched, "resuming capability %d", cap->no);
releaseCapability_(cap);
other_cap->running_task = myTask();
// precond for releaseCapability_() and appendToRunQueue()
appendToRunQueue(other_cap,tso);
trace(TRACE_sched, "resuming capability %d", other_cap->no);
releaseCapability_(other_cap);
} else {
appendToWakeupQueue(cap,tso);
appendToWakeupQueue(my_cap,other_cap,tso);
// someone is running on this Capability, so it cannot be
// freed without first checking the wakeup queue (see
// releaseCapability_).
}
}
void
wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
{
ACQUIRE_LOCK(&cap->lock);
migrateThreadToCapability (cap, tso);
RELEASE_LOCK(&cap->lock);
}
void
migrateThreadToCapability (Capability *cap, StgTSO *tso)
{
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
ASSERT(tso->bound->cap == tso->cap);
tso->bound->cap = cap;
}
tso->cap = cap;
wakeupThreadOnCapability(cap,tso);
}
void
migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
{
ACQUIRE_LOCK(&cap->lock);
migrateThreadToCapability (cap, tso);
RELEASE_LOCK(&cap->lock);
RELEASE_LOCK(&other_cap->lock);
}
/* ----------------------------------------------------------------------------
......@@ -818,7 +801,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
}
#if defined(THREADED_RTS)
markSparkQueue (evac, user, cap);
traverseSparkQueue (evac, user, cap);
#endif
}
......
......@@ -202,11 +202,8 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
//
void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso);
void migrateThreadToCapability (Capability *cap, StgTSO *tso);
void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso);
void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
StgTSO *tso);
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
......@@ -252,6 +249,8 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
bdescr *bd;
// We must own this Capability in order to modify its mutable list.
ASSERT(cap->running_task == myTask());
bd = cap->mut_lists[gen];
if (bd->free >= bd->start + BLOCK_SIZE_W) {
bdescr *new_bd;
......
......@@ -1871,7 +1871,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
migrateThreadToCapability_lock(&capabilities[cpu],tso);
wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
appendToRunQueue(cap,tso);
......@@ -2312,8 +2312,6 @@ checkBlackHoles (Capability *cap)
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
IF_DEBUG(sanity,checkTSO(t));
t = unblockOne(cap, t);
// urk, the threads migrate to the current capability
// here, but we'd like to keep them on the original one.
*prev = t;
any_woke_up = rtsTrue;
} else {
......
......@@ -237,16 +237,21 @@ appendToBlockedQueue(StgTSO *tso)
#endif
#if defined(THREADED_RTS)
// Assumes: my_cap is owned by the current Task. We hold
// other_cap->lock, but we do not necessarily own other_cap; another
// Task may be running on it.
INLINE_HEADER void
appendToWakeupQueue (Capability *cap, StgTSO *tso)
appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso)
{
ASSERT(tso->_link == END_TSO_QUEUE);
if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
cap->wakeup_queue_hd = tso;
if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) {
other_cap->wakeup_queue_hd = tso;
} else {
setTSOLink(cap, cap->wakeup_queue_tl, tso);
// my_cap is passed to setTSOLink() because it may need to
// write to the mutable list.
setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso);
}
cap->wakeup_queue_tl = tso;
other_cap->wakeup_queue_tl = tso;
}
#endif
......
......@@ -510,7 +510,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
context_switch = 1;
} else {
// we'll try to wake it up on the Capability it was last on.
wakeupThreadOnCapability_lock(tso->cap, tso);
wakeupThreadOnCapability(cap, tso->cap, tso);
}
#else
appendToRunQueue(cap,tso);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment