/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem.  Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 *
 * -------------------------------------------------------------------------*/

sof's avatar
sof committed
11
12
13
14
#include "Rts.h"
#include "RtsUtils.h"
#include "OSThreads.h"
#include "Task.h"
15
#include "Capability.h"
sof's avatar
sof committed
16
17
#include "Stats.h"
#include "RtsFlags.h"
18
#include "Storage.h"
sof's avatar
sof committed
19
#include "Schedule.h"
20
#include "Hash.h"
Simon Marlow's avatar
Simon Marlow committed
21
#include "Trace.h"
sof's avatar
sof committed
22

23
24
25
26
#if HAVE_SIGNAL_H
#include <signal.h>
#endif

27
28
29
30
31
// Task lists and global counters.
// Locks required: sched_mutex.

// Head of the list of every Task allocated by newTask(), chained through
// Task::all_link.  printAllTasks() walks this list.
Task *all_tasks = NULL;
// Stopped Tasks available for reuse by newBoundTask(), chained through
// Task::next.  boundTaskExiting() and discardTask() push onto it;
// freeTaskManager() releases everything on it.
static Task *task_free_list = NULL; // singly-linked
// Incremented once per Task allocated by newTask(); zeroed by
// initTaskManager() and resetTaskManagerAfterFork().
static nat taskCount;
32
33
34
// Number of Tasks currently started and not yet stopped: incremented by
// newBoundTask() and startWorkerTask(), decremented by boundTaskExiting(),
// discardTask() and workerTaskStop().
static nat tasksRunning;
// Counter associated with worker Tasks (see startWorkerTask()).  It is
// only ever written in this file, never read.
static nat workerCount;

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// A thread-local-storage key that we can use to get access to the
// current thread's Task structure.
//
// In the threaded RTS the key is created by initTaskManager() with
// newThreadLocalKey().  In the non-threaded RTS a single global pointer
// takes its place.
// NOTE(review): the myTask()/setMyTask() accessors used in this file are
// presumably defined in Task.h on top of these -- confirm there.
#if defined(THREADED_RTS)
ThreadLocalKey currentTaskKey;
#else
Task *my_task;
#endif

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */
sof's avatar
sof committed
50
51

void
52
initTaskManager (void)
sof's avatar
sof committed
53
{
54
55
56
57
58
59
60
    static int initialized = 0;

    if (!initialized) {
	taskCount = 0;
	workerCount = 0;
	tasksRunning = 0;
	initialized = 1;
61
62
63
#if defined(THREADED_RTS)
	newThreadLocalKey(&currentTaskKey);
#endif
sof's avatar
sof committed
64
65
66
67
68
    }
}


void
69
stopTaskManager (void)
sof's avatar
sof committed
70
{
Simon Marlow's avatar
Simon Marlow committed
71
72
73
    debugTrace(DEBUG_sched, 
	       "stopping task manager, %d tasks still running",
	       tasksRunning);
74
    /* nothing to do */
75
76
77
78
79
80
81
82
83
84
85
86
}


void
freeTaskManager (void)
{
    Task *task, *next;

    debugTrace(DEBUG_sched, "freeing task manager");

    ACQUIRE_LOCK(&sched_mutex);
    for (task = task_free_list; task != NULL; task = next) {
87
88
89
90
#if defined(THREADED_RTS)
        closeCondition(&task->cond);
        closeMutex(&task->lock);
#endif
91
        next = task->next;
92
        stgFree(task);
93
94
95
    }
    task_free_list = NULL;
    RELEASE_LOCK(&sched_mutex);
sof's avatar
sof committed
96
}
sof's avatar
sof committed
97

98

99
100
static Task*
newTask (void)
sof's avatar
sof committed
101
{
102
#if defined(THREADED_RTS)
103
    Ticks currentElapsedTime, currentUserTime;
104
105
#endif
    Task *task;
106

107
    task = stgMallocBytes(sizeof(Task), "newTask");
108
    
109
110
111
112
113
114
    task->cap  = NULL;
    task->stopped = rtsFalse;
    task->suspended_tso = NULL;
    task->tso  = NULL;
    task->stat = NoStatus;
    task->ret  = NULL;
115
    
116
117
118
119
120
121
122
#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->wakeup = rtsFalse;
#endif

#if defined(THREADED_RTS)
123
124
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
125
126
127
128
    task->mut_time = 0;
    task->mut_etime = 0;
    task->gc_time = 0;
    task->gc_etime = 0;
129
130
131
132
133
134
135
136
137
138
139
    task->muttimestart = currentUserTime;
    task->elapsedtimestart = currentElapsedTime;
#endif

    task->prev = NULL;
    task->next = NULL;
    task->return_link = NULL;

    task->all_link = all_tasks;
    all_tasks = task;

140
141
142
    taskCount++;
    workerCount++;

143
    return task;
sof's avatar
sof committed
144
145
}

