/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 2001-2005
 *
 * The task manager subsystem.  Tasks execute STG code, with this
 * module providing the API which the Scheduler uses to control their
 * creation and destruction.
 *
 * -------------------------------------------------------------------------*/

sof's avatar
sof committed
11
12
13
14
#include "Rts.h"
#include "RtsUtils.h"
#include "OSThreads.h"
#include "Task.h"
15
#include "Capability.h"
sof's avatar
sof committed
16
17
#include "Stats.h"
#include "RtsFlags.h"
sof's avatar
sof committed
18
#include "Schedule.h"
19
#include "Hash.h"
sof's avatar
sof committed
20

21
22
23
24
#if HAVE_SIGNAL_H
#include <signal.h>
#endif

25
26
27
28
29
30
31
// Task lists and global counters.
// Locks required: sched_mutex.

// Head of the list of every Task allocated by newTask(), chained
// through the all_link field.
Task *all_tasks = NULL;

// Stopped Tasks that newBoundTask() may reuse; singly-linked through
// the next field.
static Task *task_free_list = NULL;

// Worker limit: startWorkerTask() barfs instead of going past
// maxWorkers, which initTaskManager() sets to the default below.
#define DEFAULT_MAX_WORKERS 64
static nat maxWorkers;

static nat taskCount;    // bumped for each Task that newTask() allocates
static nat workerCount;  // the count that is checked against maxWorkers
static nat tasksRunning; // bumped when a Task starts, dropped when it stops

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/* -----------------------------------------------------------------------------
 * Remembering the current thread's Task
 * -------------------------------------------------------------------------- */

// Where the current thread's Task pointer lives: the threaded RTS
// stores it in thread-local storage under currentTaskKey (the key is
// created by initTaskManager()); otherwise one global pointer is used.
#ifdef THREADED_RTS
ThreadLocalKey currentTaskKey;
#else
Task *my_task;
#endif

/* -----------------------------------------------------------------------------
 * Rest of the Task API
 * -------------------------------------------------------------------------- */
sof's avatar
sof committed
50
51

void
52
initTaskManager (void)
sof's avatar
sof committed
53
{
54
55
56
57
58
59
60
61
    static int initialized = 0;

    if (!initialized) {
	taskCount = 0;
	workerCount = 0;
	tasksRunning = 0;
	maxWorkers = DEFAULT_MAX_WORKERS;
	initialized = 1;
62
63
64
#if defined(THREADED_RTS)
	newThreadLocalKey(&currentTaskKey);
#endif
sof's avatar
sof committed
65
66
67
68
69
    }
}


void
70
stopTaskManager (void)
sof's avatar
sof committed
71
{
72
    IF_DEBUG(scheduler, sched_belch("stopping task manager, %d tasks still running", tasksRunning));
sof's avatar
sof committed
73
}
sof's avatar
sof committed
74

75

76
77
static Task*
newTask (void)
sof's avatar
sof committed
78
{
79
#if defined(THREADED_RTS)
80
    Ticks currentElapsedTime, currentUserTime;
81
82
#endif
    Task *task;
83

84
    task = stgMallocBytes(sizeof(Task), "newTask");
85
    
86
87
88
89
90
91
    task->cap  = NULL;
    task->stopped = rtsFalse;
    task->suspended_tso = NULL;
    task->tso  = NULL;
    task->stat = NoStatus;
    task->ret  = NULL;
92
    
93
94
95
96
97
98
99
#if defined(THREADED_RTS)
    initCondition(&task->cond);
    initMutex(&task->lock);
    task->wakeup = rtsFalse;
#endif

#if defined(THREADED_RTS)
100
101
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
    task->mut_time = 0.0;
    task->mut_etime = 0.0;
    task->gc_time = 0.0;
    task->gc_etime = 0.0;
    task->muttimestart = currentUserTime;
    task->elapsedtimestart = currentElapsedTime;
#endif

    task->prev = NULL;
    task->next = NULL;
    task->return_link = NULL;

    task->all_link = all_tasks;
    all_tasks = task;

117
118
119
    taskCount++;
    workerCount++;

120
    return task;
sof's avatar
sof committed
121
122
}

123
124
Task *
newBoundTask (void)
sof's avatar
sof committed
125
{
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
    Task *task;

    ASSERT_LOCK_HELD(&sched_mutex);
    if (task_free_list == NULL) {
	task = newTask();
    } else {
	task = task_free_list;
	task_free_list = task->next;
	task->next = NULL;
	task->prev = NULL;
	task->stopped = rtsFalse;
    }
#if defined(THREADED_RTS)
    task->id = osThreadId();
#endif
    ASSERT(task->cap == NULL);
sof's avatar
sof committed
142

143
144
145
146
147
148
    tasksRunning++;

    taskEnter(task);

    IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount););
    return task;
sof's avatar
sof committed
149
150
}

151
152
/*
 * Retire a bound Task: mark it stopped, detach it from its
 * capability, restore the thread's previous Task (task->prev_stack)
 * as the current one, and put the Task on the free list for reuse.
 *
 * Must be called by the thread that owns the Task, and that Task must
 * be the thread's current one (both are asserted).  sched_mutex is
 * acquired here.
 */
