Commit e8c93ad1 authored by dimitris's avatar dimitris
Browse files

Merge branch 'master' of http://darcs.haskell.org//ghc

parents bd13338d 3d7e772f
......@@ -34,6 +34,7 @@
Capability MainCapability;
nat n_capabilities = 0;
nat enabled_capabilities = 0;
Capability *capabilities = NULL;
// Holds the Capability which last became free. This is used so that
......@@ -323,6 +324,8 @@ initCapabilities( void )
#endif
enabled_capabilities = n_capabilities;
// There are no free capabilities to begin with. We will start
// a worker Task to each Capability, which will quickly put the
// Capability on the free list when it finds nothing to do.
......@@ -493,7 +496,7 @@ releaseCapability_ (Capability* cap,
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
!emptyRunQueue(cap) || !emptyInbox(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
(!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
......@@ -682,7 +685,8 @@ yieldCapability (Capability** pCap, Task *task)
gcWorkerThread(cap);
traceEventGcEnd(cap);
traceSparkCounters(cap);
return;
// See Note [migrated bound threads 2]
if (task->cap == cap) return;
}
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
......@@ -768,6 +772,17 @@ yieldCapability (Capability** pCap, Task *task)
// hold Capabilty C, and task->cap == C, then task cannot be
// migrated under our feet.
// Note [migrated bound threads 2]
//
// Second tricky case;
// - A bound Task becomes a GC thread
// - scheduleDoGC() migrates the thread belonging to this Task,
// because the Capability it is on is disabled
// - after GC, gcWorkerThread() returns, but now we are
// holding a Capability that is not the same as task->cap
// - Hence we must check for this case and immediately give up the
// cap we hold.
/* ----------------------------------------------------------------------------
* prodCapability
*
......
......@@ -49,6 +49,8 @@ struct Capability_ {
// Has there been any activity on this Capability since the last GC?
nat idle;
rtsBool disabled;
// The run queue. The Task owning this Capability has exclusive
// access to its run queue, so can wake up threads without
// taking a lock, and the common path through the scheduler is
......@@ -197,6 +199,8 @@ INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
// declared in includes/rts/Threads.h:
// extern nat n_capabilities;
extern nat enabled_capabilities;
// Array of all the capabilities
//
extern Capability *capabilities;
......
......@@ -133,7 +133,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
......@@ -145,8 +145,8 @@ static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
......@@ -159,8 +159,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major);
static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
......@@ -281,7 +280,7 @@ schedule (Capability *initialCapability, Task *task)
case SCHED_INTERRUPTING:
debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
/* scheduleDoGC() deletes all the threads */
cap = scheduleDoGC(cap,task,rtsFalse);
scheduleDoGC(&cap,task,rtsFalse);
// after scheduleDoGC(), we must be shutting down. Either some
// other Capability did the final GC, or we did it above,
......@@ -303,17 +302,13 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
scheduleFindWork(cap);
scheduleFindWork(&cap);
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
cap = task->cap; // reload cap, it might have changed
#endif
scheduleDetectDeadlock(&cap,task);
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
......@@ -396,6 +391,26 @@ schedule (Capability *initialCapability, Task *task)
deleteThread(cap,t);
}
// If this capability is disabled, migrate the thread away rather
// than running it. NB. but not if the thread is bound: it is
// really hard for a bound thread to migrate itself. Believe me,
// I tried several ways and couldn't find a way to do it.
// Instead, when everything is stopped for GC, we migrate all the
// threads on the run queue then (see scheduleDoGC()).
//
// ToDo: what about TSO_LOCKED? Currently we're migrating those
// when the number of capabilities drops, but we never migrate
// them back if it rises again. Presumably we should, but after
// the thread has been migrated we no longer know what capability
// it was originally on.
#ifdef THREADED_RTS
if (cap->disabled && !t->bound) {
Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
migrateThread(cap, t, dest_cap);
continue;
}
#endif
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
......@@ -558,7 +573,7 @@ run_thread:
}
if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
cap = scheduleDoGC(cap,task,rtsFalse);
scheduleDoGC(&cap,task,rtsFalse);
}
} /* end of while() */
}
......@@ -608,16 +623,16 @@ schedulePreLoop(void)
* -------------------------------------------------------------------------- */
static void
scheduleFindWork (Capability *cap)
scheduleFindWork (Capability **pcap)
{
scheduleStartSignalHandlers(cap);
scheduleStartSignalHandlers(*pcap);
scheduleProcessInbox(cap);
scheduleProcessInbox(pcap);
scheduleCheckBlockedThreads(cap);
scheduleCheckBlockedThreads(*pcap);
#if defined(THREADED_RTS)
if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}
......@@ -707,10 +722,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// First grab as many free Capabilities as we can.
for (i=0, n_free_caps=0; i < n_capabilities; i++) {
cap0 = &capabilities[i];
if (cap != cap0 && tryGrabCapability(cap0,task)) {
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap->returning_tasks_hd != NULL
|| cap->inbox != (Message*)END_TSO_QUEUE) {
|| cap0->returning_tasks_hd != NULL
|| cap0->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
......@@ -869,9 +884,10 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
* ------------------------------------------------------------------------- */
static void
scheduleDetectDeadlock (Capability *cap, Task *task)
scheduleDetectDeadlock (Capability **pcap, Task *task)
{
/*
Capability *cap = *pcap;
/*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
* other tasks are waiting for work, we must have a deadlock of
......@@ -896,7 +912,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
cap = *pcap;
// when force_major == rtsTrue. scheduleDoGC sets
// recent_activity to ACTIVITY_DONE_GC and turns off the timer
// signal.
......@@ -976,16 +993,18 @@ scheduleSendPendingMessages(void)
* ------------------------------------------------------------------------- */
static void
scheduleProcessInbox (Capability *cap USED_IF_THREADS)
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
int r;
Capability *cap = *pcap;
while (!emptyInbox(cap)) {
if (cap->r.rCurrentNursery->link == NULL ||
g0->n_new_large_words >= large_alloc_lim) {
scheduleDoGC(cap, cap->running_task, rtsFalse);
scheduleDoGC(pcap, cap->running_task, rtsFalse);
cap = *pcap;
}
// don't use a blocking acquire; if the lock is held by
......@@ -1023,7 +1042,7 @@ scheduleProcessInbox (Capability *cap USED_IF_THREADS)
static void
scheduleActivateSpark(Capability *cap)
{
if (anySparks())
if (anySparks() && !cap->disabled)
{
createSparkThread(cap);
debugTrace(DEBUG_sched, "creating a spark thread");
......@@ -1415,21 +1434,24 @@ static void releaseAllCapabilities(Capability *cap, Task *task)
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
static void
scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
rtsBool force_major)
{
Capability *cap = *pcap;
rtsBool heap_census;
#ifdef THREADED_RTS
rtsBool idle_cap[n_capabilities];
rtsBool gc_type;
nat i, sync;
StgTSO *tso;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
// The final GC has already been done, and the system is
// shutting down. We'll probably deadlock if we try to GC
// now.
return cap;
return;
}
#ifdef THREADED_RTS
......@@ -1459,12 +1481,19 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
yieldCapability() and releaseCapability() in Capability.c */
do {
sync = requestSync(&cap, task, gc_type);
sync = requestSync(pcap, task, gc_type);
cap = *pcap;
if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
// someone else had a pending sync request for a GC, so
// let's assume GC has been done and we don't need to GC
// again.
return cap;
return;
}
if (sched_state == SCHED_SHUTTING_DOWN) {
// The scheduler might now be shutting down. We tested
// this above, but it might have become true since then as
// we yielded the capability in requestSync().
return;
}
} while (sync);
......@@ -1502,11 +1531,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
|| (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
for (i=0; i < n_capabilities; i++) {
idle_cap[i] = rtsFalse;
if (capabilities[i].disabled) {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
} else {
idle_cap[i] = rtsFalse;
}
}
} else {
for (i=0; i < n_capabilities; i++) {
if (i == cap->no || capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
if (capabilities[i].disabled) {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
} else if (i == cap->no ||
capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
idle_cap[i] = rtsFalse;
} else {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
......@@ -1570,6 +1606,29 @@ delete_threads_and_gc:
sched_state = SCHED_SHUTTING_DOWN;
}
/*
* When there are disabled capabilities, we want to migrate any
* threads away from them. Normally this happens in the
* scheduler's loop, but only for unbound threads - it's really
* hard for a bound thread to migrate itself. So we have another
* go here.
*/
#if defined(THREADED_RTS)
for (i = enabled_capabilities; i < n_capabilities; i++) {
Capability *tmp_cap, *dest_cap;
tmp_cap = &capabilities[i];
ASSERT(tmp_cap->disabled);
if (i != cap->no) {
dest_cap = &capabilities[i % enabled_capabilities];
while (!emptyRunQueue(tmp_cap)) {
tso = popRunQueue(tmp_cap);
migrateThread(tmp_cap, tso, dest_cap);
if (tso->bound) { tso->bound->task->cap = dest_cap; }
}
}
}
#endif
heap_census = scheduleNeedHeapProfile(rtsTrue);
traceEventGcStart(cap);
......@@ -1663,7 +1722,7 @@ delete_threads_and_gc:
}
#endif
return cap;
return;
}
/* ---------------------------------------------------------------------------
......@@ -1848,7 +1907,7 @@ forkProcess(HsStablePtr *entry
}
/* ---------------------------------------------------------------------------
* Increase the number of Capabilities
* Changing the number of Capabilities
*
* Changing the number of Capabilities is very tricky! We can only do
* it with the system fully stopped, so we do a full sync with
......@@ -1873,17 +1932,13 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
Capability *cap;
nat sync;
StgTSO* t;
nat g;
Capability *old_capabilities;
if (new_n_capabilities == n_capabilities) return;
nat g, n;
Capability *old_capabilities = NULL;
if (new_n_capabilities < n_capabilities) {
barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
}
if (new_n_capabilities == enabled_capabilities) return;
debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
n_capabilities, new_n_capabilities);
enabled_capabilities, new_n_capabilities);
cap = rts_lock();
task = cap->running_task;
......@@ -1896,31 +1951,76 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
pending_sync = 0;
if (new_n_capabilities < enabled_capabilities)
{
// Reducing the number of capabilities: we do not actually
// remove the extra capabilities, we just mark them as
// "disabled". This has the following effects:
//
// - threads on a disabled capability are migrated away by the
// scheduler loop
//
// - disabled capabilities do not participate in GC
// (see scheduleDoGC())
//
// - No spark threads are created on this capability
// (see scheduleActivateSpark())
//
// - We do not attempt to migrate threads *to* a disabled
// capability (see schedulePushWork()).
//
// but in other respects, a disabled capability remains
// alive. Threads may be woken up on a disabled capability,
// but they will be immediately migrated away.
//
// This approach is much easier than trying to actually remove
// the capability; we don't have to worry about GC data
// structures, the nursery, etc.
//
for (n = new_n_capabilities; n < enabled_capabilities; n++) {
capabilities[n].disabled = rtsTrue;
}
enabled_capabilities = new_n_capabilities;
}
else
{
// Increasing the number of enabled capabilities.
//
// enable any disabled capabilities, up to the required number
for (n = enabled_capabilities;
n < new_n_capabilities && n < n_capabilities; n++) {
capabilities[n].disabled = rtsFalse;
}
enabled_capabilities = n;
if (new_n_capabilities > n_capabilities) {
#if defined(TRACING)
// Allocate eventlog buffers for the new capabilities. Note this
// must be done before calling moreCapabilities(), because that
// will emit events to add the new capabilities to capsets.
tracingAddCapapilities(n_capabilities, new_n_capabilities);
// Allocate eventlog buffers for the new capabilities. Note this
// must be done before calling moreCapabilities(), because that
// will emit events to add the new capabilities to capsets.
tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif
// Resize the capabilities array
// NB. after this, capabilities points somewhere new. Any pointers
// of type (Capability *) are now invalid.
old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
// Resize the capabilities array
// NB. after this, capabilities points somewhere new. Any pointers
// of type (Capability *) are now invalid.
old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
// update our own cap pointer
cap = &capabilities[cap->no];
// update our own cap pointer
cap = &capabilities[cap->no];
// Resize and update storage manager data structures
storageAddCapabilities(n_capabilities, new_n_capabilities);
// Resize and update storage manager data structures
storageAddCapabilities(n_capabilities, new_n_capabilities);
// Update (Capability *) refs in the Task manager.
updateCapabilityRefs();
// Update (Capability *) refs in the Task manager.
updateCapabilityRefs();
// Update (Capability *) refs from TSOs
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
t->cap = &capabilities[t->cap->no];
// Update (Capability *) refs from TSOs
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
t->cap = &capabilities[t->cap->no];
}
}
}
}
......@@ -1931,7 +2031,9 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
startWorkerTasks(n_capabilities, new_n_capabilities);
// finally, update n_capabilities
n_capabilities = new_n_capabilities;
if (new_n_capabilities > n_capabilities) {
n_capabilities = enabled_capabilities = new_n_capabilities;
}
// We can't free the old array until now, because we access it
// while updating pointers in updateCapabilityRefs().
......@@ -2177,7 +2279,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
#if defined(THREADED_RTS)
cpu %= n_capabilities;
cpu %= enabled_capabilities;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
......@@ -2332,10 +2434,11 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
Capability *cap = task->cap;
waitForReturnCapability(&cap,task);
scheduleDoGC(&cap,task,rtsFalse);
ASSERT(task->incall->tso == NULL);
releaseCapability(task->cap);
releaseCapability(cap);
}
sched_state = SCHED_SHUTTING_DOWN;
......@@ -2394,15 +2497,16 @@ static void
performGC_(rtsBool force_major)
{
Task *task;
Capability *cap = NULL;
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalls queue.
task = newBoundTask();
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,force_major);
releaseCapability(task->cap);
waitForReturnCapability(&cap,task);
scheduleDoGC(&cap,task,force_major);
releaseCapability(cap);
boundTaskExiting(task);
}
......
......@@ -14,7 +14,7 @@ my $binPath = $FindBin::Bin;
foreach $f ( @ARGV ) {
if ( $f =~ /\.lhs$/ ) {
open(INF, "$binPath/../../inplace/lib/unlit $f - |") || die "Couldn't unlit $f!\n";
open(INF, "$binPath/../../../inplace/lib/unlit $f - |") || die "Couldn't unlit $f!\n";
} else {
open(INF, "< $f") || die "Couldn't open $f!\n";
}
......@@ -22,6 +22,7 @@ foreach $f ( @ARGV ) {
while (<INF>) {
s/--.*//;
s/{-.*-}//;
s/\/\/.*//;
next if /^\s*$/;
$cnt++;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment