Task.c 6.84 KB
Newer Older
sof's avatar
sof committed
1
2
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem.  Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 * 
 * -------------------------------------------------------------------------*/
10

sof's avatar
sof committed
11
12
13
14
#include "Rts.h"
#include "RtsUtils.h"
#include "OSThreads.h"
#include "Task.h"
15
#include "Capability.h"
sof's avatar
sof committed
16
17
#include "Stats.h"
#include "RtsFlags.h"
sof's avatar
sof committed
18
#include "Schedule.h"
19
#include "Hash.h"
Simon Marlow's avatar
Simon Marlow committed
20
#include "Trace.h"
sof's avatar
sof committed
21

22
23
24
25
#if HAVE_SIGNAL_H
#include <signal.h>
#endif

26
27
28
29
30
// Global task bookkeeping: the task lists and their counters.
// Locks required: sched_mutex.

// Head of the list of every Task allocated by newTask(), chained
// through the all_link field.
Task *all_tasks = NULL;

// Stopped Tasks waiting to be reused by newBoundTask(); singly-linked
// through the next field.
static Task *task_free_list = NULL;

// Number of Task structures allocated by newTask().
static nat taskCount;

// Number of Tasks currently counted as running.
static nat tasksRunning;

// Worker-task counter; zeroed by initTaskManager().
static nat workerCount;

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// Storage used to remember which Task belongs to the current thread.
//
// In the threaded RTS this is a thread-local-storage key, created once by
// initTaskManager() via newThreadLocalKey(), through which each OS thread
// can get access to its own Task structure.  In the non-threaded RTS a
// single global pointer is used instead.
//
// NOTE(review): presumably these are what myTask()/setMyTask() read and
// write; those accessors are defined outside this file -- confirm there.
#if defined(THREADED_RTS)
ThreadLocalKey currentTaskKey;
#else
Task *my_task;
#endif

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */
sof's avatar
sof committed
49
50

void
51
initTaskManager (void)
sof's avatar
sof committed
52
{
53
54
55
56
57
58
59
    static int initialized = 0;

    if (!initialized) {
	taskCount = 0;
	workerCount = 0;
	tasksRunning = 0;
	initialized = 1;
60
61
62
#if defined(THREADED_RTS)
	newThreadLocalKey(&currentTaskKey);
#endif
sof's avatar
sof committed
63
64
65
66
67
    }
}


void
68
stopTaskManager (void)
sof's avatar
sof committed
69
{
70
71
    Task *task, *next;

Simon Marlow's avatar
Simon Marlow committed
72
73
74
    debugTrace(DEBUG_sched, 
	       "stopping task manager, %d tasks still running",
	       tasksRunning);
75
76
77
78
79
80
81
82

    ACQUIRE_LOCK(&sched_mutex);
    for (task = task_free_list; task != NULL; next) {
        next = task->next;
        stgFree(task);
    }
    task_free_list = NULL;
    RELEASE_LOCK(&sched_mutex);
sof's avatar
sof committed
83
}
sof's avatar
sof committed
84

85

86
87
static Task*
newTask (void)
sof's avatar
sof committed
88
{
89
#if defined(THREADED_RTS)
90
    Ticks currentElapsedTime, currentUserTime;
91
92
#endif
    Task *task;
93

94
    task = stgMallocBytes(sizeof(Task), "newTask");
95
    
96
97
98
99
100
101
    task->cap  = NULL;
    task->stopped = rtsFalse;
    task->suspended_tso = NULL;
    task->tso  = NULL;
    task->stat = NoStatus;
    task->ret  = NULL;
102
    
103
104
105
106
107
108
109
#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->wakeup = rtsFalse;
#endif

#if defined(THREADED_RTS)
110
111
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
112
113
114
115
    task->mut_time = 0;
    task->mut_etime = 0;
    task->gc_time = 0;
    task->gc_etime = 0;
116
117
118
119
120
121
122
123
124
125
126
    task->muttimestart = currentUserTime;
    task->elapsedtimestart = currentElapsedTime;
#endif

    task->prev = NULL;
    task->next = NULL;
    task->return_link = NULL;

    task->all_link = all_tasks;
    all_tasks = task;

127
128
129
    taskCount++;
    workerCount++;

130
    return task;
sof's avatar
sof committed
131
132
}