void
boundTaskExiting (Task *task)
{
    task->stopped = rtsTrue;
    task->cap = NULL;

#if defined(THREADED_RTS)
    ASSERT(osThreadId() == task->id);
#endif
    ASSERT(myTask() == task);
    setMyTask(task->prev_stack);

    // sadly, we need a lock around the free task list. Todo: eliminate.
    // tasksRunning is one of the counters guarded by sched_mutex, so
    // it is updated inside the same critical section rather than
    // before taking the lock.
    ACQUIRE_LOCK(&sched_mutex);
    tasksRunning--;
    task->next = task_free_list;
    task_free_list = task;
    RELEASE_LOCK(&sched_mutex);

    IF_DEBUG(scheduler,sched_belch("task exiting"));
}
sof's avatar
sof committed
173

174
175
/*
 * Forcibly retire a Task that has not stopped yet: clear its
 * capability and TSO, mark it stopped, decrement the running count
 * and push it onto the free list.  A Task that is already stopped is
 * left untouched.
 *
 * The caller must hold sched_mutex.
 */
void
discardTask (Task *task)
{
    ASSERT_LOCK_HELD(&sched_mutex);

    if (task->stopped) {
        return;
    }

    IF_DEBUG(scheduler,sched_belch("discarding task %p",(void *)task->id));

    task->cap = NULL;
    task->tso = NULL;
    task->stopped = rtsTrue;
    tasksRunning--;

    task->next = task_free_list;
    task_free_list = task;
}
sof's avatar
sof committed
188

sof's avatar
sof committed
189
void
190
taskStop (Task *task)
sof's avatar
sof committed
191
{
192
#if defined(THREADED_RTS)
193
    OSThreadId id;
194
    Ticks currentElapsedTime, currentUserTime, elapsedGCTime;
195
196

    id = osThreadId();
197
198
    ASSERT(task->id == id);
    ASSERT(myTask() == task);
199

200
201
    currentUserTime = getThreadCPUTime();
    currentElapsedTime = getProcessElapsedTime();
202
203
204
205

    // XXX this is wrong; we want elapsed GC time since the
    // Task started.
    elapsedGCTime = stat_getElapsedGCTime();
206
    
207
208
209
210
    task->mut_time = 
	currentUserTime - task->muttimestart - task->gc_time;
    task->mut_etime = 
	currentElapsedTime - task->elapsedtimestart - elapsedGCTime;
211

212
213
214
    if (task->mut_time < 0.0)  { task->mut_time = 0.0;  }
    if (task->mut_etime < 0.0) { task->mut_etime = 0.0; }
#endif
215

216
    task->stopped = rtsTrue;
217
    tasksRunning--;
sof's avatar
sof committed
218
}
219
220

void
221
resetTaskManagerAfterFork (void)
222
{
223
#warning TODO!
224
    taskCount = 0;
225
}
sof's avatar
sof committed
226

227
228
229
230
231
#if defined(THREADED_RTS)

/*
 * Create a new worker Task running on its own OS thread and hand it
 * the given capability.
 *
 *   cap        capability given to the worker; cap->lock must be held
 *              by the caller (asserted).
 *   taskStart  entry point run by the new OS thread; it receives the
 *              Task.
 *
 * Barfs if the worker limit has been reached or the OS thread cannot
 * be created.
 */
void
startWorkerTask (Capability *cap, 
		 void OSThreadProcAttr (*taskStart)(Task *task))
{
    int rc;
    OSThreadId threadId;
    Task *worker;

    if (workerCount >= maxWorkers) {
        barf("too many workers; runaway worker creation?");
    }
    workerCount++;

    // Workers never reuse a Task from the free list.
    worker = newTask();

    tasksRunning++;

    // Hold the Task's lock for the whole setup so that taskStart(),
    // which synchronises on it, cannot read the Task before it is
    // completely filled in.
    ACQUIRE_LOCK(&worker->lock);

    worker->cap = cap;

    // The worker is made the capability's running task right away:
    // nobody else may get in, because the new worker Task has nowhere
    // to sleep from which it could be woken up again.
    ASSERT_LOCK_HELD(&cap->lock);
    cap->running_task = worker;

    rc = createOSThread(&threadId, (OSThreadProc *)taskStart, worker);
    if (rc != 0) {
        barf("startTask: Can't create new task");
    }

    IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount););

    worker->id = threadId;

    // Setup is complete; the worker may now read the Task.
    RELEASE_LOCK(&worker->lock);
}
sof's avatar
sof committed
272

273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
#endif /* THREADED_RTS */

#ifdef DEBUG

/*
 * Debug helper: a pointer-sized identifier for a Task, suitable for
 * printing with %p.  This is the OS thread id in the threaded RTS and
 * the Task's own address otherwise.
 */
static void *taskId(Task *task)
{
    void *ident;

#ifdef THREADED_RTS
    ident = (void *)task->id;
#else
    ident = (void *)task;
#endif

    return ident;
}

void printAllTasks(void);

/*
 * Debug helper: walk the all_tasks list and print one line per Task
 * giving its identifier and whether it is stopped or alive.  For a
 * live Task the line also shows its capability number (if it has one)
 * and either the thread it is bound to or "worker".
 */
void
printAllTasks(void)
{
    Task *t = all_tasks;

    while (t != NULL) {
        const char *state = t->stopped ? "stopped" : "alive";

        debugBelch("task %p is %s, ", taskId(t), state);

        if (!t->stopped) {
            if (t->cap) {
                debugBelch("on capability %d, ", t->cap->no);
            }
            if (t->tso) {
                debugBelch("bound to thread %d", t->tso->id);
            } else {
                debugBelch("worker");
            }
        }

        debugBelch("\n");
        t = t->all_link;
    }
}

#endif