146
147
Task *
newBoundTask (void)
sof's avatar
sof committed
148
{
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
    Task *task;

    ASSERT_LOCK_HELD(&sched_mutex);
    if (task_free_list == NULL) {
	task = newTask();
    } else {
	task = task_free_list;
	task_free_list = task->next;
	task->next = NULL;
	task->prev = NULL;
	task->stopped = rtsFalse;
    }
#if defined(THREADED_RTS)
    task->id = osThreadId();
#endif
    ASSERT(task->cap == NULL);
sof's avatar
sof committed
165

166
167
168
169
    tasksRunning++;

    taskEnter(task);

Simon Marlow's avatar
Simon Marlow committed
170
    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
171
    return task;
sof's avatar
sof committed
172
173
}

174
175
/*
 * Retire a bound Task; called by the thread that owns it.
 *
 * The Task is marked stopped and detached from its capability, and
 * task->prev_stack is handed to setMyTask() (presumably re-installing the
 * Task that was current on this thread before this one -- confirm in
 * Task.h).  Then, under sched_mutex, tasksRunning is decremented and the
 * Task is pushed onto the free list for reuse by newBoundTask().
 *
 * tasksRunning is updated inside the critical section because the global
 * counters are documented as requiring sched_mutex and the matching
 * increment in newBoundTask() runs with that lock held; decrementing
 * outside the lock could race with it.
 *
 * This function acquires sched_mutex itself, so the caller must not
 * already hold it (assuming the mutex is not recursive).
 */
void
boundTaskExiting (Task *task)
{
    task->stopped = rtsTrue;
    task->cap = NULL;

#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);
    setMyTask(task->prev_stack);

    // sadly, we need a lock around the free task list. Todo: eliminate.
    ACQUIRE_LOCK(&sched_mutex);
    tasksRunning--;
    task->next = task_free_list;
    task_free_list = task;
    RELEASE_LOCK(&sched_mutex);

    debugTrace(DEBUG_sched, "task exiting");
}
sof's avatar
sof committed
196

197
198
199
200
201
202
// TASK_ID(t): a value identifying Task *t in debug output.  In the
// threaded RTS this is the Task's recorded thread id; otherwise it is the
// Task pointer itself.  Both expansions are fully parenthesised so the
// macro behaves as a single operand wherever it is used.
#ifdef THREADED_RTS
#define TASK_ID(t) ((t)->id)
#else
#define TASK_ID(t) (t)
#endif

203
204
/*
 * Forcibly retire a Task and put it on the free list.
 *
 * The caller must hold sched_mutex (asserted).  A Task that is already
 * marked stopped is left untouched.  Otherwise its capability and TSO
 * references are cleared, it is marked stopped, tasksRunning is
 * decremented and the Task is pushed onto the free list.
 */
void
discardTask (Task *task)
{
    ASSERT_LOCK_HELD(&sched_mutex);

    if (task->stopped) {
        return;
    }

    debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));

    task->cap = NULL;
    task->tso = NULL;
    task->stopped = rtsTrue;
    tasksRunning--;

    task->next = task_free_list;
    task_free_list = task;
}
sof's avatar
sof committed
217

sof's avatar
sof committed
218
void
219
taskTimeStamp (Task *task USED_IF_THREADS)
sof's avatar
sof committed
220
{
221
#if defined(THREADED_RTS)
222
    Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
223

224
225
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
226
227
228
229

    // XXX this is wrong; we want elapsed GC time since the
    // Task started.
    elapsedGCTime = stat_getElapsedGCTime();
230
    
231
232
233
234
    task->mut_time = 
	currentUserTime - task->muttimestart - task->gc_time;
    task->mut_etime = 
	currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
235

236
237
238
239
240
241
242
243
244
245
246
247
248
    if (task->mut_time  < 0) { task->mut_time  = 0; }
    if (task->mut_etime < 0) { task->mut_etime = 0; }
#endif
}

