Capability.c 17.1 KB
Newer Older
sof's avatar
sof committed
1
/* ---------------------------------------------------------------------------
2
 *
3
 * (c) The GHC Team, 2003-2006
sof's avatar
sof committed
4
5
6
 *
 * Capabilities
 *
sof's avatar
sof committed
7
8
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
sof's avatar
sof committed
9
 * its STG registers, a pointer to its TSO, a nursery etc. During
sof's avatar
sof committed
10
 * STG execution, a pointer to the capability is kept in a
11
 * register (BaseReg; actually it is a pointer to cap->r).
sof's avatar
sof committed
12
 *
sof's avatar
sof committed
13
14
15
 * Only in an SMP build will there be multiple capabilities, for
 * the threaded RTS and other non-threaded builds, there is only
 * one global capability, namely MainCapability.
16
 *
sof's avatar
sof committed
17
 * --------------------------------------------------------------------------*/
18

sof's avatar
sof committed
19
20
21
#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
22
#include "RtsFlags.h"
23
#include "STM.h"
sof's avatar
sof committed
24
#include "OSThreads.h"
sof's avatar
sof committed
25
#include "Capability.h"
26
#include "Schedule.h"
27
#include "Sparks.h"
sof's avatar
sof committed
28

sof's avatar
sof committed
29
#if !defined(SMP)
Capability MainCapability;     // for non-SMP, we have one global capability
#endif

// Number of entries in the 'capabilities' array (always 1 for non-SMP).
nat n_capabilities;

// The array of Capabilities; for non-SMP this points at MainCapability
// (see initCapabilities()).
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;
41

42
#if defined(THREADED_RTS)
43
44
45
// Is there a global condition that should wake up any capability,
// regardless of its local state?  Currently: black holes needing a
// check, or a pending interrupt.
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return (blackholes_need_checking || interrupted);
}
50
#endif
51

52
53
54
#if defined(THREADED_RTS)
// Decide whether 'task' has a reason to take ownership of 'cap'.
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
    // If the run queue is not empty, then we only wake up the guy who
    // can run the thread at the head, even if there is some other
    // reason for this task to run (eg. interrupted=rtsTrue).
    if (!emptyRunQueue(cap)) {
	if (cap->run_queue_hd->bound != NULL) {
	    // head thread is bound: only its own task may run it
	    return (cap->run_queue_hd->bound == task);
	}
	// head thread is unbound: only a worker (no TSO) may run it
	return (task->tso == NULL);
    }

    // Run queue empty: a worker may still find sparks to evaluate.
    if (task->tso == NULL && !emptySparkPoolCap(cap)) {
	return rtsTrue;
    }

    // Otherwise, fall back to global conditions.
    return globalWorkToDo();
}
70
#endif
71
72
73

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
74
 *
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
// Append 'task' to the tail of cap's returning_tasks queue.
// Requires cap->lock; 'task' must not already be on a queue.
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd == NULL) {
	// queue was empty: task becomes the head
	cap->returning_tasks_hd = task;
    } else {
	// link task after the current tail
	ASSERT(cap->returning_tasks_tl->return_link == NULL);
	cap->returning_tasks_tl->return_link = task;
    }
    cap->returning_tasks_tl = task;
}

93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Remove and return the task at the head of cap's returning_tasks
// queue.  Requires cap->lock; the queue must be non-empty.
STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    // Fix: declare 'task' at the top of the block, before any
    // statements, consistent with the rest of this file (and C90).
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (cap->returning_tasks_hd == NULL) {
	// queue is now empty; clear the tail pointer too
	cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

109
/* ----------------------------------------------------------------------------
110
111
112
113
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */
114
115

// Initialise a single Capability: empty run queue, per-generation
// mutable lists, GC entry points, and (THREADED_RTS) its lock and
// task queues.  'i' becomes the capability's number (cap->no).
static void
initCapability( Capability *cap, nat i )
{
    nat g;   // generation index, for initialising mut_lists

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
#endif

    // Pre-computed entry points used by compiled (STG) code.
    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun        = (F_)__stg_gc_fun;

    // One mutable list per generation for this capability.
    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
				     RtsFlags.GcFlags.generations,
				     "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
	cap->mut_lists[g] = NULL;
    }

    // Per-capability STM free lists start out empty.
    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
}

