Task.c 10.3 KB
Newer Older
sof's avatar
sof committed
1 2
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem.  Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 * 
 * -------------------------------------------------------------------------*/
10

Simon Marlow's avatar
Simon Marlow committed
11
#include "PosixSource.h"
sof's avatar
sof committed
12
#include "Rts.h"
Simon Marlow's avatar
Simon Marlow committed
13

sof's avatar
sof committed
14 15
#include "RtsUtils.h"
#include "Task.h"
16
#include "Capability.h"
sof's avatar
sof committed
17
#include "Stats.h"
sof's avatar
sof committed
18
#include "Schedule.h"
19
#include "Hash.h"
Simon Marlow's avatar
Simon Marlow committed
20
#include "Trace.h"
sof's avatar
sof committed
21

22 23 24 25
#if HAVE_SIGNAL_H
#include <signal.h>
#endif

26
// Task lists and global counters.
27
// Locks required: all_tasks_mutex.
28 29
Task *all_tasks = NULL;
static nat taskCount;
30
static int tasksInitialized = 0;
31

32 33 34 35
static void   freeTask  (Task *task);
static Task * allocTask (void);
static Task * newTask   (rtsBool);

36 37 38 39
#if defined(THREADED_RTS)
static Mutex all_tasks_mutex;
#endif

40 41 42 43 44 45 46
/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
#if defined(THREADED_RTS)
47 48 49
# if defined(MYTASK_USE_TLV)
__thread Task *my_task;
# else
50
ThreadLocalKey currentTaskKey;
51
# endif
52 53 54 55 56 57 58
#else
Task *my_task;
#endif

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */
sof's avatar
sof committed
59 60

void
61
initTaskManager (void)
sof's avatar
sof committed
62
{
63
    if (!tasksInitialized) {
64
	taskCount = 0;
65
	tasksInitialized = 1;
66 67
#if defined(THREADED_RTS)
#if !defined(MYTASK_USE_TLV)
68
	newThreadLocalKey(&currentTaskKey);
69
#endif
70
        initMutex(&all_tasks_mutex);
71
#endif
sof's avatar
sof committed
72 73 74
    }
}

Simon Marlow's avatar
Simon Marlow committed
75
nat
76 77 78
freeTaskManager (void)
{
    Task *task, *next;
79
    nat tasksRunning = 0;
80

81
    ACQUIRE_LOCK(&all_tasks_mutex);
Simon Marlow's avatar
Simon Marlow committed
82

83
    for (task = all_tasks; task != NULL; task = next) {
84 85
        next = task->all_link;
        if (task->stopped) {
86 87 88
            freeTask(task);
        } else {
            tasksRunning++;
89
        }
90
    }
91 92 93 94

    debugTrace(DEBUG_sched, "freeing task manager, %d tasks still running",
               tasksRunning);

95
    all_tasks = NULL;
96 97 98

    RELEASE_LOCK(&all_tasks_mutex);

99
#if defined(THREADED_RTS) && !defined(MYTASK_USE_TLV)
100
    closeMutex(&all_tasks_mutex); 
101 102
    freeThreadLocalKey(&currentTaskKey);
#endif
Simon Marlow's avatar
Simon Marlow committed
103

104 105
    tasksInitialized = 0;

Simon Marlow's avatar
Simon Marlow committed
106
    return tasksRunning;
sof's avatar
sof committed
107
}
sof's avatar
sof committed
108

109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
/*
 * Return the Task associated with the calling thread, creating one if
 * myTask() reports that none has been registered yet.
 *
 * A newly created Task is a non-worker (newTask(rtsFalse)); in the threaded
 * RTS its id is set to the calling OS thread's id, and it is then recorded
 * with setMyTask() so later calls return the same Task.
 */
static Task *
allocTask (void)
{
    Task *current = myTask();

    if (current == NULL) {
        current = newTask(rtsFalse);
#if defined(THREADED_RTS)
        current->id = osThreadId();
#endif
        setMyTask(current);
    }

    return current;
}

/*
 * Release a Task and everything hanging off it: its condition variable and
 * lock (threaded RTS only), the stack of InCalls reachable from
 * task->incall via prev_stack, the cached InCalls on task->spare_incalls,
 * and finally the Task structure itself.
 *
 * This function frees unconditionally; it does not itself check whether
 * the Task is still in use.  A Task may still be in use if we have a
 * Haskell thread in a foreign call while we are attempting to shut down
 * the RTS (see conc059), which is why freeTaskManager() only calls this
 * for Tasks that are marked stopped.
 */
static void
freeTask (Task *task)
{
    InCall *cur, *following;

#if defined(THREADED_RTS)
    closeCondition(&task->cond);
    closeMutex(&task->lock);
#endif

    // Active InCalls are chained through prev_stack.
    cur = task->incall;
    while (cur != NULL) {
        following = cur->prev_stack;
        stgFree(cur);
        cur = following;
    }

    // Spare (cached) InCalls are chained through next.
    cur = task->spare_incalls;
    while (cur != NULL) {
        following = cur->next;
        stgFree(cur);
        cur = following;
    }

    stgFree(task);
}
152

153
static Task*
154
newTask (rtsBool worker)
sof's avatar
sof committed
155
{
156
#if defined(THREADED_RTS)
157
    Ticks currentElapsedTime, currentUserTime;
158 159
#endif
    Task *task;
160

161 162
#define ROUND_TO_CACHE_LINE(x) ((((x)+63) / 64) * 64)
    task = stgMallocBytes(ROUND_TO_CACHE_LINE(sizeof(Task)), "newTask");
163
    
164 165 166
    task->cap           = NULL;
    task->worker        = worker;
    task->stopped       = rtsFalse;
167
    task->running_finalizers = rtsFalse;
168 169 170
    task->n_spare_incalls = 0;
    task->spare_incalls = NULL;
    task->incall        = NULL;
171
    
172 173 174 175 176 177 178
#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->wakeup = rtsFalse;
#endif

#if defined(THREADED_RTS)
179 180
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
181 182 183 184
    task->mut_time = 0;
    task->mut_etime = 0;
    task->gc_time = 0;
    task->gc_etime = 0;
185 186 187 188 189
    task->muttimestart = currentUserTime;
    task->elapsedtimestart = currentElapsedTime;
#endif

    task->next = NULL;
190

191
    ACQUIRE_LOCK(&all_tasks_mutex);
192 193 194 195

    task->all_link = all_tasks;
    all_tasks = task;

196 197
    taskCount++;

198
    RELEASE_LOCK(&all_tasks_mutex);
199

200
    return task;
sof's avatar
sof committed
201 202
}

203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
// avoid the spare_incalls list growing unboundedly
#define MAX_SPARE_INCALLS 8

/*
 * Push a fresh InCall onto the Task's InCall stack.
 *
 * An InCall is taken from task->spare_incalls when one is cached there,
 * otherwise a new one is allocated.  All fields are reset, the previous
 * top of the stack is remembered in prev_stack, and task->incall is made
 * to point at the new InCall.
 */
static void
newInCall (Task *task)
{
    InCall *incall;

    if (task->spare_incalls != NULL) {
        // Reuse a cached InCall.
        incall = task->spare_incalls;
        task->spare_incalls = incall->next;
        task->n_spare_incalls--;
    } else {
        // NOTE(review): the allocation label is "newBoundTask" although
        // this helper is also called from workerStart(); confirm whether
        // the label should name this function instead.
        incall = stgMallocBytes((sizeof(InCall)), "newBoundTask");
    }

    incall->tso = NULL;
    incall->task = task;
    incall->suspended_tso = NULL;
    incall->suspended_cap = NULL;
    incall->stat          = NoStatus;
    incall->ret           = NULL;
    incall->next = NULL;
    incall->prev = NULL;
    incall->prev_stack = task->incall;
    task->incall = incall;
}

/*
 * Pop the current InCall off the Task's InCall stack.
 *
 * The popped InCall has its tso cleared and is then either cached on
 * task->spare_incalls for reuse by newInCall(), or freed when the cache
 * already holds MAX_SPARE_INCALLS entries.
 */
static void
endInCall (Task *task)
{
    InCall *finished = task->incall;

    finished->tso = NULL;
    task->incall = finished->prev_stack;

    if (task->n_spare_incalls < MAX_SPARE_INCALLS) {
        // Room in the cache: keep the InCall for later reuse.
        finished->next = task->spare_incalls;
        task->spare_incalls = finished;
        task->n_spare_incalls++;
    } else {
        stgFree(finished);
    }
}