133
134
Task *
newBoundTask (void)
sof's avatar
sof committed
135
{
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
    Task *task;

    ASSERT_LOCK_HELD(&sched_mutex);
    if (task_free_list == NULL) {
	task = newTask();
    } else {
	task = task_free_list;
	task_free_list = task->next;
	task->next = NULL;
	task->prev = NULL;
	task->stopped = rtsFalse;
    }
#if defined(THREADED_RTS)
    task->id = osThreadId();
#endif
    ASSERT(task->cap == NULL);
sof's avatar
sof committed
152

153
154
155
156
    tasksRunning++;

    taskEnter(task);

Simon Marlow's avatar
Simon Marlow committed
157
    debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
158
    return task;
sof's avatar
sof committed
159
160
}

161
162
/*
 * Retire a bound Task whose thread is leaving the RTS.
 *
 * Marks the Task stopped, detaches it from its capability, passes
 * task->prev_stack to setMyTask(), and then, under sched_mutex, removes
 * it from the running count and pushes it onto the free list for reuse
 * by newBoundTask().
 *
 * Must be called by the thread that owns the Task (asserted).  The caller
 * must not already hold sched_mutex, since it is acquired here.
 */
void
boundTaskExiting (Task *task)
{
    task->stopped = rtsTrue;
    task->cap = NULL;

#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);
    setMyTask(task->prev_stack);

    // sadly, we need a lock around the free task list. Todo: eliminate.
    ACQUIRE_LOCK(&sched_mutex);
    // tasksRunning is one of the global counters protected by sched_mutex
    // (newBoundTask() and discardTask() update it with the lock held), so
    // it is updated inside the critical section as well.
    tasksRunning--;
    task->next = task_free_list;
    task_free_list = task;
    RELEASE_LOCK(&sched_mutex);

    debugTrace(DEBUG_sched, "task exiting");
}
sof's avatar
sof committed
183

184
185
186
187
188
189
// TASK_ID(t): the value used to identify Task 't' in debug output.
// In the threaded RTS this is the Task's id field; otherwise it is the
// Task pointer itself.
#ifdef THREADED_RTS
#define TASK_ID(t) ((t)->id)
#else
#define TASK_ID(t) (t)
#endif

190
191
/*
 * Forcibly retire a Task that has not already stopped.
 *
 * The caller must hold sched_mutex.  A Task that is still live is detached
 * from its capability and TSO, marked stopped, removed from the running
 * count and pushed onto the free list.  A Task that is already stopped is
 * left untouched.
 */
void
discardTask (Task *task)
{
    ASSERT_LOCK_HELD(&sched_mutex);
    if (!task->stopped) {
	// TASK_ID() yields either an OS thread id or a Task pointer,
	// neither of which is a long; cast so the argument matches "%ld".
	debugTrace(DEBUG_sched, "discarding task %ld", (long)TASK_ID(task));
	task->cap = NULL;
	task->tso = NULL;
	task->stopped = rtsTrue;
	tasksRunning--;
	task->next = task_free_list;
	task_free_list = task;
    }
}
sof's avatar
sof committed
204

sof's avatar
sof committed
205
void
206
taskTimeStamp (Task *task USED_IF_THREADS)
sof's avatar
sof committed
207
{
208
#if defined(THREADED_RTS)
209
    Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
210

211
212
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
213
214
215
216

    // XXX this is wrong; we want elapsed GC time since the
    // Task started.
    elapsedGCTime = stat_getElapsedGCTime();
217
    
218
219
220
221
    task->mut_time = 
	currentUserTime - task->muttimestart - task->gc_time;
    task->mut_etime = 
	currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
222

223
224
225
226
227
228
229
230
231
232
233
234
235
    if (task->mut_time  < 0) { task->mut_time  = 0; }
    if (task->mut_etime < 0) { task->mut_etime = 0; }
#endif
}