152
/* ---------------------------------------------------------------------------
sof's avatar
sof committed
153
154
155
156
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the SMP build,
 *            we keep a table of them, the size of which is
157
 *            controlled by the user via the RTS flag -N.
sof's avatar
sof committed
158
 *
159
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
160
// Set up the Capability handling.  For the SMP build we allocate a
// table of RtsFlags.ParFlags.nNodes capabilities (RTS flag -N); all
// other builds use the single global MainCapability.
void
initCapabilities( void )
{
#if defined(SMP)
    nat i,n;

#ifndef REG_BaseReg
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
	errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
	RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = n = RtsFlags.ParFlags.nNodes;
    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");

    for (i = 0; i < n; i++) {
	initCapability(&capabilities[i], i);
    }

    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
#else
    // Non-SMP: exactly one capability, the static MainCapability.
    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);
#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task to each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

194
/* ----------------------------------------------------------------------------
195
196
197
198
199
200
201
202
203
204
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrater must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrater should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
205
206
 *
 * ------------------------------------------------------------------------- */
207
208
209

#if defined(THREADED_RTS)
// Hand 'cap' to 'task', which is sleeping on its condition variable.
// Requires cap->lock; task->cap must already point at cap (see the
// migration rules in the comment above).
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    IF_DEBUG(scheduler,
	     sched_belch("passing capability %d to %s %p",
			 cap->no, task->tso ? "bound task" : "worker",
			 (void *)task->id));
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
226
#endif
227

228
/* ----------------------------------------------------------------------------
sof's avatar
sof committed
229
230
 * Function:  releaseCapability(Capability*)
 *
sof's avatar
sof committed
231
232
233
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
234
235
 * ------------------------------------------------------------------------- */

236
#if defined(THREADED_RTS)
237
// Let go of 'cap', waking whoever should run next: a returning
// worker first, then the Task bound to the thread at the head of the
// run queue, then a spare (or freshly started) worker if there is any
// work at all.  If nobody wants it, record it as free.
// Requires cap->lock (hence the trailing underscore; see
// releaseCapability() for the locking wrapper).
void
releaseCapability_ (Capability* cap)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
	giveCapabilityToTask(cap,cap->returning_tasks_hd);
	// The Task pops itself from the queue (see waitForReturnCapability())
	return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
	// Make sure we're not about to try to wake ourselves up
	ASSERT(task != cap->run_queue_hd->bound);
	task = cap->run_queue_hd->bound;
	giveCapabilityToTask(cap,task);
	return;
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
	if (cap->spare_workers) {
	    giveCapabilityToTask(cap,cap->spare_workers);
	    // The worker Task pops itself from the queue;
	    return;
	}

	// Create a worker thread if we don't have one.  If the system
	// is interrupted, we only create a worker task if there
	// are threads that need to be completed.  If the system is
	// shutting down, we never create a new worker.
	if (!shutting_down_scheduler) {
	    IF_DEBUG(scheduler,
		     sched_belch("starting new worker on capability %d", cap->no));
	    startWorkerTask(cap, workerStart);
	    return;
	}
    }

    // Nobody wants the capability: mark it free.
    last_free_capability = cap;
    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}

291
// Public wrapper: take cap->lock around releaseCapability_().
void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap);
    RELEASE_LOCK(&cap->lock);
}

// Like releaseCapability(), but first pushes the current (worker)
// task onto cap's spare_workers list so it can be handed the
// capability again later.  Called from yieldCapability().
static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
	task->next = cap->spare_workers;
	cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap);

    RELEASE_LOCK(&cap->lock);
}
#endif
sof's avatar
sof committed
326

