Commit 9a92cb1c authored by simonmar's avatar simonmar
Browse files

[project @ 2005-04-06 15:27:06 by simonmar]

Revamp the Task API: now we use the same implementation for threaded
and SMP.  We also keep per-task timing stats in the threaded RTS now,
which makes the output of +RTS -sstderr more useful.
parent 4bcd03a6
......@@ -206,6 +206,10 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
#if defined(RTS_SUPPORTS_THREADS)
#if !defined(SMP)
ASSERT(rts_n_free_capabilities == 0);
#endif
#if defined(SMP)
cap->link = free_capabilities;
free_capabilities = cap;
#endif
// Check to see whether a worker thread can be given
// the go-ahead to return the result of an external call..
......@@ -214,12 +218,6 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
// thread that is yielding its capability will repeatedly
// signal returning_worker_cond.
#if defined(SMP)
// SMP variant untested
cap->link = free_capabilities;
free_capabilities = cap;
#endif
rts_n_waiting_workers--;
signalCondition(&returning_worker_cond);
IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker"));
......@@ -230,13 +228,15 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
} else {
signalCondition(passTarget);
}
#if defined(SMP)
rts_n_free_capabilities++;
#else
rts_n_free_capabilities = 1;
#endif
IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it"));
} else {
#if defined(SMP)
cap->link = free_capabilities;
free_capabilities = cap;
rts_n_free_capabilities++;
#else
rts_n_free_capabilities = 1;
......@@ -433,7 +433,6 @@ passCapabilityToWorker( void )
ToDo: should check whether the thread at the front of the queue is
bound, and if so wake up the appropriate worker.
-------------------------------------------------------------------------- */
void
threadRunnable ( void )
{
......@@ -444,3 +443,17 @@ threadRunnable ( void )
startSchedulerTaskIfNecessary();
#endif
}
/* ----------------------------------------------------------------------------
prodWorker()
Wake up... time to die.
-------------------------------------------------------------------------- */
void
prodWorker ( void )
{
#if defined(RTS_SUPPORTS_THREADS)
signalCondition(&thread_ready_cond);
#endif
}
......@@ -35,6 +35,8 @@ extern void releaseCapability( Capability* cap );
//
extern void threadRunnable ( void );
extern void prodWorker ( void );
#ifdef RTS_SUPPORTS_THREADS
// Gives up the current capability IFF there is a higher-priority
// thread waiting for it. This happens in one of two ways:
......
......@@ -16,6 +16,7 @@
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "Prelude.h"
#include "Task.h"
#include <stdlib.h>
#ifdef DEBUG
......@@ -50,6 +51,11 @@ int main(int argc, char *argv[])
startupHaskell(argc,argv,__stginit_ZCMain);
/* Register this thread as a task, so we can get timing stats about it */
#if defined(RTS_SUPPORTS_THREADS)
threadIsTask(osThreadId());
#endif
/* kick off the computation by creating the main thread with a pointer
to mainIO_closure representing the computation of the overall program;
then enter the scheduler with this thread and off we go;
......
......@@ -273,7 +273,6 @@ static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
// scheduler clearer.
//
static void schedulePreLoop(void);
static void scheduleHandleInterrupt(void);
static void scheduleStartSignalHandlers(void);
static void scheduleCheckBlockedThreads(void);
static void scheduleCheckBlackHoles(void);
......@@ -333,26 +332,25 @@ taskStart(void)
ACQUIRE_LOCK(&sched_mutex);
startingWorkerThread = rtsFalse;
schedule(NULL,NULL);
taskStop();
RELEASE_LOCK(&sched_mutex);
}
void
startSchedulerTaskIfNecessary(void)
{
if(run_queue_hd != END_TSO_QUEUE
|| blocked_queue_hd != END_TSO_QUEUE
|| sleeping_queue != END_TSO_QUEUE)
{
if(!startingWorkerThread)
{ // we don't want to start another worker thread
// just because the last one hasn't yet reached the
// "waiting for capability" state
startingWorkerThread = rtsTrue;
if (!startTask(taskStart)) {
startingWorkerThread = rtsFalse;
}
if ( !EMPTY_RUN_QUEUE()
&& !shutting_down_scheduler // not if we're shutting down
&& !startingWorkerThread )
{
// we don't want to start another worker thread
// just because the last one hasn't yet reached the
// "waiting for capability" state
startingWorkerThread = rtsTrue;
if (!maybeStartNewWorker(taskStart)) {
startingWorkerThread = rtsFalse;
}
}
}
}
#endif
......@@ -508,7 +506,26 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
stg_exit(1);
}
scheduleHandleInterrupt();
//
// Test for interruption. If interrupted==rtsTrue, then either
// we received a keyboard interrupt (^C), or the scheduler is
// trying to shut down all the tasks (shutting_down_scheduler) in
// the threaded RTS.
//
if (interrupted) {
if (shutting_down_scheduler) {
IF_DEBUG(scheduler, sched_belch("shutting down"));
releaseCapability(cap);
if (mainThread) {
mainThread->stat = Interrupted;
mainThread->ret = NULL;
}
return;
} else {
IF_DEBUG(scheduler, sched_belch("interrupted"));
deleteAllThreads();
}
}
#if defined(not_yet) && defined(SMP)
//
......@@ -791,33 +808,6 @@ schedulePreLoop(void)
#endif
}
/* ----------------------------------------------------------------------------
* Deal with the interrupt flag
* ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static
void scheduleHandleInterrupt(void)
{
//
// Test for interruption. If interrupted==rtsTrue, then either
// we received a keyboard interrupt (^C), or the scheduler is
// trying to shut down all the tasks (shutting_down_scheduler) in
// the threaded RTS.
//
if (interrupted) {
if (shutting_down_scheduler) {
IF_DEBUG(scheduler, sched_belch("shutting down"));
#if defined(RTS_SUPPORTS_THREADS)
shutdownThread();
#endif
} else {
IF_DEBUG(scheduler, sched_belch("interrupted"));
deleteAllThreads();
}
}
}
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
* ASSUMES: sched_mutex
......@@ -1476,7 +1466,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
IF_DEBUG(scheduler,
debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
(long)t->id, whatNext_strs[t->what_next], blocks));
// don't do this if it would push us over the
......@@ -2620,8 +2610,12 @@ initScheduler(void)
initCapabilities();
#if defined(RTS_SUPPORTS_THREADS)
/* start our haskell execution tasks */
startTaskManager(0,taskStart);
initTaskManager();
#endif
#if defined(SMP)
/* eagerly start some extra workers */
startTasks(RtsFlags.ParFlags.nNodes, taskStart);
#endif
#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
......@@ -2634,11 +2628,12 @@ initScheduler(void)
void
exitScheduler( void )
{
interrupted = rtsTrue;
shutting_down_scheduler = rtsTrue;
#if defined(RTS_SUPPORTS_THREADS)
stopTaskManager();
if (threadIsTask(osThreadId())) { taskStop(); }
stopTaskManager();
#endif
interrupted = rtsTrue;
shutting_down_scheduler = rtsTrue;
}
/* ----------------------------------------------------------------------------
......
......@@ -363,13 +363,13 @@ stat_startExit(void)
PROF_VAL(RPe_tot_time + HCe_tot_time) - InitElapsedStamp;
if (MutElapsedTime < 0) { MutElapsedTime = 0; } /* sometimes -0.00 */
/* for SMP, we don't know the mutator time yet, we have to inspect
/* for threads, we don't know the mutator time yet, we have to inspect
* all the running threads to find out, and they haven't stopped
* yet. So we just timestamp MutUserTime at this point so we can
* calculate the EXIT time. The real MutUserTime is calculated
* in stat_exit below.
*/
#ifdef SMP
#if defined(RTS_SUPPORTS_THREADS)
MutUserTime = CurrentUserTime;
#else
MutUserTime = CurrentUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime;
......@@ -381,7 +381,7 @@ void
stat_endExit(void)
{
getTimes();
#ifdef SMP
#if defined(RTS_SUPPORTS_THREADS)
ExitUserTime = CurrentUserTime - MutUserTime;
#else
ExitUserTime = CurrentUserTime - MutUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime;
......@@ -478,17 +478,13 @@ stat_endGC(lnat alloc, lnat collect, lnat live, lnat copied, lnat gen)
GC_tot_time += gc_time;
GCe_tot_time += gc_etime;
#if defined(SMP)
#if defined(RTS_SUPPORTS_THREADS)
{
nat i;
pthread_t me = pthread_self();
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
if (me == taskTable[i].id) {
taskTable[i].gc_time += gc_time;
taskTable[i].gc_etime += gc_etime;
break;
}
TaskInfo *task_info = taskOfId(osThreadId());
if (task_info != NULL) {
task_info->gc_time += gc_time;
task_info->gc_etime += gc_etime;
}
}
#endif
......@@ -582,35 +578,16 @@ stat_endHeapCensus(void)
stats for this thread into the taskTable struct for that thread.
-------------------------------------------------------------------------- */
#if defined(SMP)
void
stat_workerStop(void)
{
nat i;
pthread_t me = pthread_self();
getTimes();
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
if (taskTable[i].id == me) {
taskTable[i].mut_time = CurrentUserTime - taskTable[i].gc_time;
taskTable[i].mut_etime = CurrentElapsedTime
- GCe_tot_time
- taskTable[i].elapsedtimestart;
if (taskTable[i].mut_time < 0.0) { taskTable[i].mut_time = 0.0; }
if (taskTable[i].mut_etime < 0.0) { taskTable[i].mut_etime = 0.0; }
}
}
}
#endif
#if defined(SMP)
long int stat_getElapsedTime ()
stat_getTimes ( long *currentElapsedTime,
long *currentUserTime,
long *elapsedGCTime )
{
getTimes();
return CurrentElapsedTime;
*currentElapsedTime = CurrentElapsedTime;
*currentUserTime = CurrentUserTime;
*elapsedGCTime = GCe_tot_time;
}
#endif
/* -----------------------------------------------------------------------------
Called at the end of execution
......@@ -631,15 +608,10 @@ stat_exit(int alloc)
nat g, total_collections = 0;
getTimes();
time = CurrentUserTime;
etime = CurrentElapsedTime - ElapsedTimeStart;
GC_tot_alloc += alloc;
/* avoid divide by zero if time is measured as 0.00 seconds -- SDM */
if (time == 0.0) time = 1;
if (etime == 0.0) etime = 1;
/* Count total garbage collections */
for (g = 0; g < RtsFlags.GcFlags.generations; g++)
total_collections += generations[g].collections;
......@@ -647,17 +619,24 @@ stat_exit(int alloc)
/* For SMP, we have to get the user time from each thread
* and try to work out the total time.
*/
#ifdef SMP
{ nat i;
#if defined(RTS_SUPPORTS_THREADS)
{
nat i;
MutUserTime = 0.0;
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
for (i = 0; i < taskCount; i++) {
MutUserTime += taskTable[i].mut_time;
}
}
time = MutUserTime + GC_tot_time + InitUserTime + ExitUserTime;
if (MutUserTime < 0) { MutUserTime = 0; }
#else
time = CurrentUserTime;
#endif
/* avoid divide by zero if time is measured as 0.00 seconds -- SDM */
if (time == 0.0) time = 1;
if (etime == 0.0) etime = 1;
if (RtsFlags.GcFlags.giveStats >= VERBOSE_GC_STATS) {
statsPrintf("%9ld %9.9s %9.9s", (lnat)alloc*sizeof(W_), "", "");
statsPrintf(" %5.2f %5.2f\n\n", 0.0, 0.0);
......@@ -690,17 +669,18 @@ stat_exit(int alloc)
statsPrintf("\n%11ld Mb total memory in use\n\n",
mblocks_allocated * MBLOCK_SIZE / (1024 * 1024));
#ifdef SMP
#if defined(RTS_SUPPORTS_THREADS)
{
nat i;
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
statsPrintf(" Task %2d: MUT time: %6.2fs (%6.2fs elapsed)\n"
" GC time: %6.2fs (%6.2fs elapsed)\n\n",
i,
TICK_TO_DBL(taskTable[i].mut_time),
TICK_TO_DBL(taskTable[i].mut_etime),
TICK_TO_DBL(taskTable[i].gc_time),
TICK_TO_DBL(taskTable[i].gc_etime));
for (i = 0; i < taskCount; i++) {
statsPrintf(" Task %2d %-8s : MUT time: %6.2fs (%6.2fs elapsed)\n"
" GC time: %6.2fs (%6.2fs elapsed)\n\n",
i,
taskTable[i].is_worker ? "(worker)" : "(bound)",
TICK_TO_DBL(taskTable[i].mut_time),
TICK_TO_DBL(taskTable[i].mut_etime),
TICK_TO_DBL(taskTable[i].gc_time),
TICK_TO_DBL(taskTable[i].gc_etime));
}
}
#endif
......
......@@ -46,6 +46,6 @@ extern double mut_user_time_during_heap_census(void);
extern void statDescribeGens( void );
extern HsInt64 getAllocations( void );
#if defined(SMP)
extern long int stat_getElapsedTime ( void );
#endif
extern void stat_getTimes ( long *currentElapsedTime,
long *currentUserTime,
long *elapsedGCTime );
/* -----------------------------------------------------------------------------
*
* (c) The GHC Team 2001-
* (c) The GHC Team 2001-2005
*
* The task manager subsystem. Tasks execute STG code, with this
* module providing the API which the Scheduler uses to control their
* creation and destruction.
*
* Two kinds of RTS builds uses 'tasks' - the SMP and the
* 'native thread-friendly' builds.
*
* The SMP build lets multiple tasks concurrently execute STG code,
* all sharing vital internal RTS data structures in a controlled manner
* (see details elsewhere...ToDo: fill in ref!)
*
* The 'threads' build has at any one time only one task executing STG
* code, other tasks are either busy executing code outside the RTS
* (e.g., a C call) or waiting for their turn to (again) evaluate some
* STG code. A task relinquishes its RTS token when it is asked to
* evaluate an external (C) call.
*
* -------------------------------------------------------------------------*/
#include "Rts.h"
#if defined(RTS_SUPPORTS_THREADS) /* to the end */
#include "RtsUtils.h"
......@@ -28,188 +16,234 @@
#include "Stats.h"
#include "RtsFlags.h"
#include "Schedule.h"
#include "Hash.h"
#include "Capability.h"
#if HAVE_SIGNAL_H
#include <signal.h>
#endif
/* There's not all that much code that is shared between the
* SMP and threads version of the 'task manager.' A sign
* that the code ought to be structured differently..(Maybe ToDo).
*/
#define INIT_TASK_TABLE_SIZE 16
/*
* The following Task Manager-local variables are assumed to be
* accessed with the RTS lock in hand.
*/
#if defined(SMP)
TaskInfo* taskTable;
#endif
/* upper bound / the number of tasks created. */
static nat maxTasks;
/* number of tasks currently created */
static nat taskCount;
static nat awaitDeath;
static nat taskTableSize;
HashTable *taskHash; // maps OSThreadID to TaskInfo*
nat taskCount;
static nat tasksRunning;
static nat workerCount;
#define DEFAULT_MAX_WORKERS 64
nat maxWorkers; // we won't create more workers than this
#if defined(SMP)
void
startTaskManager( nat maxCount, void (*taskStart)(void) )
initTaskManager (void)
{
nat i;
static int initialized = 0;
if (!initialized) {
taskCount = 0;
maxTasks = maxCount;
/* allocate table holding task metadata */
static int initialized = 0;
if (!initialized) {
taskTableSize = INIT_TASK_TABLE_SIZE;
taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo),
"initTaskManager");
taskCount = 0;
workerCount = 0;
tasksRunning = 0;
taskHash = allocHashTable();
if (maxCount > 0) {
taskTable = stgMallocBytes(maxCount * sizeof(TaskInfo),
"startTaskManager:tasks");
/* and eagerly create them all. */
for (i = 0; i < maxCount; i++) {
startTask(taskStart);
taskCount++;
}
maxWorkers = DEFAULT_MAX_WORKERS;
initialized = 1;
}
initialized = 1;
}
}
rtsBool
startTask ( void (*taskStart)(void) )
static void
expandTaskTable (void)
{
int r;
OSThreadId tid;
r = createOSThread(&tid,taskStart);
if (r != 0) {
barf("startTask: Can't create new task");
}
taskTable[taskCount].id = tid;
taskTable[taskCount].mut_time = 0.0;
taskTable[taskCount].mut_etime = 0.0;
taskTable[taskCount].gc_time = 0.0;
taskTable[taskCount].gc_etime = 0.0;
taskTable[taskCount].elapsedtimestart = stat_getElapsedTime();
IF_DEBUG(scheduler,debugBelch("scheduler: Started task: %ld\n",tid););
return rtsTrue;
taskTableSize *= 2;
taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo),
"expandTaskTable");
}
void
stopTaskManager (void)
{
nat i;
OSThreadId tid = osThreadId();
/* Don't want to use pthread_cancel, since we'd have to install
* these silly exception handlers (pthread_cleanup_{push,pop}) around
* all our locks.
*/
#if 0
/* Cancel all our tasks */
for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
pthread_cancel(taskTable[i].id);
}
/* Wait for all the tasks to terminate */
for (i = 0; i < maxCount; i++) {
IF_DEBUG(scheduler,debugBelch("scheduler: waiting for task %ld\n",
taskTable[i].id));
pthread_join(taskTable[i].id, NULL);
}
#endif
/* Send 'em all a SIGHUP. That should shut 'em up. */
awaitDeath = taskCount==0 ? 0 : taskCount-1;
for (i = 0; i < taskCount; i++) {
/* don't cancel the thread running this piece of code. */
if ( taskTable[i].id != tid ) {
pthread_kill(taskTable[i].id,SIGTERM);
nat i;
IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
for (i = 1000; i > 0; i--) {
if (tasksRunning == 0) {
IF_DEBUG(scheduler, sched_belch("all tasks stopped"));
return;
}
prodWorker();
yieldThread();
}
}
while (awaitDeath > 0) {
sched_yield();
}
return;
IF_DEBUG(scheduler, sched_belch("%d tasks still running, exiting anyway", tasksRunning));
/*
OLD CODE follows:
*/
#if old_code
/* Send 'em all a SIGHUP. That should shut 'em up. */
awaitDeath = taskCount==0 ? 0 : taskCount-1;
for (i = 0; i < taskCount; i++) {
/* don't cancel the thread running this piece of code. */
if ( taskTable[i].id != tid ) {
pthread_kill(taskTable[i].id,SIGTERM);
}
}
while (awaitDeath > 0) {
sched_yield();
}
#endif // old_code
}
void
resetTaskManagerAfterFork (void)
rtsBool
startTasks (nat num, void (*taskStart)(void))
{
barf("resetTaskManagerAfterFork not implemented for SMP");
nat i;
for (i = 0; i < num; i++) {
if (!startTask(taskStart)) {
return rtsFalse;
}
}
return rtsTrue;
}
#else
/************ THREADS version *****************/
void