250 251
Task *
newBoundTask (void)
sof's avatar
sof committed
252
{
253 254
    Task *task;

255 256 257 258 259
    if (!tasksInitialized) {
        errorBelch("newBoundTask: RTS is not initialised; call hs_init() first");
        stg_exit(EXIT_FAILURE);
    }

260
    task = allocTask();
261

262
    task->stopped = rtsFalse;
263

264
    newInCall(task);
265

Simon Marlow's avatar
Simon Marlow committed
266
    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
267
    return task;
sof's avatar
sof committed
268 269
}

270 271
/*
 * End the current in-call on the given Task, which must be the calling
 * thread's own Task (asserted).
 *
 * The top InCall is popped; the Task is marked stopped only when no
 * InCalls remain on its stack.
 */
void
boundTaskExiting (Task *task)
{
#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);

    endInCall(task);

    // Set task->stopped, but only if this is the last call (#4850).
    // Remember that we might have a worker Task that makes a foreign
    // call and then a callback, so it can transform into a bound
    // Task for the duration of the callback.
    if (task->incall == NULL) {
        task->stopped = rtsTrue;
    }

    debugTrace(DEBUG_sched, "task exiting");
}
sof's avatar
sof committed
290

291

292 293 294 295 296 297
#ifdef THREADED_RTS
#define TASK_ID(t) (t)->id
#else
#define TASK_ID(t) (t)
#endif

298
void
299
discardTasksExcept (Task *keep)
300
{
301
    Task *task, *next;
302 303

    // Wipe the task list, except the current Task.
304
    ACQUIRE_LOCK(&all_tasks_mutex);
305 306
    for (task = all_tasks; task != NULL; task=next) {
        next = task->all_link;
307 308 309
        if (task != keep) {
            debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
            freeTask(task);
Simon Marlow's avatar
Simon Marlow committed
310
        }
311
    }
312
    all_tasks = keep;
313
    keep->all_link = NULL;
314
    RELEASE_LOCK(&all_tasks_mutex);
315
}
sof's avatar
sof committed
316

sof's avatar
sof committed
317
void
318
taskTimeStamp (Task *task USED_IF_THREADS)
sof's avatar
sof committed
319
{
320
#if defined(THREADED_RTS)
Simon Marlow's avatar
Simon Marlow committed
321
    Ticks currentElapsedTime, currentUserTime;
322

323 324
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
325

Simon Marlow's avatar
Simon Marlow committed
326
    task->mut_time =
327 328
	currentUserTime - task->muttimestart - task->gc_time;
    task->mut_etime = 
Simon Marlow's avatar
Simon Marlow committed
329
        currentElapsedTime - task->elapsedtimestart - task->gc_etime;
330

Simon Marlow's avatar
Simon Marlow committed
331 332
    if (task->gc_time   < 0) { task->gc_time   = 0; }
    if (task->gc_etime  < 0) { task->gc_etime  = 0; }
333 334 335 336 337
    if (task->mut_time  < 0) { task->mut_time  = 0; }
    if (task->mut_etime < 0) { task->mut_etime = 0; }
#endif
}

Simon Marlow's avatar
Simon Marlow committed
338 339 340 341 342 343 344
/*
 * Add the cost of a garbage collection to the Task's running GC totals.
 *
 * cpu_time:     amount added to task->gc_time.
 * elapsed_time: amount added to task->gc_etime.
 */
void
taskDoneGC (Task *task, Ticks cpu_time, Ticks elapsed_time)
{
    // The two accumulators are independent of one another.
    task->gc_etime += elapsed_time;
    task->gc_time  += cpu_time;
}

Simon Marlow's avatar
Simon Marlow committed
345 346
#if defined(THREADED_RTS)

347 348 349
/*
 * Mark a worker Task as stopped.  Must be called by the Task's own OS
 * thread (asserted in DEBUG builds).
 *
 * The Task is detached from its capability, its time accounting is
 * brought up to date with taskTimeStamp(), and stopped is set.
 */
void
workerTaskStop (Task *task)
{
    DEBUG_ONLY( OSThreadId id );
    DEBUG_ONLY( id = osThreadId() );
    ASSERT(task->id == id);
    ASSERT(myTask() == task);

    task->cap = NULL;
    taskTimeStamp(task);
    task->stopped = rtsTrue;
}
359