327
/* ----------------------------------------------------------------------------
328
 * waitForReturnCapability( Task *task )
sof's avatar
sof committed
329
330
 *
 * Purpose:  when an OS thread returns from an external call,
331
332
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
sof's avatar
sof committed
333
 * result of the external call back to the Haskell thread that
sof's avatar
sof committed
334
335
 * made it.
 *
336
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
337
void
338
waitForReturnCapability (Capability **pCap, Task *task)
sof's avatar
sof committed
339
{
340
#if !defined(THREADED_RTS)
341

342
343
344
    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;
345

346
#else
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
    Capability *cap = *pCap;

    if (cap == NULL) {
	// Try last_free_capability first
	cap = last_free_capability;
	if (!cap->running_task) {
	    nat i;
	    // otherwise, search for a free capability
	    for (i = 0; i < n_capabilities; i++) {
		cap = &capabilities[i];
		if (!cap->running_task) {
		    break;
		}
	    }
	    // Can't find a free one, use last_free_capability.
	    cap = last_free_capability;
	}

	// record the Capability as the one this Task is now assocated with.
	task->cap = cap;

368
    } else {
369
	ASSERT(task->cap == cap);
370
371
    }

372
    ACQUIRE_LOCK(&cap->lock);
sof's avatar
sof committed
373

374
    IF_DEBUG(scheduler,
375
	     sched_belch("returning; I want capability %d", cap->no));
sof's avatar
sof committed
376

377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
    if (!cap->running_task) {
	// It's free; just grab it
	cap->running_task = task;
	RELEASE_LOCK(&cap->lock);
    } else {
	newReturningTask(cap,task);
	RELEASE_LOCK(&cap->lock);

	for (;;) {
	    ACQUIRE_LOCK(&task->lock);
	    // task->lock held, cap->lock not held
	    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
	    cap = task->cap;
	    task->wakeup = rtsFalse;
	    RELEASE_LOCK(&task->lock);

	    // now check whether we should wake up...
	    ACQUIRE_LOCK(&cap->lock);
	    if (cap->running_task == NULL) {
		if (cap->returning_tasks_hd != task) {
		    giveCapabilityToTask(cap,cap->returning_tasks_hd);
		    RELEASE_LOCK(&cap->lock);
		    continue;
		}
		cap->running_task = task;
		popReturningTask(cap);
		RELEASE_LOCK(&cap->lock);
		break;
	    }
	    RELEASE_LOCK(&cap->lock);
	}

    }

411
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
412

413
    IF_DEBUG(scheduler,
414
415
416
417
418
419
420
	     sched_belch("returning; got capability %d", cap->no));

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
421
/* ----------------------------------------------------------------------------
422
 * yieldCapability
423
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
424

sof's avatar
sof committed
425
// Give up *pCap when there is nothing for this task to do on it:
// queue ourselves as a spare worker, release the capability, and
// sleep until it is handed back to us.  On return, *pCap is a
// capability owned by this task.
void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    // The fast path has no locking, if we don't enter this while loop

    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
	IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));

	// We must now release the capability and wait to be woken up
	// again.
	task->wakeup = rtsFalse;
	releaseCapabilityAndQueueWorker(cap);

	for (;;) {
	    ACQUIRE_LOCK(&task->lock);
	    // task->lock held, cap->lock not held
	    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
	    cap = task->cap;
	    task->wakeup = rtsFalse;
	    RELEASE_LOCK(&task->lock);

	    IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
	    ACQUIRE_LOCK(&cap->lock);
	    if (cap->running_task != NULL) {
		// somebody else got there first: sleep again
		IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
		RELEASE_LOCK(&cap->lock);
		continue;
	    }

	    if (task->tso == NULL) {
		// we are a worker: remove ourselves from spare_workers
		ASSERT(cap->spare_workers != NULL);
		// if we're not at the front of the queue, release it
		// again.  This is unlikely to happen.
		if (cap->spare_workers != task) {
		    giveCapabilityToTask(cap,cap->spare_workers);
		    RELEASE_LOCK(&cap->lock);
		    continue;
		}
		cap->spare_workers = task->next;
		task->next = NULL;
	    }
	    cap->running_task = task;
	    RELEASE_LOCK(&cap->lock);
	    break;
	}

	IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
	ASSERT(cap->running_task == task);

	// loop: re-check whether there is now work for us on this cap
    }

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}

484
/* ----------------------------------------------------------------------------
485
 * prodCapabilities
sof's avatar
sof committed
486
 *
487
488
489
 * Used to indicate that the interrupted flag is now set, or some
 * other global condition that might require waking up a Task on each
 * Capability.
490
491
 * ------------------------------------------------------------------------- */

