Commit 8b75acd3 authored by Simon Marlow

Make forkProcess work with +RTS -N

Consider this experimental for the time being.  There are a lot of
things that could go wrong, but I've verified that at least it works
on the test cases we have.

I also did some API cleanups while I was here.  Previously we had:

Capability * rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret);

but this API is particularly error-prone: if you forget to discard the
Capability * you passed in and keep using it instead of the return
value, you're in for subtle bugs with +RTS -N later on.  So I changed
all these functions to this form:

void rts_eval (/* inout */ Capability **cap,
               /* in    */ HaskellObj p,
               /* out */   HaskellObj *ret)

It's much harder to use this version incorrectly, because you have to
pass the Capability in by reference.
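
For illustration (this usage sketch is not part of the patch; the
closure name my_action_closure and the wrapper function are
hypothetical, and the includes are just the usual RTS headers), a
client embedding the RTS would now write something like:

/* Minimal sketch of calling the new API; my_action_closure is a
   placeholder for some exported IO action's closure. */
#include "Rts.h"
#include "RtsAPI.h"

extern StgClosure my_action_closure;

void run_my_action (void)
{
    Capability *cap;

    cap = rts_lock();
    /* Note the &cap: the call may hand back a different Capability. */
    rts_evalIO(&cap, (HaskellObj)&my_action_closure, NULL);
    rts_checkSchedStatus("run_my_action", cap);
    rts_unlock(cap);
}

rather than assigning the result of rts_evalIO back to cap, so a stale
Capability pointer can no longer slip through unnoticed.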
parent 657773c8
......@@ -609,8 +609,8 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
, declareCResult
, text "cap = rts_lock();"
-- create the application + perform it.
, ptext (sLit "cap=rts_evalIO") <> parens (
cap <>
, ptext (sLit "rts_evalIO") <> parens (
char '&' <> cap <>
ptext (sLit "rts_apply") <> parens (
cap <>
text "(HaskellObj)"
......
......@@ -181,32 +181,44 @@ HsBool rts_getBool ( HaskellObj );
The versions ending in '_' allow you to specify an initial stack size.
Note that these calls may cause Garbage Collection, so all HaskellObj
references are rendered invalid by these calls.
All of these functions take a (Capability **) - there is a
Capability pointer both input and output. We use an inout
parameter because this is less error-prone for the client than a
return value - the client could easily forget to use the return
value, whereas incorrectly using an inout parameter will usually
result in a type error.
------------------------------------------------------------------------- */
Capability *
rts_eval (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
Capability *
rts_eval_ (Capability *, HaskellObj p, unsigned int stack_size,
/*out*/HaskellObj *ret);
void rts_eval (/* inout */ Capability **,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret);
void rts_eval_ (/* inout */ Capability **,
/* in */ HaskellObj p,
/* in */ unsigned int stack_size,
/* out */ HaskellObj *ret);
Capability *
rts_evalIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
void rts_evalIO (/* inout */ Capability **,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret);
Capability *
rts_evalStableIO (Capability *, HsStablePtr s, /*out*/HsStablePtr *ret);
void rts_evalStableIO (/* inout */ Capability **,
/* in */ HsStablePtr s,
/* out */ HsStablePtr *ret);
Capability *
rts_evalLazyIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
void rts_evalLazyIO (/* inout */ Capability **,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret);
Capability *
rts_evalLazyIO_ (Capability *, HaskellObj p, unsigned int stack_size,
/*out*/HaskellObj *ret);
void rts_evalLazyIO_ (/* inout */ Capability **,
/* in */ HaskellObj p,
/* in */ unsigned int stack_size,
/* out */ HaskellObj *ret);
void
rts_checkSchedStatus (char* site, Capability *);
void rts_checkSchedStatus (char* site, Capability *);
SchedulerStatus
rts_getSchedStatus (Capability *cap);
SchedulerStatus rts_getSchedStatus (Capability *cap);
/* --------------------------------------------------------------------------
Wrapper closures
......
......@@ -20,8 +20,9 @@
//
StgTSO *createThread (Capability *cap, nat stack_size);
Capability *scheduleWaitThread (StgTSO *tso, /*out*/HaskellObj* ret,
Capability *cap);
void scheduleWaitThread (/* in */ StgTSO *tso,
/* out */ HaskellObj* ret,
/* inout */ Capability **cap);
StgTSO *createGenThread (Capability *cap, nat stack_size,
StgClosure *closure);
......
......@@ -40,8 +40,12 @@ Capability *capabilities = NULL;
// locking, so we don't do that.
Capability *last_free_capability = NULL;
/* GC indicator, in scope for the scheduler, init'ed to false */
volatile StgWord waiting_for_gc = 0;
/*
* Indicates that the RTS wants to synchronise all the Capabilities
* for some reason. All Capabilities should stop and return to the
* scheduler.
*/
volatile StgWord pending_sync = 0;
/* Let foreign code get the current Capability -- assuming there is one!
* This is useful for unsafe foreign calls because they are called with
......@@ -422,13 +426,12 @@ releaseCapability_ (Capability* cap,
return;
}
if (waiting_for_gc == PENDING_GC_SEQ) {
if (pending_sync == SYNC_GC_SEQ || pending_sync == SYNC_FORK) {
last_free_capability = cap; // needed?
debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
return;
}
// If the next thread on the run queue is a bound thread,
// give this Capability to the appropriate Task.
if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
......@@ -536,7 +539,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
#endif
/* ----------------------------------------------------------------------------
* waitForReturnCapability( Task *task )
* waitForReturnCapability (Capability **pCap, Task *task)
*
* Purpose: when an OS thread returns from an external call,
* it calls waitForReturnCapability() (via Schedule.resumeThread())
......@@ -643,7 +646,7 @@ yieldCapability (Capability** pCap, Task *task)
{
Capability *cap = *pCap;
if (waiting_for_gc == PENDING_GC_PAR) {
if (pending_sync == SYNC_GC_PAR) {
traceEventGcStart(cap);
gcWorkerThread(cap);
traceEventGcEnd(cap);
......
......@@ -199,10 +199,15 @@ extern Capability *capabilities;
//
extern Capability *last_free_capability;
// GC indicator, in scope for the scheduler
#define PENDING_GC_SEQ 1
#define PENDING_GC_PAR 2
extern volatile StgWord waiting_for_gc;
//
// Indicates that the RTS wants to synchronise all the Capabilities
// for some reason. All Capabilities should stop and return to the
// scheduler.
//
#define SYNC_GC_SEQ 1
#define SYNC_GC_PAR 2
#define SYNC_FORK 3
extern volatile StgWord pending_sync;
// Acquires a capability at a return point. If *cap is non-NULL, then
// this is taken as a preference for the Capability we wish to
......
......@@ -421,36 +421,39 @@ createStrictIOThread(Capability *cap, nat stack_size, StgClosure *closure)
Evaluating Haskell expressions
------------------------------------------------------------------------- */
Capability *
rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
void rts_eval (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret)
{
StgTSO *tso;
tso = createGenThread(cap, RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret,cap);
tso = createGenThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
scheduleWaitThread(tso,ret,cap);
}
Capability *
rts_eval_ (Capability *cap, HaskellObj p, unsigned int stack_size,
/*out*/HaskellObj *ret)
void rts_eval_ (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* in */ unsigned int stack_size,
/* out */ HaskellObj *ret)
{
StgTSO *tso;
tso = createGenThread(cap, stack_size, p);
return scheduleWaitThread(tso,ret,cap);
tso = createGenThread(*cap, stack_size, p);
scheduleWaitThread(tso,ret,cap);
}
/*
* rts_evalIO() evaluates a value of the form (IO a), forcing the action's
* result to WHNF before returning.
*/
Capability *
rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
void rts_evalIO (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret)
{
StgTSO* tso;
tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret,cap);
tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
scheduleWaitThread(tso,ret,cap);
}
/*
......@@ -459,49 +462,50 @@ rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
* action's result to WHNF before returning. The result is returned
* in a StablePtr.
*/
Capability *
rts_evalStableIO (Capability *cap, HsStablePtr s, /*out*/HsStablePtr *ret)
void rts_evalStableIO (/* inout */ Capability **cap,
/* in */ HsStablePtr s,
/* out */ HsStablePtr *ret)
{
StgTSO* tso;
StgClosure *p, *r;
SchedulerStatus stat;
p = (StgClosure *)deRefStablePtr(s);
tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
// async exceptions are always blocked by default in the created
// thread. See #1048.
tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
cap = scheduleWaitThread(tso,&r,cap);
stat = rts_getSchedStatus(cap);
scheduleWaitThread(tso,&r,cap);
stat = rts_getSchedStatus(*cap);
if (stat == Success && ret != NULL) {
ASSERT(r != NULL);
*ret = getStablePtr((StgPtr)r);
}
return cap;
}
/*
* Like rts_evalIO(), but doesn't force the action's result.
*/
Capability *
rts_evalLazyIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
void rts_evalLazyIO (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret)
{
StgTSO *tso;
tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
return scheduleWaitThread(tso,ret,cap);
tso = createIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
scheduleWaitThread(tso,ret,cap);
}
Capability *
rts_evalLazyIO_ (Capability *cap, HaskellObj p, unsigned int stack_size,
/*out*/HaskellObj *ret)
void rts_evalLazyIO_ (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* in */ unsigned int stack_size,
/* out */ HaskellObj *ret)
{
StgTSO *tso;
tso = createIOThread(cap, stack_size, p);
return scheduleWaitThread(tso,ret,cap);
tso = createIOThread(*cap, stack_size, p);
scheduleWaitThread(tso,ret,cap);
}
/* Convenience function for decoding the returned status. */
......
......@@ -60,7 +60,7 @@ static void real_main(void)
/* ToDo: want to start with a larger stack size */
{
Capability *cap = rts_lock();
cap = rts_evalLazyIO(cap,progmain_closure, NULL);
rts_evalLazyIO(&cap,progmain_closure, NULL);
status = rts_getSchedStatus(cap);
taskTimeStamp(myTask());
rts_unlock(cap);
......
......@@ -431,7 +431,7 @@ static void flushStdHandles(void)
{
Capability *cap;
cap = rts_lock();
cap = rts_evalIO(cap, flushStdHandles_closure, NULL);
rts_evalIO(&cap, flushStdHandles_closure, NULL);
rts_unlock(cap);
}
......
......@@ -40,6 +40,7 @@
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
#include "Stable.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
......@@ -130,6 +131,10 @@ static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
......@@ -617,7 +622,7 @@ shouldYieldCapability (Capability *cap, Task *task)
// - the thread at the head of the run queue cannot be run
// by this Task (it is bound to another Task, or it is unbound
// and this task it bound).
return (waiting_for_gc ||
return (pending_sync ||
cap->returning_tasks_hd != NULL ||
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? cap->run_queue_hd->bound != NULL
......@@ -1318,6 +1323,72 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
}
}
/* -----------------------------------------------------------------------------
* Start a synchronisation of all capabilities
* -------------------------------------------------------------------------- */
// Returns:
// 0 if we successfully got a sync
// non-0 if there was another sync request in progress,
// and we yielded to it. The value returned is the
// type of the other sync request.
//
#if defined(THREADED_RTS)
static nat requestSync (Capability **pcap, Task *task, nat sync_type)
{
nat prev_pending_sync;
prev_pending_sync = cas(&pending_sync, 0, sync_type);
if (prev_pending_sync)
{
do {
debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
prev_pending_sync);
ASSERT(*pcap);
yieldCapability(pcap,task);
} while (pending_sync);
return prev_pending_sync; // NOTE: task->cap might have changed now
}
else
{
return 0;
}
}
//
// Grab all the capabilities except the one we already hold. Used
// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
// before a fork (SYNC_FORK).
//
// Only call this after requestSync(), otherwise a deadlock might
// ensue if another thread is trying to synchronise.
//
static void acquireAllCapabilities(Capability *cap, Task *task)
{
Capability *tmpcap;
nat i;
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities);
tmpcap = &capabilities[i];
if (tmpcap != cap) {
// we better hope this task doesn't get migrated to
// another Capability while we're waiting for this one.
// It won't, because load balancing happens while we have
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = tmpcap;
waitForReturnCapability(&tmpcap, task);
if (tmpcap != &capabilities[i]) {
barf("acquireAllCapabilities: got the wrong capability");
}
}
}
}
#endif
/* -----------------------------------------------------------------------------
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
......@@ -1327,10 +1398,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
{
rtsBool heap_census;
#ifdef THREADED_RTS
/* extern static volatile StgWord waiting_for_gc;
lives inside capability.c */
rtsBool gc_type, prev_pending_gc;
nat i;
rtsBool gc_type;
nat i, sync;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
......@@ -1346,9 +1415,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
&& N >= RtsFlags.ParFlags.parGcGen
&& ! oldest_gen->mark)
{
gc_type = PENDING_GC_PAR;
gc_type = SYNC_GC_PAR;
} else {
gc_type = PENDING_GC_SEQ;
gc_type = SYNC_GC_SEQ;
}
// In order to GC, there must be no threads running Haskell code.
......@@ -1363,26 +1432,25 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
//
/* Other capabilities are prevented from running yet more Haskell
threads if waiting_for_gc is set. Tested inside
threads if pending_sync is set. Tested inside
yieldCapability() and releaseCapability() in Capability.c */
prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
if (prev_pending_gc) {
do {
debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
prev_pending_gc);
ASSERT(cap);
yieldCapability(&cap,task);
} while (waiting_for_gc);
return cap; // NOTE: task->cap might have changed here
}
do {
sync = requestSync(&cap, task, gc_type);
if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
// someone else had a pending sync request for a GC, so
// let's assume GC has been done and we don't need to GC
// again.
return cap;
}
} while (sync);
interruptAllCapabilities();
// The final shutdown GC is always single-threaded, because it's
// possible that some of the Capabilities have no worker threads.
if (gc_type == PENDING_GC_SEQ)
if (gc_type == SYNC_GC_SEQ)
{
traceEventRequestSeqGc(cap);
}
......@@ -1392,25 +1460,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
if (gc_type == PENDING_GC_SEQ)
if (gc_type == SYNC_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
if (cap != &capabilities[i]) {
Capability *pcap = &capabilities[i];
// we better hope this task doesn't get migrated to
// another Capability while we're waiting for this one.
// It won't, because load balancing happens while we have
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = pcap;
waitForReturnCapability(&pcap, task);
if (pcap != &capabilities[i]) {
barf("scheduleDoGC: got the wrong capability");
}
}
}
acquireAllCapabilities(cap,task);
}
else
{
......@@ -1455,9 +1508,9 @@ delete_threads_and_gc:
traceEventGcStart(cap);
#if defined(THREADED_RTS)
// reset waiting_for_gc *before* GC, so that when the GC threads
// reset pending_sync *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
waiting_for_gc = 0;
pending_sync = 0;
GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
#else
GarbageCollect(force_major || heap_census, heap_census, 0, cap);
......@@ -1494,7 +1547,7 @@ delete_threads_and_gc:
}
#if defined(THREADED_RTS)
if (gc_type == PENDING_GC_PAR)
if (gc_type == SYNC_GC_PAR)
{
releaseGCThreads(cap);
}
......@@ -1526,7 +1579,7 @@ delete_threads_and_gc:
#endif
#if defined(THREADED_RTS)
if (gc_type == PENDING_GC_SEQ) {
if (gc_type == SYNC_GC_SEQ) {
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
......@@ -1561,26 +1614,41 @@ forkProcess(HsStablePtr *entry
StgTSO* t,*next;
Capability *cap;
nat g;
#if defined(THREADED_RTS)
if (RtsFlags.ParFlags.nNodes > 1) {
errorBelch("forking not supported with +RTS -N<n> greater than 1");
stg_exit(EXIT_FAILURE);
}
Task *task = NULL;
nat i;
#ifdef THREADED_RTS
nat sync;
#endif
debugTrace(DEBUG_sched, "forking!");
// ToDo: for SMP, we should probably acquire *all* the capabilities
cap = rts_lock();
task = newBoundTask();
cap = NULL;
waitForReturnCapability(&cap, task);
#ifdef THREADED_RTS
do {
sync = requestSync(&cap, task, SYNC_FORK);
} while (sync);
acquireAllCapabilities(cap,task);
pending_sync = 0;
#endif
// no funny business: hold locks while we fork, otherwise if some
// other thread is holding a lock when the fork happens, the data
// structure protected by the lock will forever be in an
// inconsistent state in the child. See also #1391.
ACQUIRE_LOCK(&sched_mutex);
ACQUIRE_LOCK(&cap->lock);
ACQUIRE_LOCK(&cap->running_task->lock);
ACQUIRE_LOCK(&sm_mutex);
ACQUIRE_LOCK(&stable_mutex);
ACQUIRE_LOCK(&task->lock);
for (i=0; i < n_capabilities; i++) {
ACQUIRE_LOCK(&capabilities[i].lock);
}
stopTimer(); // See #4074
......@@ -1595,19 +1663,30 @@ forkProcess(HsStablePtr *entry
startTimer(); // #4074
RELEASE_LOCK(&sched_mutex);
RELEASE_LOCK(&cap->lock);
RELEASE_LOCK(&cap->running_task->lock);
RELEASE_LOCK(&sm_mutex);
RELEASE_LOCK(&stable_mutex);
RELEASE_LOCK(&task->lock);
for (i=0; i < n_capabilities; i++) {
releaseCapability_(&capabilities[i],rtsFalse);
RELEASE_LOCK(&capabilities[i].lock);
}
boundTaskExiting(task);
// just return the pid
rts_unlock(cap);
return pid;
return pid;
} else { // child
#if defined(THREADED_RTS)
initMutex(&sched_mutex);
initMutex(&cap->lock);
initMutex(&cap->running_task->lock);
initMutex(&sm_mutex);
initMutex(&stable_mutex);
initMutex(&task->lock);
for (i=0; i < n_capabilities; i++) {
initMutex(&capabilities[i].lock);
}
#endif
#ifdef TRACING
......@@ -1626,7 +1705,7 @@ forkProcess(HsStablePtr *entry
// don't allow threads to catch the ThreadKilled
// exception, but we do want to raiseAsync() because these
// threads may be evaluating thunks that we need later.
deleteThread_(cap,t);
deleteThread_(t->cap,t);
// stop the GC from updating the InCall to point to
// the TSO. This is only necessary because the
......@@ -1637,44 +1716,58 @@ forkProcess(HsStablePtr *entry
}
}
// Empty the run queue. It seems tempting to let all the
// killed threads stay on the run queue as zombies to be
// cleaned up later, but some of them correspond to bound
// threads for which the corresponding Task does not exist.
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
cap->suspended_ccalls = NULL;