Simon Marlow's avatar
Simon Marlow committed
360
#endif
sof's avatar
sof committed
361

362 363 364 365 366 367 368 369 370 371 372
#ifdef DEBUG

/*
 * Produce a pointer-sized identifier for a Task, for use with %p in debug
 * output: the Task's OS thread id in the threaded RTS, otherwise the
 * address of the Task structure itself.
 */
static void *taskId(Task *task)
{
    void *ident;

#ifdef THREADED_RTS
    ident = (void *)task->id;
#else
    ident = (void *)task;
#endif

    return ident;
}

Ian Lynagh's avatar
Ian Lynagh committed
373 374
#endif

375 376
#if defined(THREADED_RTS)

377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
/*
 * Entry point of a worker OS thread created by startWorkerTask().
 *
 * task: the Task structure prepared for this worker by startWorkerTask().
 *
 * Reads the capability assigned to the Task, optionally sets the thread's
 * affinity, registers the Task as the calling thread's Task, pushes an
 * InCall, and hands control to scheduleWorker().
 */
static void OSThreadProcAttr
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    // startWorkerTask() holds task->lock while it fills in the Task, so
    // taking the lock here waits until that set-up has finished.
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    if (RtsFlags.ParFlags.setAffinity) {
        setThreadAffinity(cap->no, n_capabilities);
    }

    // set the thread-local pointer to the Task:
    setMyTask(task);

    // newTask() leaves task->incall NULL; give the worker its first InCall.
    newInCall(task);

    scheduleWorker(cap,task);
}

399
void
400
startWorkerTask (Capability *cap)
401
{
402 403 404 405 406
  int r;
  OSThreadId tid;
  Task *task;

  // A worker always gets a fresh Task structure.
407
  task = newTask(rtsTrue);
408 409 410 411 412 413 414 415 416 417 418 419 420 421

  // The lock here is to synchronise with taskStart(), to make sure
  // that we have finished setting up the Task structure before the
  // worker thread reads it.
  ACQUIRE_LOCK(&task->lock);

  task->cap = cap;

  // Give the capability directly to the worker; we can't let anyone
  // else get in, because the new worker Task has nowhere to go to
  // sleep so that it could be woken up again.
  ASSERT_LOCK_HELD(&cap->lock);
  cap->running_task = task;

422
  r = createOSThread(&tid, (OSThreadProc*)workerStart, task);
423
  if (r != 0) {
424 425
    sysErrorBelch("failed to create OS thread");
    stg_exit(EXIT_FAILURE);
426 427
  }

Simon Marlow's avatar
Simon Marlow committed
428
  debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);
429 430 431 432 433

  task->id = tid;

  // ok, finished with the Task struct.
  RELEASE_LOCK(&task->lock);
434
}
sof's avatar
sof committed
435

436 437 438 439 440 441
/*
 * Interrupt the OS thread belonging to another Task by calling
 * interruptOSThread() on its thread id.
 *
 * task: must not be the calling thread's own Task, and its current InCall
 *       must have a suspended TSO (both asserted).
 */
void
interruptWorkerTask (Task *task)
{
  ASSERT(osThreadId() != task->id);    // seppuku not allowed
  ASSERT(task->incall->suspended_tso); // use this only for FFI calls
  interruptOSThread(task->id);
  debugTrace(DEBUG_sched, "interrupted worker task %p", taskId(task));
}

445 446
#endif /* THREADED_RTS */

Ian Lynagh's avatar
Ian Lynagh committed
447 448
#ifdef DEBUG

449 450 451 452 453 454 455 456 457 458 459 460
void printAllTasks(void);

void
printAllTasks(void)
{
    Task *task;
    for (task = all_tasks; task != NULL; task = task->all_link) {
	debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive");
	if (!task->stopped) {
	    if (task->cap) {
		debugBelch("on capability %d, ", task->cap->no);
	    }
461 462 463
	    if (task->incall->tso) {
	      debugBelch("bound to thread %lu",
                         (unsigned long)task->incall->tso->id);
464 465 466 467 468 469 470 471 472 473
	    } else {
		debugBelch("worker");
	    }
	}
	debugBelch("\n");
    }
}		       

#endif