492
493
494
495
496
497
// Wake up a spare worker on free capabilities, so that some Task
// notices a global condition (eg. the interrupted flag).  If 'all'
// is rtsFalse, stop after waking the first worker; otherwise visit
// every capability.
static void
prodCapabilities(rtsBool all)
{
    nat i;
    Capability *cap;
    Task *task;

    for (i=0; i < n_capabilities; i++) {
	cap = &capabilities[i];
	ACQUIRE_LOCK(&cap->lock);
	if (!cap->running_task) {
	    if (cap->spare_workers) {
		task = cap->spare_workers;
		ASSERT(!task->stopped);
		giveCapabilityToTask(cap,task);
		if (!all) {
		    // one woken task is enough
		    RELEASE_LOCK(&cap->lock);
		    return;
		}
	    }
	}
	RELEASE_LOCK(&cap->lock);
    }
}
516

517
518
519
520
521
// Wake a worker on every free capability (eg. when the interrupted
// flag has been set).
void
prodAllCapabilities (void)
{
    prodCapabilities(rtsTrue);
}
sof's avatar
sof committed
522

523
/* ----------------------------------------------------------------------------
524
525
526
527
528
529
 * prodOneCapability
 *
 * Like prodAllCapabilities, but we only require a single Task to wake
 * up in order to service some global event, such as checking for
 * deadlock after some idle time has passed.
 * ------------------------------------------------------------------------- */
530

531
532
533
534
// Wake a single worker anywhere, enough to service one global event
// (eg. deadlock checking after some idle time).
void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
}
536
537
538
539
540
541
542
543
544
545
546
547
548

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
549
 *
550
 * ------------------------------------------------------------------------- */
551
552

// At shutdown time, acquire 'cap' once its run queue has drained and
// its spare workers have exited.  Must be called with interrupted and
// shutting_down_scheduler set, so woken workers exit rather than
// looking for more work.
void
shutdownCapability (Capability *cap, Task *task)
{
    nat i;

    ASSERT(interrupted && shutting_down_scheduler);

    task->cap = cap;

    // Bounded retry loop: 50 attempts is an arbitrary cap on how long
    // we keep yielding while waiting for the capability to drain —
    // presumably ample in practice; TODO confirm.
    for (i = 0; i < 50; i++) {
	IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
	ACQUIRE_LOCK(&cap->lock);
	if (cap->running_task) {
	    // somebody else owns it; let them run and retry
	    RELEASE_LOCK(&cap->lock);
	    IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
	    yieldThread();
	    continue;
	}
	cap->running_task = task;
	if (!emptyRunQueue(cap) || cap->spare_workers) {
	    // still work or workers left: wake a worker so it can
	    // finish up / exit, then retry
	    IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
	    releaseCapability_(cap); // this will wake up a worker
	    RELEASE_LOCK(&cap->lock);
	    yieldThread();
	    continue;
	}
	IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
	RELEASE_LOCK(&cap->lock);
	break;
    }
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
}
585

586
587
588
589
/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
590
 *
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
 * ------------------------------------------------------------------------- */

// Attempt to take ownership of 'cap' without blocking.  Returns
// rtsTrue iff the capability was free and now belongs to 'task'.
rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    rtsBool granted = rtsFalse;

    // Unlocked fast path: if somebody clearly owns it, don't bother
    // taking the lock.
    if (cap->running_task != NULL) {
	return rtsFalse;
    }

    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task == NULL) {
	// Still free under the lock: claim it.
	task->cap = cap;
	cap->running_task = task;
	granted = rtsTrue;
    }
    // else: somebody grabbed it between our check and the lock.
    RELEASE_LOCK(&cap->lock);
    return granted;
}


609
#endif /* THREADED_RTS */
610
611