Commit 085c7fe5 authored by Simon Marlow's avatar Simon Marlow

Drop the per-task timing stats, give a summary only (#5897)

We were keeping around the Task struct (216 bytes) for every worker we
ever created, even though we only keep a maximum of 6 workers per
Capability.  These Task structs accumulate and cause a space leak in
programs that do lots of safe FFI calls; this patch frees the Task
struct as soon as a worker exits.

One reason we were keeping the Task structs around is because we print
out per-Task timing stats in +RTS -s, but that isn't terribly useful.
What is sometimes useful is knowing how *many* Tasks there were.  So
now I'm printing a single-line summary; for example, this is the output for the program in #5897:

  TASKS: 2001 (1 bound, 31 peak workers (2000 total), using -N1)

So although we created 2k tasks overall, there were only 31 workers
active at any one time (which is exactly what we expect: the program
makes 30 safe FFI calls concurrently).

This also gives an indication of how many capabilities were being
used, which is handy if you use +RTS -N without an explicit number.
parent 27d7d930
...@@ -62,8 +62,7 @@ static void real_main(void) ...@@ -62,8 +62,7 @@ static void real_main(void)
Capability *cap = rts_lock(); Capability *cap = rts_lock();
rts_evalLazyIO(&cap,progmain_closure, NULL); rts_evalLazyIO(&cap,progmain_closure, NULL);
status = rts_getSchedStatus(cap); status = rts_getSchedStatus(cap);
taskTimeStamp(myTask()); rts_unlock(cap);
rts_unlock(cap);
} }
/* check the status of the entire Haskell computation */ /* check the status of the entire Haskell computation */
......
...@@ -287,18 +287,29 @@ stat_startGC (gc_thread *gct) ...@@ -287,18 +287,29 @@ stat_startGC (gc_thread *gct)
} }
void void
stat_gcWorkerThreadStart (gc_thread *gct) stat_gcWorkerThreadStart (gc_thread *gct STG_UNUSED)
{ {
#if 0
/*
 * We don't collect per-thread GC stats any more, but this code
* could be used to do that if we want to in the future:
*/
if (RtsFlags.GcFlags.giveStats != NO_GC_STATS) if (RtsFlags.GcFlags.giveStats != NO_GC_STATS)
{ {
getProcessTimes(&gct->gc_start_cpu, &gct->gc_start_elapsed); getProcessTimes(&gct->gc_start_cpu, &gct->gc_start_elapsed);
gct->gc_start_thread_cpu = getThreadCPUTime(); gct->gc_start_thread_cpu = getThreadCPUTime();
} }
#endif
} }
void void
stat_gcWorkerThreadDone (gc_thread *gct) stat_gcWorkerThreadDone (gc_thread *gct STG_UNUSED)
{ {
#if 0
/*
 * We don't collect per-thread GC stats any more, but this code
* could be used to do that if we want to in the future:
*/
Time thread_cpu, elapsed, gc_cpu, gc_elapsed; Time thread_cpu, elapsed, gc_cpu, gc_elapsed;
if (RtsFlags.GcFlags.giveStats != NO_GC_STATS) if (RtsFlags.GcFlags.giveStats != NO_GC_STATS)
...@@ -311,6 +322,7 @@ stat_gcWorkerThreadDone (gc_thread *gct) ...@@ -311,6 +322,7 @@ stat_gcWorkerThreadDone (gc_thread *gct)
taskDoneGC(gct->cap->running_task, gc_cpu, gc_elapsed); taskDoneGC(gct->cap->running_task, gc_cpu, gc_elapsed);
} }
#endif
} }
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
...@@ -326,17 +338,13 @@ stat_endGC (gc_thread *gct, ...@@ -326,17 +338,13 @@ stat_endGC (gc_thread *gct,
RtsFlags.ProfFlags.doHeapProfile) RtsFlags.ProfFlags.doHeapProfile)
// heap profiling needs GC_tot_time // heap profiling needs GC_tot_time
{ {
Time cpu, elapsed, thread_gc_cpu, gc_cpu, gc_elapsed; Time cpu, elapsed, gc_cpu, gc_elapsed;
getProcessTimes(&cpu, &elapsed); getProcessTimes(&cpu, &elapsed);
gc_elapsed = elapsed - gct->gc_start_elapsed; gc_elapsed = elapsed - gct->gc_start_elapsed;
thread_gc_cpu = getThreadCPUTime() - gct->gc_start_thread_cpu;
gc_cpu = cpu - gct->gc_start_cpu; gc_cpu = cpu - gct->gc_start_cpu;
taskDoneGC(gct->cap->running_task, thread_gc_cpu, gc_elapsed);
if (RtsFlags.GcFlags.giveStats == VERBOSE_GC_STATS) { if (RtsFlags.GcFlags.giveStats == VERBOSE_GC_STATS) {
nat faults = getPageFaults(); nat faults = getPageFaults();
...@@ -629,22 +637,10 @@ stat_exit(int alloc) ...@@ -629,22 +637,10 @@ stat_exit(int alloc)
statsPrintf("\n"); statsPrintf("\n");
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
{ statsPrintf(" TASKS: %d (%d bound, %d peak workers (%d total), using -N%d)\n",
nat i; taskCount, taskCount - workerCount,
Task *task; peakWorkerCount, workerCount,
statsPrintf(" MUT time (elapsed) GC time (elapsed)\n"); n_capabilities);
for (i = 0, task = all_tasks;
task != NULL;
i++, task = task->all_link) {
statsPrintf(" Task %2d %-8s : %6.2fs (%6.2fs) %6.2fs (%6.2fs)\n",
i,
(task->worker) ? "(worker)" : "(bound)",
TimeToSecondsDbl(task->mut_time),
TimeToSecondsDbl(task->mut_etime),
TimeToSecondsDbl(task->gc_time),
TimeToSecondsDbl(task->gc_etime));
}
}
statsPrintf("\n"); statsPrintf("\n");
......
...@@ -26,7 +26,12 @@ ...@@ -26,7 +26,12 @@
// Task lists and global counters. // Task lists and global counters.
// Locks required: all_tasks_mutex. // Locks required: all_tasks_mutex.
Task *all_tasks = NULL; Task *all_tasks = NULL;
static nat taskCount;
nat taskCount;
nat workerCount;
nat currentWorkerCount;
nat peakWorkerCount;
static int tasksInitialized = 0; static int tasksInitialized = 0;
static void freeTask (Task *task); static void freeTask (Task *task);
...@@ -64,8 +69,11 @@ void ...@@ -64,8 +69,11 @@ void
initTaskManager (void) initTaskManager (void)
{ {
if (!tasksInitialized) { if (!tasksInitialized) {
taskCount = 0; taskCount = 0;
tasksInitialized = 1; workerCount = 0;
currentWorkerCount = 0;
peakWorkerCount = 0;
tasksInitialized = 1;
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
#if !defined(MYTASK_USE_TLV) #if !defined(MYTASK_USE_TLV)
newThreadLocalKey(&currentTaskKey); newThreadLocalKey(&currentTaskKey);
...@@ -87,7 +95,7 @@ freeTaskManager (void) ...@@ -87,7 +95,7 @@ freeTaskManager (void)
ACQUIRE_LOCK(&all_tasks_mutex); ACQUIRE_LOCK(&all_tasks_mutex);
for (task = all_tasks; task != NULL; task = next) { for (task = all_tasks; task != NULL; task = next) {
next = task->all_link; next = task->all_next;
if (task->stopped) { if (task->stopped) {
freeTask(task); freeTask(task);
} else { } else {
...@@ -164,9 +172,6 @@ freeTask (Task *task) ...@@ -164,9 +172,6 @@ freeTask (Task *task)
static Task* static Task*
newTask (rtsBool worker) newTask (rtsBool worker)
{ {
#if defined(THREADED_RTS)
Time currentElapsedTime, currentUserTime;
#endif
Task *task; Task *task;
#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64) #define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
...@@ -186,26 +191,25 @@ newTask (rtsBool worker) ...@@ -186,26 +191,25 @@ newTask (rtsBool worker)
task->wakeup = rtsFalse; task->wakeup = rtsFalse;
#endif #endif
#if defined(THREADED_RTS)
currentUserTime = getThreadCPUTime();
currentElapsedTime = getProcessElapsedTime();
task->mut_time = 0;
task->mut_etime = 0;
task->gc_time = 0;
task->gc_etime = 0;
task->muttimestart = currentUserTime;
task->elapsedtimestart = currentElapsedTime;
#endif
task->next = NULL; task->next = NULL;
ACQUIRE_LOCK(&all_tasks_mutex); ACQUIRE_LOCK(&all_tasks_mutex);
task->all_link = all_tasks; task->all_prev = NULL;
task->all_next = all_tasks;
if (all_tasks != NULL) {
all_tasks->all_prev = task;
}
all_tasks = task; all_tasks = task;
taskCount++; taskCount++;
if (worker) {
workerCount++;
currentWorkerCount++;
if (currentWorkerCount > peakWorkerCount) {
peakWorkerCount = currentWorkerCount;
}
}
RELEASE_LOCK(&all_tasks_mutex); RELEASE_LOCK(&all_tasks_mutex);
return task; return task;
...@@ -314,14 +318,15 @@ discardTasksExcept (Task *keep) ...@@ -314,14 +318,15 @@ discardTasksExcept (Task *keep)
// Wipe the task list, except the current Task. // Wipe the task list, except the current Task.
ACQUIRE_LOCK(&all_tasks_mutex); ACQUIRE_LOCK(&all_tasks_mutex);
for (task = all_tasks; task != NULL; task=next) { for (task = all_tasks; task != NULL; task=next) {
next = task->all_link; next = task->all_next;
if (task != keep) { if (task != keep) {
debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task)); debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
freeTask(task); freeTask(task);
} }
} }
all_tasks = keep; all_tasks = keep;
keep->all_link = NULL; keep->all_next = NULL;
keep->all_prev = NULL;
RELEASE_LOCK(&all_tasks_mutex); RELEASE_LOCK(&all_tasks_mutex);
} }
...@@ -337,7 +342,7 @@ void updateCapabilityRefs (void) ...@@ -337,7 +342,7 @@ void updateCapabilityRefs (void)
ACQUIRE_LOCK(&all_tasks_mutex); ACQUIRE_LOCK(&all_tasks_mutex);
for (task = all_tasks; task != NULL; task=task->all_link) { for (task = all_tasks; task != NULL; task=task->all_next) {
if (task->cap != NULL) { if (task->cap != NULL) {
task->cap = &capabilities[task->cap->no]; task->cap = &capabilities[task->cap->no];
} }
...@@ -353,34 +358,6 @@ void updateCapabilityRefs (void) ...@@ -353,34 +358,6 @@ void updateCapabilityRefs (void)
} }
void
taskTimeStamp (Task *task USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Time currentElapsedTime, currentUserTime;
currentUserTime = getThreadCPUTime();
currentElapsedTime = getProcessElapsedTime();
task->mut_time =
currentUserTime - task->muttimestart - task->gc_time;
task->mut_etime =
currentElapsedTime - task->elapsedtimestart - task->gc_etime;
if (task->gc_time < 0) { task->gc_time = 0; }
if (task->gc_etime < 0) { task->gc_etime = 0; }
if (task->mut_time < 0) { task->mut_time = 0; }
if (task->mut_etime < 0) { task->mut_etime = 0; }
#endif
}
void
taskDoneGC (Task *task, Time cpu_time, Time elapsed_time)
{
task->gc_time += cpu_time;
task->gc_etime += elapsed_time;
}
#if defined(THREADED_RTS) #if defined(THREADED_RTS)
void void
...@@ -391,9 +368,22 @@ workerTaskStop (Task *task) ...@@ -391,9 +368,22 @@ workerTaskStop (Task *task)
ASSERT(task->id == id); ASSERT(task->id == id);
ASSERT(myTask() == task); ASSERT(myTask() == task);
task->cap = NULL; ACQUIRE_LOCK(&all_tasks_mutex);
taskTimeStamp(task);
task->stopped = rtsTrue; if (task->all_prev) {
task->all_prev->all_next = task->all_next;
} else {
all_tasks = task->all_next;
}
if (task->all_next) {
task->all_next->all_prev = task->all_prev;
}
currentWorkerCount--;
RELEASE_LOCK(&all_tasks_mutex);
freeTask(task);
} }
#endif #endif
...@@ -491,7 +481,7 @@ void ...@@ -491,7 +481,7 @@ void
printAllTasks(void) printAllTasks(void)
{ {
Task *task; Task *task;
for (task = all_tasks; task != NULL; task = task->all_link) { for (task = all_tasks; task != NULL; task = task->all_next) {
debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive"); debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
if (!task->stopped) { if (!task->stopped) {
if (task->cap) { if (task->cap) {
......
...@@ -143,25 +143,13 @@ typedef struct Task_ { ...@@ -143,25 +143,13 @@ typedef struct Task_ {
// So that we can detect when a finalizer illegally calls back into Haskell // So that we can detect when a finalizer illegally calls back into Haskell
rtsBool running_finalizers; rtsBool running_finalizers;
// Stats that we collect about this task
// ToDo: we probably want to put this in a separate TaskStats
// structure, so we can share it between multiple Tasks. We don't
// really want separate stats for each call in a nested chain of
// foreign->haskell->foreign->haskell calls, but we'll get a
// separate Task for each of the haskell calls.
Time elapsedtimestart;
Time muttimestart;
Time mut_time;
Time mut_etime;
Time gc_time;
Time gc_etime;
// Links tasks on the returning_tasks queue of a Capability, and // Links tasks on the returning_tasks queue of a Capability, and
// on spare_workers. // on spare_workers.
struct Task_ *next; struct Task_ *next;
// Links tasks on the all_tasks list // Links tasks on the all_tasks list
struct Task_ *all_link; struct Task_ *all_next;
struct Task_ *all_prev;
} Task; } Task;
...@@ -201,15 +189,6 @@ void boundTaskExiting (Task *task); ...@@ -201,15 +189,6 @@ void boundTaskExiting (Task *task);
void workerTaskStop (Task *task); void workerTaskStop (Task *task);
#endif #endif
// Record the time spent in this Task.
// This is called by workerTaskStop() but not by boundTaskExiting(),
// because it would impose an extra overhead on call-in.
//
void taskTimeStamp (Task *task);
// The current Task has finished a GC, record the amount of time spent.
void taskDoneGC (Task *task, Time cpu_time, Time elapsed_time);
// Put the task back on the free list, mark it stopped. Used by // Put the task back on the free list, mark it stopped. Used by
// forkProcess(). // forkProcess().
// //
...@@ -240,6 +219,11 @@ void interruptWorkerTask (Task *task); ...@@ -240,6 +219,11 @@ void interruptWorkerTask (Task *task);
// //
void updateCapabilityRefs (void); void updateCapabilityRefs (void);
// For stats
extern nat taskCount;
extern nat workerCount;
extern nat peakWorkerCount;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// INLINE functions... private from here on down: // INLINE functions... private from here on down:
......
...@@ -198,7 +198,6 @@ forkOS_createThreadWrapper ( void * entry ) ...@@ -198,7 +198,6 @@ forkOS_createThreadWrapper ( void * entry )
Capability *cap; Capability *cap;
cap = rts_lock(); cap = rts_lock();
rts_evalStableIO(&cap, (HsStablePtr) entry, NULL); rts_evalStableIO(&cap, (HsStablePtr) entry, NULL);
taskTimeStamp(myTask());
rts_unlock(cap); rts_unlock(cap);
return NULL; return NULL;
} }
......
...@@ -979,7 +979,7 @@ compact(StgClosure *static_objects) ...@@ -979,7 +979,7 @@ compact(StgClosure *static_objects)
{ {
Task *task; Task *task;
InCall *incall; InCall *incall;
for (task = all_tasks; task != NULL; task = task->all_link) { for (task = all_tasks; task != NULL; task = task->all_next) {
for (incall = task->incall; incall != NULL; for (incall = task->incall; incall != NULL;
incall = incall->prev_stack) { incall = incall->prev_stack) {
if (incall->tso) { if (incall->tso) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment