Commit 7effbbbb authored by Simon Marlow

Split part of the Task struct into a separate struct InCall

The idea is that this leaves Tasks and OSThread in one-to-one
correspondence.  The part of a Task that represents a call into
Haskell from C is split into a separate struct InCall, pointed to by
the Task and the TSO bound to it.  A given OSThread/Task thus always
uses the same mutex and condition variable, rather than getting a new
one for each callback.  Conceptually it is simpler, although there are
more types and indirections in a few places now.

This improves callback performance by removing some of the locks that
we had to take when making in-calls.  Now we also keep the current Task
in a thread-local variable if supported by the OS and gcc (currently
only Linux).
parent 4e8b07db
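
For orientation, here is a rough sketch of the new InCall record, pieced together from the fields the diff below touches (incall->tso, suspended_tso, suspended_cap, task, the prev/next links of a Capability's suspended_ccalls list, and prev_stack for nested in-calls). The real definition lives in the RTS's Task.h and may differ in detail:

typedef struct InCall_ {
    struct StgTSO_     *tso;            // TSO bound to this in-call, or NULL for a worker
    struct StgTSO_     *suspended_tso;  // the TSO while it is out in a C call
    struct Capability_ *suspended_cap;  // the Capability it was suspended on
    struct Task_       *task;           // the Task (OS thread) performing the in-call

    struct InCall_     *prev_stack;     // nested in-calls made by the same Task

    struct InCall_     *prev;           // links in a Capability's suspended_ccalls list
    struct InCall_     *next;
} InCall;

A Task now points at its current InCall (task->incall) and a bound TSO points at the InCall rather than the Task (tso->bound), which is why the diff rewrites task->tso as task->incall->tso and t->bound->cap as t->bound->task->cap throughout.
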
......@@ -114,7 +114,7 @@ typedef struct StgTSO_ {
StgTSOBlockInfo block_info;
StgThreadID id;
int saved_errno;
struct Task_* bound;
struct InCall_* bound;
struct Capability_* cap;
struct StgTRecHeader_ * trec; /* STM transaction record */
......
......@@ -173,10 +173,10 @@ STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->return_link == NULL);
ASSERT(task->next == NULL);
if (cap->returning_tasks_hd) {
ASSERT(cap->returning_tasks_tl->return_link == NULL);
cap->returning_tasks_tl->return_link = task;
ASSERT(cap->returning_tasks_tl->next == NULL);
cap->returning_tasks_tl->next = task;
} else {
cap->returning_tasks_hd = task;
}
......@@ -190,11 +190,11 @@ popReturningTask (Capability *cap)
Task *task;
task = cap->returning_tasks_hd;
ASSERT(task);
cap->returning_tasks_hd = task->return_link;
cap->returning_tasks_hd = task->next;
if (!cap->returning_tasks_hd) {
cap->returning_tasks_tl = NULL;
}
task->return_link = NULL;
task->next = NULL;
return task;
}
#endif
......@@ -220,7 +220,7 @@ initCapability( Capability *cap, nat i )
initMutex(&cap->lock);
cap->running_task = NULL; // indicates cap is free
cap->spare_workers = NULL;
cap->suspended_ccalling_tasks = NULL;
cap->suspended_ccalls = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
......@@ -342,7 +342,7 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
debugTrace(DEBUG_sched, "passing capability %d to %s %p",
cap->no, task->tso ? "bound task" : "worker",
cap->no, task->incall->tso ? "bound task" : "worker",
(void *)task->id);
ACQUIRE_LOCK(&task->lock);
task->wakeup = rtsTrue;
......@@ -398,7 +398,7 @@ releaseCapability_ (Capability* cap,
// assertion is false: in schedule() we force a yield after
// ThreadBlocked, but the thread may be back on the run queue
// by now.
task = cap->run_queue_hd->bound;
task = cap->run_queue_hd->bound->task;
giveCapabilityToTask(cap,task);
return;
}
......@@ -411,7 +411,7 @@ releaseCapability_ (Capability* cap,
if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
debugTrace(DEBUG_sched,
"starting new worker on capability %d", cap->no);
startWorkerTask(cap, workerStart);
startWorkerTask(cap);
return;
}
}
......@@ -462,9 +462,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
// in which case it is not replaced on the spare_worker queue.
// This happens when the system is shutting down (see
// Schedule.c:workerStart()).
// Also, be careful to check that this task hasn't just exited
// Haskell to do a foreign call (task->suspended_tso).
if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
if (!isBoundTask(task) && !task->stopped) {
task->next = cap->spare_workers;
cap->spare_workers = task;
}
......@@ -612,7 +610,7 @@ yieldCapability (Capability** pCap, Task *task)
continue;
}
if (task->tso == NULL) {
if (task->incall->tso == NULL) {
ASSERT(cap->spare_workers != NULL);
// if we're not at the front of the queue, release it
// again. This is unlikely to happen.
......@@ -655,12 +653,12 @@ wakeupThreadOnCapability (Capability *my_cap,
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
ASSERT(tso->bound->cap == tso->cap);
tso->bound->cap = other_cap;
ASSERT(tso->bound->task->cap == tso->cap);
tso->bound->task->cap = other_cap;
}
tso->cap = other_cap;
ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
if (other_cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
......@@ -781,7 +779,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
// that will try to return to code that has been unloaded.
// We can be a bit more relaxed when this is a standalone
// program that is about to terminate, and let safe=false.
if (cap->suspended_ccalling_tasks && safe) {
if (cap->suspended_ccalls && safe) {
debugTrace(DEBUG_sched,
"thread(s) are involved in foreign calls, yielding");
cap->running_task = NULL;
......@@ -871,7 +869,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
{
nat i;
Capability *cap;
Task *task;
InCall *incall;
// Each GC thread is responsible for following roots from the
// Capability of the same number. There will usually be the same
......@@ -886,9 +884,9 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
for (task = cap->suspended_ccalling_tasks; task != NULL;
task=task->next) {
evac(user, (StgClosure **)(void *)&task->suspended_tso);
for (incall = cap->suspended_ccalls; incall != NULL;
incall=incall->next) {
evac(user, (StgClosure **)(void *)&incall->suspended_tso);
}
#if defined(THREADED_RTS)
......
......@@ -56,7 +56,7 @@ struct Capability_ {
// the suspended TSOs easily. Hence, when migrating a Task from
// the returning_tasks list, we must also migrate its entry from
// this list.
Task *suspended_ccalling_tasks;
InCall *suspended_ccalls;
// One mutable list per generation, so we don't need to take any
// locks when updating an old-generation thunk. This also lets us
......
......@@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task)
// If we are a worker, just exit. If we're a bound thread
// then we will exit below when we've removed our TSO from
// the run queue.
if (task->tso == NULL && emptyRunQueue(cap)) {
if (!isBoundTask(task) && emptyRunQueue(cap)) {
return cap;
}
break;
......@@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task)
// Check whether we can run this thread in the current task.
// If not, we have to pass our capability to the right task.
{
Task *bound = t->bound;
InCall *bound = t->bound;
if (bound) {
if (bound == task) {
if (bound->task == task) {
// yes, the Haskell thread is bound to the current native thread
} else {
debugTrace(DEBUG_sched,
......@@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task)
}
} else {
// The thread we want to run is unbound.
if (task->tso) {
if (task->incall->tso) {
debugTrace(DEBUG_sched,
"this OS thread cannot run thread %lu",
(unsigned long)t->id);
......@@ -441,7 +441,7 @@ run_thread:
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
ASSERT(t->bound ? t->bound->cap == cap : 1);
ASSERT(t->bound ? t->bound->task->cap == cap : 1);
prev_what_next = t->what_next;
......@@ -639,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task)
// and this task it bound).
return (waiting_for_gc ||
cap->returning_tasks_hd != NULL ||
(!emptyRunQueue(cap) && (task->tso == NULL
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? cap->run_queue_hd->bound != NULL
: cap->run_queue_hd->bound != task)));
: cap->run_queue_hd->bound != task->incall)));
}
// This is the single place where a Task goes to sleep. There are
......@@ -768,7 +768,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
next = t->_link;
t->_link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
|| t->bound == task // don't move my bound thread
|| t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
prev = t;
......@@ -781,9 +781,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
} else {
appendToRunQueue(free_caps[i],t);
traceEventMigrateThread (cap, t, free_caps[i]->no);
traceEventMigrateThread (cap, t, free_caps[i]->no);
if (t->bound) { t->bound->cap = free_caps[i]; }
if (t->bound) { t->bound->task->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
}
......@@ -979,13 +979,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
/* Probably a real deadlock. Send the current main thread the
* Deadlock exception.
*/
if (task->tso) {
switch (task->tso->why_blocked) {
if (task->incall->tso) {
switch (task->incall->tso->why_blocked) {
case BlockedOnSTM:
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
throwToSingleThreaded(cap, task->tso,
throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
default:
......@@ -1174,8 +1174,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
/* The TSO attached to this Task may have moved, so update the
* pointer to it.
*/
if (task->tso == t) {
task->tso = new_t;
if (task->incall->tso == t) {
task->incall->tso = new_t;
}
pushOnRunQueue(cap,new_t);
}
......@@ -1285,7 +1285,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
if (t->bound) {
if (t->bound != task) {
if (t->bound != task->incall) {
#if !defined(THREADED_RTS)
// Must be a bound thread that is not the topmost one. Leave
// it on the run queue until the stack has unwound to the
......@@ -1302,12 +1302,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
#endif
}
ASSERT(task->tso == t);
ASSERT(task->incall->tso == t);
if (t->what_next == ThreadComplete) {
if (task->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
*(task->ret) = (StgClosure *)task->tso->sp[1];
*(task->ret) = (StgClosure *)task->incall->tso->sp[1];
}
task->stat = Success;
} else {
......@@ -1325,7 +1325,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
}
}
#ifdef DEBUG
removeThreadLabel((StgWord)task->tso->id);
removeThreadLabel((StgWord)task->incall->tso->id);
#endif
// We no longer consider this thread and task to be bound to
......@@ -1336,7 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
// re-used). This was a real bug: the GC updated
// tso->bound->tso which lead to a deadlock.
t->bound = NULL;
task->tso = NULL;
task->incall->tso = NULL;
return rtsTrue; // tells schedule() to return
}
......@@ -1586,7 +1586,6 @@ forkProcess(HsStablePtr *entry
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
Task *task;
pid_t pid;
StgTSO* t,*next;
Capability *cap;
......@@ -1661,7 +1660,7 @@ forkProcess(HsStablePtr *entry
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
cap->suspended_ccalling_tasks = NULL;
cap->suspended_ccalls = NULL;
// Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
......@@ -1669,17 +1668,7 @@ forkProcess(HsStablePtr *entry
generations[g].threads = END_TSO_QUEUE;
}
// Wipe the task list, except the current Task.
ACQUIRE_LOCK(&sched_mutex);
for (task = all_tasks; task != NULL; task=task->all_link) {
if (task != cap->running_task) {
#if defined(THREADED_RTS)
initMutex(&task->lock); // see #1391
#endif
discardTask(task);
}
}
RELEASE_LOCK(&sched_mutex);
discardTasksExcept(cap->running_task);
#if defined(THREADED_RTS)
// Wipe our spare workers list, they no longer exist. New
......@@ -1747,35 +1736,41 @@ deleteAllThreads ( Capability *cap )
}
/* -----------------------------------------------------------------------------
Managing the suspended_ccalling_tasks list.
Managing the suspended_ccalls list.
Locks required: sched_mutex
-------------------------------------------------------------------------- */
STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
ASSERT(task->next == NULL && task->prev == NULL);
task->next = cap->suspended_ccalling_tasks;
task->prev = NULL;
if (cap->suspended_ccalling_tasks) {
cap->suspended_ccalling_tasks->prev = task;
}
cap->suspended_ccalling_tasks = task;
InCall *incall;
incall = task->incall;
ASSERT(incall->next == NULL && incall->prev == NULL);
incall->next = cap->suspended_ccalls;
incall->prev = NULL;
if (cap->suspended_ccalls) {
cap->suspended_ccalls->prev = incall;
}
cap->suspended_ccalls = incall;
}
STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
if (task->prev) {
task->prev->next = task->next;
InCall *incall;
incall = task->incall;
if (incall->prev) {
incall->prev->next = incall->next;
} else {
ASSERT(cap->suspended_ccalling_tasks == task);
cap->suspended_ccalling_tasks = task->next;
ASSERT(cap->suspended_ccalls == incall);
cap->suspended_ccalls = incall->next;
}
if (task->next) {
task->next->prev = task->prev;
if (incall->next) {
incall->next->prev = incall->prev;
}
task->next = task->prev = NULL;
incall->next = incall->prev = NULL;
}
/* ---------------------------------------------------------------------------
......@@ -1832,7 +1827,8 @@ suspendThread (StgRegTable *reg)
}
// Hand back capability
task->suspended_tso = tso;
task->incall->suspended_tso = tso;
task->incall->suspended_cap = cap;
ACQUIRE_LOCK(&cap->lock);
......@@ -1853,6 +1849,7 @@ StgRegTable *
resumeThread (void *task_)
{
StgTSO *tso;
InCall *incall;
Capability *cap;
Task *task = task_;
int saved_errno;
......@@ -1865,18 +1862,22 @@ resumeThread (void *task_)
saved_winerror = GetLastError();
#endif
cap = task->cap;
incall = task->incall;
cap = incall->suspended_cap;
task->cap = cap;
// Wait for permission to re-enter the RTS with the result.
waitForReturnCapability(&cap,task);
// we might be on a different capability now... but if so, our
// entry on the suspended_ccalling_tasks list will also have been
// entry on the suspended_ccalls list will also have been
// migrated.
// Remove the thread from the suspended list
recoverSuspendedTask(cap,task);
tso = task->suspended_tso;
task->suspended_tso = NULL;
tso = incall->suspended_tso;
incall->suspended_tso = NULL;
incall->suspended_cap = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
traceEventRunThread(cap, tso);
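
To see what suspended_cap is for: a safe foreign call is bracketed by the two entry points changed above, and on re-entry the Capability is now found through the InCall rather than task->cap. The wrapper below is purely illustrative (the real one is emitted by the code generator); resumeThread's prototype is copied from the hunk above, while suspendThread's return type and the helper call_out are assumptions:

typedef struct StgRegTable_ StgRegTable;   /* opaque here; defined in the RTS headers */

extern void        *suspendThread (StgRegTable *reg);  /* assumed to return the Task as a token */
extern StgRegTable *resumeThread  (void *token);       /* prototype as in the hunk above */

static StgRegTable *call_out (StgRegTable *reg, void (*c_fun)(void))
{
    void *token = suspendThread(reg);   /* stashes the TSO in incall->suspended_tso and
                                           the Capability in incall->suspended_cap */
    c_fun();                            /* the C code runs with no Capability held */
    return resumeThread(token);         /* re-acquires a Capability via incall->suspended_cap */
}
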
......@@ -1954,10 +1955,10 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
tso->bound = task;
tso->bound = task->incall;
tso->cap = cap;
task->tso = tso;
task->incall->tso = tso;
task->ret = ret;
task->stat = NoStatus;
......@@ -1980,23 +1981,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
* ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
void OSThreadProcAttr
workerStart(Task *task)
void scheduleWorker (Capability *cap, Task *task)
{
Capability *cap;
// See startWorkerTask().
ACQUIRE_LOCK(&task->lock);
cap = task->cap;
RELEASE_LOCK(&task->lock);
if (RtsFlags.ParFlags.setAffinity) {
setThreadAffinity(cap->no, n_capabilities);
}
// set the thread-local pointer to the Task:
taskEnter(task);
// schedule() runs without a lock.
cap = schedule(cap,task);
......@@ -2062,6 +2048,8 @@ initScheduler(void)
initSparkPools();
#endif
RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
......@@ -2075,13 +2063,11 @@ initScheduler(void)
for (i = 1; i < n_capabilities; i++) {
cap = &capabilities[i];
ACQUIRE_LOCK(&cap->lock);
startWorkerTask(cap, workerStart);
startWorkerTask(cap);
RELEASE_LOCK(&cap->lock);
}
}
#endif
RELEASE_LOCK(&sched_mutex);
}
void
......@@ -2102,7 +2088,7 @@ exitScheduler(
sched_state = SCHED_INTERRUPTING;
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
ASSERT(task->tso == NULL);
ASSERT(task->incall->tso == NULL);
releaseCapability(task->cap);
}
sched_state = SCHED_SHUTTING_DOWN;
......@@ -2112,7 +2098,7 @@ exitScheduler(
nat i;
for (i = 0; i < n_capabilities; i++) {
ASSERT(task->tso == NULL);
ASSERT(task->incall->tso == NULL);
shutdownCapability(&capabilities[i], task, wait_foreign);
}
}
......@@ -2161,7 +2147,7 @@ performGC_(rtsBool force_major)
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalling_tasks queue.
// suspended_ccalls queue.
task = newBoundTask();
waitForReturnCapability(&task->cap,task);
......@@ -2368,8 +2354,8 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
// The TSO attached to this Task may have moved, so update the
// pointer to it.
if (task->tso == tso) {
task->tso = new_tso;
if (task->incall->tso == tso) {
task->incall->tso = new_tso;
}
unlockTSO(new_tso);
......
......@@ -46,15 +46,8 @@ StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *excepti
/* findRetryFrameHelper */
StgWord findRetryFrameHelper (StgTSO *tso);
/* workerStart()
*
* Entry point for a new worker task.
* Called from STG : NO
* Locks assumed : none
*/
#if defined(THREADED_RTS)
void OSThreadProcAttr workerStart(Task *task);
#endif
/* Entry point for a new worker */
void scheduleWorker (Capability *cap, Task *task);
/* The state of the scheduler. This is used to control the sequence
* of events during shutdown, and when the runtime is interrupted
......
......@@ -623,7 +623,7 @@ stat_exit(int alloc)
i++, task = task->all_link) {
statsPrintf(" Task %2d %-8s : %6.2fs (%6.2fs) %6.2fs (%6.2fs)\n",
i,
(task->tso == NULL) ? "(worker)" : "(bound)",
(task->worker) ? "(worker)" : "(bound)",
TICK_TO_DBL(task->mut_time),
TICK_TO_DBL(task->mut_etime),
TICK_TO_DBL(task->gc_time),
......
......@@ -26,12 +26,13 @@
// Task lists and global counters.
// Locks required: sched_mutex.
Task *all_tasks = NULL;
static Task *task_free_list = NULL; // singly-linked
static nat taskCount;
static nat tasksRunning;
static nat workerCount;
static int tasksInitialized = 0;
static void freeTask (Task *task);
static Task * allocTask (void);
static Task * newTask (rtsBool);
/* -----------------------------------------------------------------------------
* Remembering the current thread's Task
* -------------------------------------------------------------------------- */
......@@ -39,7 +40,11 @@ static int tasksInitialized = 0;
// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
# if defined(MYTASK_USE_TLV)
__thread Task *my_task;
# else
ThreadLocalKey currentTaskKey;
# endif
#else
Task *my_task;
#endif
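
This MYTASK_USE_TLV path is what the commit message means by keeping the current Task in a thread-local variable. A minimal sketch of the two access paths, assuming the accessor names myTask()/setMyTask() used by allocTask() below and the RTS's existing getThreadLocalVar()/setThreadLocalVar() helpers:

#if defined(THREADED_RTS)
# if defined(MYTASK_USE_TLV)
/* gcc on Linux: a plain __thread variable, so fetching the current Task is
 * an ordinary memory load -- no pthread_getspecific() on the in-call path. */
extern __thread Task *my_task;
INLINE_HEADER Task *myTask    (void)       { return my_task; }
INLINE_HEADER void  setMyTask (Task *task) { my_task = task; }
# else
/* fallback: an OS thread-local key, one lookup per access */
extern ThreadLocalKey currentTaskKey;
INLINE_HEADER Task *myTask    (void)       { return (Task *)getThreadLocalVar(&currentTaskKey); }
INLINE_HEADER void  setMyTask (Task *task) { setThreadLocalVar(&currentTaskKey, task); }
# endif
#else
extern Task *my_task;
INLINE_HEADER Task *myTask    (void)       { return my_task; }
INLINE_HEADER void  setMyTask (Task *task) { my_task = task; }
#endif
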
......@@ -53,10 +58,8 @@ initTaskManager (void)
{
if (!tasksInitialized) {
taskCount = 0;
workerCount = 0;
tasksRunning = 0;
tasksInitialized = 1;
#if defined(THREADED_RTS)
#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
newThreadLocalKey(&currentTaskKey);
#endif
}
......@@ -66,29 +69,24 @@ nat
freeTaskManager (void)
{
Task *task, *next;
nat tasksRunning = 0;
ASSERT_LOCK_HELD(&sched_mutex);
debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
tasksRunning);
for (task = all_tasks; task != NULL; task = next) {
next = task->all_link;
if (task->stopped) {
// We only free resources if the Task is not in use. A
// Task may still be in use if we have a Haskell thread in
// a foreign call while we are attempting to shut down the
// RTS (see conc059).
#if defined(THREADED_RTS)
closeCondition(&task->cond);
closeMutex(&task->lock);
#endif
stgFree(task);
freeTask(task);
} else {
tasksRunning++;
}
}
debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
tasksRunning);
all_tasks = NULL;
task_free_list = NULL;
#if defined(THREADED_RTS)
#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
freeThreadLocalKey(&currentTaskKey);
#endif
......@@ -97,9 +95,52 @@ freeTaskManager (void)
return tasksRunning;
}
static Task *
allocTask (void)
{
Task *task;
task = myTask();
if (task != NULL) {
return task;
} else {
task = newTask(rtsFalse);
#if defined(THREADED_RTS)
task->id = osThreadId();
#endif
setMyTask(task);
return task;
}
}
static void
freeTask (Task *task)
{
InCall *incall, *next;
// We only free resources if the Task is not in use. A
// Task may still be in use if we have a Haskell thread in
// a foreign call while we are attempting to shut down the
// RTS (see conc059).
#if defined(THREADED_RTS)
closeCondition(&task->cond);
closeMutex(&task->lock);
#endif
for (incall = task->incall; incall != NULL; incall = next) {
next = incall->prev_stack;
stgFree(incall);
}