/*
 * Mark a worker Task as finished; called by the worker thread itself.
 *
 * In the threaded RTS this asserts that the caller is the thread recorded
 * in the Task and that the Task is the thread's current one.  The Task's
 * timing statistics are refreshed, it is marked stopped and tasksRunning
 * is decremented.  The Task is not placed on the free list.
 *
 * NOTE(review): tasksRunning is modified without taking sched_mutex here;
 * confirm whether callers hold it.
 */
void
workerTaskStop (Task *task)
{
#if defined(THREADED_RTS)
    OSThreadId self = osThreadId();

    ASSERT(task->id == self);
    ASSERT(myTask() == task);
#endif

    taskTimeStamp(task);

    task->stopped = rtsTrue;
    tasksRunning--;
}
255
256

void
257
resetTaskManagerAfterFork (void)
258
{
259
    // TODO!
260
    taskCount = 0;
261
}
sof's avatar
sof committed
262

263
264
265
266
267
#if defined(THREADED_RTS)

/*
 * Create a new worker Task and an OS thread to run it.
 *
 * cap       - capability handed directly to the new worker; the caller
 *             must hold cap->lock (asserted).
 * taskStart - entry point for the new OS thread; it receives the Task.
 *
 * The Task's own lock is held from before the thread is created until the
 * Task is fully set up (including its thread id), so that taskStart()
 * can synchronise on that lock before reading the structure.
 *
 * If the OS thread cannot be created the error is reported and the
 * program exits with EXIT_FAILURE.
 */
void
startWorkerTask (Capability *cap, 
		 void OSThreadProcAttr (*taskStart)(Task *task))
{
    int rc;
    OSThreadId threadId;
    Task *worker;

    workerCount++;

    // A worker always gets a fresh Task structure.
    worker = newTask();

    tasksRunning++;

    // The lock here is to synchronise with taskStart(), to make sure
    // that we have finished setting up the Task structure before the
    // worker thread reads it.
    ACQUIRE_LOCK(&worker->lock);

    worker->cap = cap;

    // Give the capability directly to the worker; we can't let anyone
    // else get in, because the new worker Task has nowhere to go to
    // sleep so that it could be woken up again.
    ASSERT_LOCK_HELD(&cap->lock);
    cap->running_task = worker;

    rc = createOSThread(&threadId, (OSThreadProc *)taskStart, worker);
    if (rc != 0) {
        sysErrorBelch("failed to create OS thread");
        stg_exit(EXIT_FAILURE);
    }

    debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);

    worker->id = threadId;

    // ok, finished with the Task struct.
    RELEASE_LOCK(&worker->lock);
}
sof's avatar
sof committed
306

307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
#endif /* THREADED_RTS */

#ifdef DEBUG

/*
 * Debug helper: return a pointer-sized identifier for a Task, suitable
 * for printing with %p.  In the threaded RTS this is the Task's thread id
 * cast to a pointer; otherwise it is the Task's own address.
 */
static void *taskId(Task *task)
{
    void *ident;

#ifdef THREADED_RTS
    ident = (void *)task->id;
#else
    ident = (void *)task;
#endif

    return ident;
}

void printAllTasks(void);

/*
 * Debug helper: print one line per Task on the all_tasks list.
 *
 * Each line gives the Task's identifier and whether it is stopped or
 * alive.  For a Task that is not stopped it also gives the capability
 * number (if the Task has a capability) and either the id of the thread
 * it is bound to or the word "worker" when it has no TSO.
 */
void
printAllTasks(void)
{
    Task *t;

    for (t = all_tasks; t != NULL; t = t->all_link) {
        const char *state = t->stopped ? "stopped" : "alive";

        debugBelch("task %p is %s, ", taskId(t), state);

        // Nothing more to report for a stopped Task.
        if (t->stopped) {
            debugBelch("\n");
            continue;
        }

        if (t->cap) {
            debugBelch("on capability %d, ", t->cap->no);
        }

        if (t->tso) {
            debugBelch("bound to thread %lu", (unsigned long)t->tso->id);
        } else {
            debugBelch("worker");
        }

        debugBelch("\n");
    }
}

#endif