Commit 111ba4be authored by Simon Marlow

Fix deadlock (#10545)

yieldCapability() was not prepared to be called by a Task that is
neither a worker nor a bound Task.  This could happen if we ended up in
yieldCapability() via this call stack:

performGC()
scheduleDoGC()
requestSync()
yieldCapability()

and there were a few other ways this could happen via requestSync.
The fix is to handle this case in yieldCapability(): when the Task is
neither a worker nor a bound Task, we put it on the returning_tasks
queue, where it will be woken up again.
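
In outline, the new tail of yieldCapability() looks like this (a
condensed excerpt of the diff below, tracing elided; all identifiers
are as in the diff):

    // Park ourselves and wait to be woken up with a Capability.
    // Workers and bound Tasks use the spare_workers protocol; any
    // other Task (e.g. one that came in via performGC()) parks on
    // the returning_tasks queue instead, which releaseCapability_()
    // always services first, so it cannot be forgotten.
    if (isWorker(task) || isBoundTask(task)) {
        RELEASE_LOCK(&cap->lock);
        cap = waitForWorkerCapability(task);
    } else {
        newReturningTask(cap,task);   // still holding cap->lock
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }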

Summary of changes:

* `yieldCapability`: factored out subroutine `waitForWorkerCapability`
* `waitForReturnCapability` renamed to `waitForCapability`, with a new
  internal subroutine `waitForReturnCapability` factored out of it
  (its slow path is sketched below)
* `releaseCapabilityAndQueueWorker` renamed to `enqueueWorker`; it no
  longer takes a lock and no longer tests `!isBoundTask()`
* `yieldCapability` adjusted for the refactorings; the only change in
  behaviour is when the Task is neither a worker nor a bound Task
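
To make the renames concrete, here is a condensed sketch of the
reshaped `waitForCapability`, pieced together from the diff below
(locking kept, tracing and the non-THREADED_RTS path elided; all
identifiers are as in the diff):

    // Fast path: the Capability is free, so just grab it.  Slow path:
    // enqueue ourselves on the returning_tasks queue and sleep in the
    // new waitForReturnCapability() subroutine until it is handed over.
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }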

Test Plan:
* new test concurrent/should_run/performGC
* validate

Reviewers: niteria, austin, ezyang, bgamari

Subscribers: thomie, bgamari

Differential Revision: https://phabricator.haskell.org/D997

GHC Trac Issues: #10545
parent be0ce871
rts/Capability.c
@@ -43,7 +43,7 @@ nat enabled_capabilities = 0;
 // The array of Capabilities. It's important that when we need
 // to allocate more Capabilities we don't have to move the existing
 // Capabilities, because there may be pointers to them in use
-// (e.g. threads in waitForReturnCapability(), see #8209), so this is
+// (e.g. threads in waitForCapability(), see #8209), so this is
 // an array of Capability* rather than an array of Capability.
 Capability **capabilities = NULL;
@@ -450,11 +450,10 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
 #endif
 
 /* ----------------------------------------------------------------------------
- * Function:  releaseCapability(Capability*)
+ * releaseCapability
  *
- * Purpose:    Letting go of a capability. Causes a
- *             'returning worker' thread or a 'waiting worker'
- *             to wake up, in that order.
+ * The current Task (cap->task) releases the Capability.  The Capability is
+ * marked free, and if there is any work to do, an appropriate Task is woken up.
 * ------------------------------------------------------------------------- */
 
 #if defined(THREADED_RTS)
@@ -474,13 +473,13 @@ releaseCapability_ (Capability* cap,
     // the go-ahead to return the result of an external call..
     if (cap->returning_tasks_hd != NULL) {
         giveCapabilityToTask(cap,cap->returning_tasks_hd);
-        // The Task pops itself from the queue (see waitForReturnCapability())
+        // The Task pops itself from the queue (see waitForCapability())
         return;
     }
 
     // If there is a pending sync, then we should just leave the
     // Capability free.  The thread trying to sync will be about to
-    // call waitForReturnCapability().
+    // call waitForCapability().
     if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
         last_free_capability = cap; // needed?
         debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
@@ -549,62 +548,156 @@ releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
 }
 
 static void
-releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
+enqueueWorker (Capability* cap USED_IF_THREADS)
 {
     Task *task;
 
-    ACQUIRE_LOCK(&cap->lock);
-
     task = cap->running_task;
 
     // If the Task is stopped, we shouldn't be yielding, we should
     // be just exiting.
     ASSERT(!task->stopped);
+    ASSERT(task->worker);
 
-    // If the current task is a worker, save it on the spare_workers
-    // list of this Capability. A worker can mark itself as stopped,
-    // in which case it is not replaced on the spare_worker queue.
-    // This happens when the system is shutting down (see
-    // Schedule.c:workerStart()).
-    if (!isBoundTask(task))
+    if (cap->n_spare_workers < MAX_SPARE_WORKERS)
     {
-        if (cap->n_spare_workers < MAX_SPARE_WORKERS)
-        {
-            task->next = cap->spare_workers;
-            cap->spare_workers = task;
-            cap->n_spare_workers++;
-        }
-        else
-        {
-            debugTrace(DEBUG_sched, "%d spare workers already, exiting",
-                       cap->n_spare_workers);
-            releaseCapability_(cap,rtsFalse);
-            // hold the lock until after workerTaskStop; c.f. scheduleWorker()
-            workerTaskStop(task);
-            RELEASE_LOCK(&cap->lock);
-            shutdownThread();
-        }
+        task->next = cap->spare_workers;
+        cap->spare_workers = task;
+        cap->n_spare_workers++;
     }
-    // Bound tasks just float around attached to their TSOs.
-
-    releaseCapability_(cap,rtsFalse);
-
-    RELEASE_LOCK(&cap->lock);
+    else
+    {
+        debugTrace(DEBUG_sched, "%d spare workers already, exiting",
+                   cap->n_spare_workers);
+        releaseCapability_(cap,rtsFalse);
+        // hold the lock until after workerTaskStop; c.f. scheduleWorker()
+        workerTaskStop(task);
+        RELEASE_LOCK(&cap->lock);
+        shutdownThread();
+    }
 }
 
 #endif
 
+/* ----------------------------------------------------------------------------
+ * waitForWorkerCapability(task)
+ *
+ * waits to be given a Capability, and then returns the Capability.  The task
+ * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+
+static Capability * waitForWorkerCapability (Task *task)
+{
+    Capability *cap;
+
+    for (;;) {
+        ACQUIRE_LOCK(&task->lock);
+        // task->lock held, cap->lock not held
+        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+        cap = task->cap;
+        task->wakeup = rtsFalse;
+        RELEASE_LOCK(&task->lock);
+
+        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
+
+        ACQUIRE_LOCK(&cap->lock);
+        if (cap->running_task != NULL) {
+            debugTrace(DEBUG_sched,
+                       "capability %d is owned by another task", cap->no);
+            RELEASE_LOCK(&cap->lock);
+            continue;
+        }
+
+        if (task->cap != cap) {
+            // see Note [migrated bound threads]
+            debugTrace(DEBUG_sched,
+                       "task has been migrated to cap %d", task->cap->no);
+            RELEASE_LOCK(&cap->lock);
+            continue;
+        }
+
+        if (task->incall->tso == NULL) {
+            ASSERT(cap->spare_workers != NULL);
+            // if we're not at the front of the queue, release it
+            // again. This is unlikely to happen.
+            if (cap->spare_workers != task) {
+                giveCapabilityToTask(cap,cap->spare_workers);
+                RELEASE_LOCK(&cap->lock);
+                continue;
+            }
+            cap->spare_workers = task->next;
+            task->next = NULL;
+            cap->n_spare_workers--;
+        }
+
+        cap->running_task = task;
+        RELEASE_LOCK(&cap->lock);
+        break;
+    }
+
+    return cap;
+}
+
+#endif /* THREADED_RTS */
+
 /* ----------------------------------------------------------------------------
- * waitForReturnCapability (Capability **pCap, Task *task)
+ * waitForReturnCapability (Task *task)
+ *
+ * The Task should be on the cap->returning_tasks queue of a Capability.  This
+ * function waits for the Task to be woken up, and returns the Capability that
+ * it was woken up on.
+ *
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+
+static Capability * waitForReturnCapability (Task *task)
+{
+    Capability *cap;
+
+    for (;;) {
+        ACQUIRE_LOCK(&task->lock);
+        // task->lock held, cap->lock not held
+        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+        cap = task->cap;
+        task->wakeup = rtsFalse;
+        RELEASE_LOCK(&task->lock);
+
+        // now check whether we should wake up...
+        ACQUIRE_LOCK(&cap->lock);
+        if (cap->running_task == NULL) {
+            if (cap->returning_tasks_hd != task) {
+                giveCapabilityToTask(cap,cap->returning_tasks_hd);
+                RELEASE_LOCK(&cap->lock);
+                continue;
+            }
+            cap->running_task = task;
+            popReturningTask(cap);
+            RELEASE_LOCK(&cap->lock);
+            break;
+        }
+        RELEASE_LOCK(&cap->lock);
+    }
+
+    return cap;
+}
+
+#endif /* THREADED_RTS */
+
+/* ----------------------------------------------------------------------------
+ * waitForCapability (Capability **pCap, Task *task)
  *
 * Purpose:  when an OS thread returns from an external call,
- * it calls waitForReturnCapability() (via Schedule.resumeThread())
+ * it calls waitForCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
 
-void
-waitForReturnCapability (Capability **pCap, Task *task)
+void waitForCapability (Capability **pCap, Task *task)
 {
 #if !defined(THREADED_RTS)
@@ -641,10 +734,9 @@ waitForReturnCapability (Capability **pCap, Task *task)
         ASSERT(task->cap == cap);
     }
 
-    ACQUIRE_LOCK(&cap->lock);
-
     debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
 
+    ACQUIRE_LOCK(&cap->lock);
     if (!cap->running_task) {
         // It's free; just grab it
         cap->running_task = task;
@@ -652,31 +744,7 @@
     } else {
         newReturningTask(cap,task);
         RELEASE_LOCK(&cap->lock);
-
-        for (;;) {
-            ACQUIRE_LOCK(&task->lock);
-            // task->lock held, cap->lock not held
-            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
-            cap = task->cap;
-            task->wakeup = rtsFalse;
-            RELEASE_LOCK(&task->lock);
-
-            // now check whether we should wake up...
-            ACQUIRE_LOCK(&cap->lock);
-            if (cap->running_task == NULL) {
-                if (cap->returning_tasks_hd != task) {
-                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
-                    RELEASE_LOCK(&cap->lock);
-                    continue;
-                }
-                cap->running_task = task;
-                popReturningTask(cap);
-                RELEASE_LOCK(&cap->lock);
-                break;
-            }
-            RELEASE_LOCK(&cap->lock);
-        }
+        cap = waitForReturnCapability(task);
     }
 
 #ifdef PROFILING
@@ -691,11 +759,30 @@ waitForReturnCapability (Capability **pCap, Task *task)
 #endif
 }
 
-#if defined(THREADED_RTS)
+/* ----------------------------------------------------------------------------
+ * yieldCapability
+ *
+ * Give up the Capability, and return when we have it again.  This is called
+ * when either we know that the Capability should be given to another Task, or
+ * there is nothing to do right now.  One of the following is true:
+ *
+ *    - The current Task is a worker, and there's a bound thread at the head of
+ *      the run queue (or vice versa)
+ *
+ *    - The run queue is empty.  We'll be woken up again when there's work to
+ *      do.
+ *
+ *    - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
+ *      We should become a GC worker for a while.
+ *
+ *    - Another Task is trying to acquire all the Capabilities (pending_sync !=
+ *      SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
+ *      setNumCapabilities.  We should give up the Capability temporarily.
+ *
+ * ------------------------------------------------------------------------- */
+
+#if defined (THREADED_RTS)
 
 /* See Note [GC livelock] in Schedule.c for why we have gcAllowed
    and return the rtsBool */
 rtsBool /* Did we GC? */
@@ -714,63 +801,39 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
         }
     }
 
     debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
 
-    // We must now release the capability and wait to be woken up
-    // again.
+    // We must now release the capability and wait to be woken up again.
     task->wakeup = rtsFalse;
-    releaseCapabilityAndQueueWorker(cap);
 
-    for (;;) {
-        ACQUIRE_LOCK(&task->lock);
-        // task->lock held, cap->lock not held
-        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
-        cap = task->cap;
-        task->wakeup = rtsFalse;
-        RELEASE_LOCK(&task->lock);
-
-        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
-
-        ACQUIRE_LOCK(&cap->lock);
-        if (cap->running_task != NULL) {
-            debugTrace(DEBUG_sched,
-                       "capability %d is owned by another task", cap->no);
-            RELEASE_LOCK(&cap->lock);
-            continue;
-        }
-
-        if (task->cap != cap) {
-            // see Note [migrated bound threads]
-            debugTrace(DEBUG_sched,
-                       "task has been migrated to cap %d", task->cap->no);
-            RELEASE_LOCK(&cap->lock);
-            continue;
-        }
-
-        if (task->incall->tso == NULL) {
-            ASSERT(cap->spare_workers != NULL);
-            // if we're not at the front of the queue, release it
-            // again. This is unlikely to happen.
-            if (cap->spare_workers != task) {
-                giveCapabilityToTask(cap,cap->spare_workers);
-                RELEASE_LOCK(&cap->lock);
-                continue;
-            }
-            cap->spare_workers = task->next;
-            task->next = NULL;
-            cap->n_spare_workers--;
-        }
-
-        cap->running_task = task;
-        RELEASE_LOCK(&cap->lock);
-        break;
-    }
+    ACQUIRE_LOCK(&cap->lock);
+
+    // If this is a worker thread, put it on the spare_workers queue
+    if (isWorker(task)) {
+        enqueueWorker(cap);
+    }
+
+    releaseCapability_(cap, rtsFalse);
+
+    if (isWorker(task) || isBoundTask(task)) {
+        RELEASE_LOCK(&cap->lock);
+        cap = waitForWorkerCapability(task);
+    } else {
+        // Not a worker Task, or a bound Task. The only way we can be woken up
+        // again is to put ourselves on the returning_tasks queue, so that's
+        // what we do. We still hold cap->lock at this point
+        //
+        // The Task waiting for this Capability does not have it
+        // yet, so we can be sure to be woken up later. (see #10545)
+        newReturningTask(cap,task);
+        RELEASE_LOCK(&cap->lock);
+        cap = waitForReturnCapability(task);
+    }
 
     debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
     ASSERT(cap->running_task == task);
 
 #ifdef PROFILING
     cap->r.rCCCS = CCS_SYSTEM;
 #endif
 
     *pCap = cap;
@@ -780,6 +843,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
     return rtsFalse;
 }
 
+#endif /* THREADED_RTS */
+
 // Note [migrated bound threads]
 //
 // There's a tricky case where:
@@ -815,6 +880,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */
 
+#if defined (THREADED_RTS)
+
 void
 prodCapability (Capability *cap, Task *task)
 {
@@ -826,6 +893,8 @@ prodCapability (Capability *cap, Task *task)
     RELEASE_LOCK(&cap->lock);
 }
 
+#endif /* THREADED_RTS */
+
 /* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
@@ -833,6 +902,8 @@ prodCapability (Capability *cap, Task *task)
 *
 * ------------------------------------------------------------------------- */
 
+#if defined (THREADED_RTS)
+
 rtsBool
 tryGrabCapability (Capability *cap, Task *task)
 {
rts/Capability.h
@@ -248,7 +248,7 @@ extern volatile StgWord pending_sync;
 //
 // On return, *cap is non-NULL, and points to the Capability acquired.
 //
-void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
+void waitForCapability (Capability **cap/*in/out*/, Task *task);
 
 EXTERN_INLINE void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
@@ -269,12 +269,6 @@ EXTERN_INLINE void recordClosureMutated (Capability *cap, StgClosure *p);
 //
 rtsBool yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed);
 
-// Acquires a capability for doing some work.
-//
-// On return: pCap points to the capability.
-//
-void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
-
 // Wakes up a worker thread on just one Capability, used when we
 // need to service some global event.
 //
rts/RtsAPI.c
@@ -564,7 +564,7 @@ rts_lock (void)
     }
 
     cap = NULL;
-    waitForReturnCapability(&cap, task);
+    waitForCapability(&cap, task);
 
     if (task->incall->prev_stack == NULL) {
         // This is a new outermost call from C into Haskell land.
rts/Schedule.c
@@ -1424,7 +1424,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
             // all the Capabilities, but even so it's a slightly
             // unsavoury invariant.
             task->cap = tmpcap;
-            waitForReturnCapability(&tmpcap, task);
+            waitForCapability(&tmpcap, task);
             if (tmpcap->no != i) {
                 barf("acquireAllCapabilities: got the wrong capability");
             }
@@ -1801,7 +1801,7 @@ forkProcess(HsStablePtr *entry
     task = newBoundTask();
 
     cap = NULL;
-    waitForReturnCapability(&cap, task);
+    waitForCapability(&cap, task);
 
 #ifdef THREADED_RTS
     do {
@@ -2278,7 +2278,7 @@ resumeThread (void *task_)
     task->cap = cap;
 
     // Wait for permission to re-enter the RTS with the result.
-    waitForReturnCapability(&cap,task);
+    waitForCapability(&cap,task);
     // we might be on a different capability now... but if so, our
     // entry on the suspended_ccalls list will also have been
     // migrated.
@@ -2408,7 +2408,7 @@ void scheduleWorker (Capability *cap, Task *task)
     // cap->lock until we've finished workerTaskStop() below.
     //
     // There may be workers still involved in foreign calls; those
-    // will just block in waitForReturnCapability() because the
+    // will just block in waitForCapability() because the
     // Capability has been shut down.
     //
     ACQUIRE_LOCK(&cap->lock);
@@ -2499,7 +2499,7 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
     if (sched_state < SCHED_SHUTTING_DOWN) {
         sched_state = SCHED_INTERRUPTING;
         Capability *cap = task->cap;
-        waitForReturnCapability(&cap,task);
+        waitForCapability(&cap,task);
         scheduleDoGC(&cap,task,rtsTrue);
         ASSERT(task->incall->tso == NULL);
         releaseCapability(cap);
@@ -2523,7 +2523,7 @@ freeScheduler( void )
     still_running = freeTaskManager();
     // We can only free the Capabilities if there are no Tasks still
     // running. We might have a Task about to return from a foreign
-    // call into waitForReturnCapability(), for example (actually,
+    // call into waitForCapability(), for example (actually,
     // this should be the *only* thing that a still-running Task can
     // do at this point, and it will block waiting for the
     // Capability).
@@ -2567,7 +2567,7 @@ performGC_(rtsBool force_major)
 
     // TODO: do we need to traceTask*() here?
-    waitForReturnCapability(&cap,task);
+    waitForCapability(&cap,task);
     scheduleDoGC(&cap,task,force_major);
     releaseCapability(cap);
     boundTaskExiting(task);
rts/Task.h
@@ -167,6 +167,17 @@ isBoundTask (Task *task)
     return (task->incall->tso != NULL);
 }
 
+// A Task is currently a worker if
+//  (a) it was created as a worker (task->worker), and
+//  (b) it has not left and re-entered Haskell, in which case
+//      task->incall->prev_stack would be non-NULL.
+//
+INLINE_HEADER rtsBool
+isWorker (Task *task)
+{
+    return (task->worker && task->incall->prev_stack == NULL);
+}
+
 // Linked list of all tasks.
 //
 extern Task *all_tasks;
testsuite/tests/concurrent/should_run/all.T
@@ -104,6 +104,10 @@ test('allocLimit4', [ extra_run_opts('+RTS -xq300k -RTS'),
                       omit_ways(['ghci']) ],
      compile_and_run, [''])
 
+test('performGC', [ only_ways(['threaded1','threaded2'])
+                  , extra_run_opts('400 +RTS -qg -RTS') ],
+     compile_and_run, [''])
+
 # -----------------------------------------------------------------------------
 # These tests we only do for a full run
testsuite/tests/concurrent/should_run/performGC.hs
module Main (main) where
-- Test for #10545
import System.Environment
import Control.Concurrent
import Control.Exception
import Control.Monad
import System.Random
import System.Mem
import qualified Data.Set as Set
main = do
  [n] <- getArgs
  -- one thread churning the heap in the background...
  forkIO $ doSomeWork
  -- ...while the main thread repeatedly forces minor GCs,
  -- exercising the performGC()/requestSync() path from #10545
  forM [1..read n] $ \n -> do print n; threadDelay 1000; performMinorGC

doSomeWork :: IO ()
doSomeWork = forever $ do
  ns <- replicateM 10000 randomIO :: IO [Int]
  ms <- replicateM 1000 randomIO
  let set = Set.fromList ns
      elems = filter (`Set.member` set) ms
  evaluate $ sum elems

testsuite/tests/concurrent/should_run/performGC.stdout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55