/*
 * Record that a worker Task has finished.
 *
 * Takes a final time stamp for the Task, marks it stopped and removes it
 * from the running count.  In the threaded RTS it asserts that the caller
 * is the OS thread that owns the Task.  The Task is not placed on the free
 * list.
 */
void
workerTaskStop (Task *task)
{
#if defined(THREADED_RTS)
    OSThreadId self = osThreadId();

    ASSERT(task->id == self);
    ASSERT(myTask() == task);
#endif

    taskTimeStamp(task);

    task->stopped = rtsTrue;
    tasksRunning--;
}
242
243

void
244
resetTaskManagerAfterFork (void)
245
{
246
    // TODO!
247
    taskCount = 0;
248
}
sof's avatar
sof committed
249

250
251
252
253
254
#if defined(THREADED_RTS)

/*
 * Create a new worker Task and an OS thread to run it.
 *
 * cap       - capability handed directly to the new worker; the caller
 *             must hold cap->lock (asserted).
 * taskStart - entry point run by the new OS thread, which receives the
 *             new Task as its argument.
 *
 * Calls barf() if the OS thread cannot be created.
 */
void
startWorkerTask (Capability *cap, 
		 void OSThreadProcAttr (*taskStart)(Task *task))
{
  Task *worker;
  OSThreadId thread_id;
  int rc;

  workerCount++;

  // Workers never reuse a Task from the free list.
  worker = newTask();

  tasksRunning++;

  // Hold the Task's lock while it is being set up: taskStart()
  // synchronises on it, so the worker thread cannot read the Task
  // structure before we have finished filling it in.
  ACQUIRE_LOCK(&worker->lock);

  worker->cap = cap;

  // The capability goes straight to the worker.  Nobody else may get in,
  // because the new worker Task has nowhere to sleep from which it could
  // be woken up again.
  ASSERT_LOCK_HELD(&cap->lock);
  cap->running_task = worker;

  rc = createOSThread(&thread_id, (OSThreadProc *)taskStart, worker);
  if (rc != 0) {
    barf("startTask: Can't create new task");
  }

  debugTrace(DEBUG_sched, "new worker task (taskCount: %d)", taskCount);

  worker->id = thread_id;

  // Setup complete; the worker may now use the Task.
  RELEASE_LOCK(&worker->lock);
}
sof's avatar
sof committed
292

293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
#endif /* THREADED_RTS */

#ifdef DEBUG

/*
 * Debug helper: the value printed with %p to identify a Task.
 * This is the Task's id field in the threaded RTS, and the Task pointer
 * itself otherwise.
 */
static void *taskId(Task *task)
{
    void *ident;

#ifdef THREADED_RTS
    ident = (void *)task->id;
#else
    ident = (void *)task;
#endif
    return ident;
}

void printAllTasks(void);

/*
 * Debug helper: print one line per Task on the all_tasks list.
 *
 * Each line gives the Task's identifier and whether it is stopped or
 * alive.  For a live Task it also gives the capability number, if it has
 * one, and either the id of the thread it is bound to or "worker".
 */
void
printAllTasks(void)
{
    Task *t = all_tasks;

    while (t != NULL) {
        const char *state = t->stopped ? "stopped" : "alive";

        debugBelch("task %p is %s, ", taskId(t), state);
        if (!t->stopped) {
            if (t->cap) {
                debugBelch("on capability %d, ", t->cap->no);
            }
            if (t->tso) {
                debugBelch("bound to thread %d", t->tso->id);
            } else {
                debugBelch("worker");
            }
        }
        debugBelch("\n");

        t = t->all_link;
    }
}

#endif