Commit 6811e53a authored by Austin Seipp

[skip ci] rts: Detabify Capability.c


Signed-off-by: Austin Seipp <austin@well-typed.com>
parent a7ab7d38
@@ -130,7 +130,7 @@ findSpark (Capability *cap)
    if (n_capabilities == 1) { return NULL; } // makes no sense...
    debugTrace(DEBUG_sched,
               "cap %d: Trying to steal work from other capabilities",
               cap->no);
    /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
@@ -158,7 +158,7 @@ findSpark (Capability *cap)
            if (spark != NULL) {
                cap->spark_stats.converted++;
                traceEventSparkSteal(cap, robbed->no);
                return spark;
            }
            // otherwise: no success, try next one
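The hunk above is findSpark's steal scan: as its comment says, visit the other capabilities in sequence until a theft succeeds. Below is a rough standalone sketch of that scan; try_steal, N_CAPS, and find_spark are invented stand-ins for illustration, not RTS identifiers.

/* Minimal sketch of a sequential victim scan, per the comment in
 * findSpark above. Capability 2 is hard-coded as the only victim
 * with work, purely for the demo. */
#include <stdio.h>

#define N_CAPS 4

static int try_steal(int thief, int victim) {
    (void)thief;
    return victim == 2;   /* pretend only cap 2 has a spark */
}

static int find_spark(int me) {
    for (int v = 0; v < N_CAPS; v++) {
        if (v == me) continue;           /* never rob ourselves */
        if (try_steal(me, v)) return v;  /* theft succeeded */
    }
    return -1;                           /* every other pool was empty */
}

int main(void) {
    printf("cap 0 stole from cap %d\n", find_spark(0));
    return 0;
}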
@@ -200,10 +200,10 @@ newReturningTask (Capability *cap, Task *task)
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}
@@ -217,7 +217,7 @@ popReturningTask (Capability *cap)
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;
    return task;
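newReturningTask and popReturningTask above maintain a singly-linked FIFO with head and tail pointers. A minimal self-contained sketch of the same discipline, with Task reduced to a stub (push/pop and the id field are illustrative, not RTS names):

/* Standalone sketch of the hd/tl FIFO used by newReturningTask and
 * popReturningTask above. */
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

typedef struct Task_ { struct Task_ *next; int id; } Task;
typedef struct { Task *hd; Task *tl; } TaskQueue;

static void push(TaskQueue *q, Task *t) {
    assert(t->next == NULL);
    if (q->hd) { q->tl->next = t; } else { q->hd = t; }
    q->tl = t;                    /* tail always points at the newest task */
}

static Task *pop(TaskQueue *q) {
    Task *t = q->hd;
    assert(t);
    q->hd = t->next;
    if (!q->hd) q->tl = NULL;     /* queue drained: reset the tail too */
    t->next = NULL;
    return t;
}

int main(void) {
    TaskQueue q = { NULL, NULL };
    Task a = { NULL, 1 }, b = { NULL, 2 };
    push(&q, &a);
    push(&q, &b);
    printf("%d\n", pop(&q)->id);  /* 1: FIFO order */
    printf("%d\n", pop(&q)->id);  /* 2 */
    return 0;
}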
@@ -270,14 +270,14 @@ initCapability( Capability *cap, nat i )
    cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                    RtsFlags.GcFlags.generations,
                                    "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");
    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }
    cap->weak_ptr_list_hd = NULL;
@@ -326,8 +326,8 @@ initCapabilities( void )
#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif
@@ -364,7 +364,7 @@ moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
    if (to == 1) {
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
        capabilities[0] = &MainCapability;
        initCapability(&MainCapability, 0);
    }
@@ -455,7 +455,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    rtsBool always_wakeup)
{
    Task *task;
@@ -469,9 +469,9 @@ releaseCapability_ (Capability* cap,
    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }
    // If there is a pending sync, then we should just leave the
@@ -481,44 +481,44 @@ releaseCapability_ (Capability* cap,
        last_free_capability = cap; // needed?
        debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
        return;
    }
    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = peekRunQueue(cap)->bound->task;
        giveCapabilityToTask(cap, task);
        return;
    }
    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one. If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed. If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }
    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap, cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }
#ifdef PROFILING
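The ladder of early returns in releaseCapability_ above encodes a strict hand-off priority. A condensed sketch that makes the ordering explicit; the predicates and the HandOff enum are invented for illustration, not RTS types:

/* Condensed sketch of the hand-off priority releaseCapability_
 * implements above, kept only to make the ordering explicit. */
#include <stdio.h>

typedef enum { TO_RETURNING_TASK, LEAVE_FOR_SYNC, TO_BOUND_TASK,
               START_NEW_WORKER, TO_SPARE_WORKER, STAY_FREE } HandOff;

static HandOff choose_handoff(int returning_task, int sync_pending,
                              int bound_at_head, int no_spare_workers,
                              int work_to_do) {
    if (returning_task)   return TO_RETURNING_TASK; /* external call done */
    if (sync_pending)     return LEAVE_FOR_SYNC;    /* let the sync finish */
    if (bound_at_head)    return TO_BOUND_TASK;     /* only its Task may run it */
    if (no_spare_workers) return START_NEW_WORKER;  /* unless shutting down */
    if (work_to_do)       return TO_SPARE_WORKER;   /* wake an idle worker */
    return STAY_FREE;
}

int main(void) {
    /* A pending sync outranks everything except a returning task. */
    printf("%d\n", choose_handoff(0, 1, 1, 0, 1) == LEAVE_FOR_SYNC);
    return 0;
}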
@@ -612,29 +612,29 @@ waitForReturnCapability (Capability **pCap, Task *task)
    Capability *cap = *pCap;
    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i]->running_task) {
                    cap = capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }
        // record the Capability as the one this Task is now associated with.
        task->cap = cap;
    } else {
        ASSERT(task->cap == cap);
    }
    ACQUIRE_LOCK(&cap->lock);
@@ -642,36 +642,36 @@ waitForReturnCapability (Capability **pCap, Task *task)
    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);
            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }
    }
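The loop above is the standard flag-plus-condition-variable handshake: sleep on task->cond until task->wakeup is set, then re-validate ownership under cap->lock before claiming the capability. A plain-pthreads sketch of that handshake; Waiter and its fields are simplified stand-ins for the Task fields, and where the RTS can use a bare if before waitCondition (its outer for(;;) re-checks everything), this standalone version loops on the flag directly:

/* Flag-plus-condvar wakeup handshake, as used by the loop above. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    bool            wakeup;
} Waiter;

static void wait_for_wakeup(Waiter *w) {
    pthread_mutex_lock(&w->lock);
    while (!w->wakeup)                       /* tolerate spurious wakeups */
        pthread_cond_wait(&w->cond, &w->lock);
    w->wakeup = false;                       /* consume the signal */
    pthread_mutex_unlock(&w->lock);
}

static void signal_wakeup(Waiter *w) {
    pthread_mutex_lock(&w->lock);
    w->wakeup = true;                        /* set flag before signalling */
    pthread_cond_signal(&w->cond);
    pthread_mutex_unlock(&w->lock);
}

static void *waker(void *arg) { signal_wakeup((Waiter *)arg); return NULL; }

int main(void) {
    Waiter w = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false };
    pthread_t t;
    pthread_create(&t, NULL, waker, &w);
    wait_for_wakeup(&w);                     /* returns once waker signals */
    pthread_join(t, NULL);
    printf("woken\n");
    return 0;
}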
@@ -710,60 +710,60 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
        }
    }
    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
    // We must now release the capability and wait to be woken up
    // again.
    task->wakeup = rtsFalse;
    releaseCapabilityAndQueueWorker(cap);
    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = rtsFalse;
        RELEASE_LOCK(&task->lock);
        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }
        if (task->cap != cap) {
            // see Note [migrated bound threads]
            debugTrace(DEBUG_sched,
                       "task has been migrated to cap %d", task->cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }
        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again. This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }
    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);
#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
@@ -807,7 +807,7 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it. Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */
@@ -835,8 +835,8 @@ tryGrabCapability (Capability *cap, Task *task)
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
@@ -881,16 +881,16 @@ shutdownCapability (Capability *cap USED_IF_THREADS,
    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;
        if (cap->spare_workers) {
            // Look for workers that have died without removing
@@ -903,7 +903,7 @@ shutdownCapability (Capability *cap USED_IF_THREADS,
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
                    cap->n_spare_workers--;
                    if (!prev) {
@@ -916,14 +916,14 @@ shutdownCapability (Capability *cap USED_IF_THREADS,
            }
        }
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
        // If "safe", then busy-wait for any threads currently doing
        // foreign calls. If we're about to unload this DLL, for
@@ -932,10 +932,10 @@ shutdownCapability (Capability *cap USED_IF_THREADS,
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded. Just in case, try again - the kill message
@@ -949,14 +949,14 @@ shutdownCapability (Capability *cap USED_IF_THREADS,
        }
        traceSparkCounters(cap);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
#endif
@@ -1068,7 +1068,7 @@ rtsBool checkSparkCountInvariant (void)
        sparks.fizzled += capabilities[i]->spark_stats.fizzled;
        remaining += sparkPoolSize(capabilities[i]->sparks);
    }
    /* The invariant is
     * created = converted + remaining + gcd + fizzled
     */
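The invariant stated above is the bookkeeping identity created = converted + remaining + gcd + fizzled: every spark ever created is either converted, still sitting in a pool, collected by the GC, or fizzled. A toy check of that identity, with invented counter values:

/* Toy check of the spark accounting identity above. */
#include <stdio.h>

int main(void) {
    long created = 100, converted = 60, remaining = 25, gcd = 10, fizzled = 5;
    if (created == converted + remaining + gcd + fizzled)
        printf("spark invariant holds\n");
    else
        printf("spark invariant violated\n");
    return 0;
}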
...