/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities,
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
18

sof's avatar
sof committed
19
20
#include "PosixSource.h"
#include "Rts.h"
Simon Marlow's avatar
Simon Marlow committed
21

sof's avatar
sof committed
22
#include "Capability.h"
23
#include "Schedule.h"
24
#include "Sparks.h"
Simon Marlow's avatar
Simon Marlow committed
25
#include "Trace.h"
Simon Marlow's avatar
Simon Marlow committed
26
27
28
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"
sof's avatar
sof committed
29

30
31
32
// The one statically-allocated Capability: the sole Capability for
// non-threaded builds, and capabilities[0] for +RTS -N1.
Capability MainCapability;

// Number of Capabilities in use; set in initCapabilities().
nat n_capabilities = 0;
// Points at &MainCapability when n_capabilities == 1, otherwise at a
// malloc'd array of n_capabilities Capabilities (see initCapabilities()).
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability = NULL;

/* GC indicator, in scope for the scheduler, init'ed to false */
volatile StgWord waiting_for_gc = 0;

46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package, which calls allocateLocal() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 */
Capability * rts_unsafeGetMyCapability (void)
{
#if !defined(THREADED_RTS)
    // Only one Capability exists in a non-threaded build.
    return &MainCapability;
#else
    // The current Task records which Capability it owns.
    return myTask()->cap;
#endif
}

61
#if defined(THREADED_RTS)
// True when there is work that any Capability could pick up: either
// blackholes need checking, or the scheduler is interrupting or
// shutting down.
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    if (blackholes_need_checking) {
        return rtsTrue;
    }
    if (sched_state >= SCHED_INTERRUPTING) {
        return rtsTrue;
    }
    return rtsFalse;
}
#endif
70

71
#if defined(THREADED_RTS)
/* findSpark: obtain a spark for 'cap' to evaluate.
 *
 * First tries the Capability's own spark pool; if that yields nothing
 * and there is more than one Capability, repeatedly sweeps the other
 * Capabilities' pools attempting to steal.  Returns NULL if no spark
 * could be obtained.  Sparks are only sought when the run queue is
 * empty, because they are speculative work.
 */
StgClosure *
findSpark (Capability *cap)
{
  Capability *robbed;
  StgClosurePtr spark;
  rtsBool retry;
  nat i = 0;

  if (!emptyRunQueue(cap)) {
      // If there are other threads, don't try to run any new
      // sparks: sparks might be speculative, we don't want to take
      // resources away from the main computation.
      return 0;
  }

  // first try to get a spark from our own pool.
  // We should be using reclaimSpark(), because it works without
  // needing any atomic instructions:
  //   spark = reclaimSpark(cap->sparks);
  // However, measurements show that this makes at least one benchmark
  // slower (prsa) and doesn't affect the others.
  spark = tryStealSpark(cap);
  if (spark != NULL) {
      cap->sparks_converted++;

      // Post event for running a spark from capability's own pool.
      postEvent(cap, EVENT_RUN_SPARK, cap->r.rCurrentTSO->id, 0);

      return spark;
  }

  if (n_capabilities == 1) { return NULL; } // makes no sense...

  debugTrace(DEBUG_sched,
	     "cap %d: Trying to steal work from other capabilities", 
	     cap->no);

  do {
      retry = rtsFalse;

      /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
      start at a random place instead of 0 as well.  */
      for ( i=0 ; i < n_capabilities ; i++ ) {
          robbed = &capabilities[i];
          if (cap == robbed)  // ourselves...
              continue;

          if (emptySparkPoolCap(robbed)) // nothing to steal here
              continue;

          spark = tryStealSpark(robbed);
          if (spark == NULL && !emptySparkPoolCap(robbed)) {
              // we conflicted with another thread while trying to steal;
              // try again later.
              retry = rtsTrue;
          }

          if (spark != NULL) {
              debugTrace(DEBUG_sched,
		 "cap %d: Stole a spark from capability %d",
                         cap->no, robbed->no);
              cap->sparks_converted++;

              postEvent(cap, EVENT_STEAL_SPARK, 
                        cap->r.rCurrentTSO->id, robbed->no);

              return spark;
          }
          // otherwise: no success, try next one
      }
  } while (retry);

  debugTrace(DEBUG_sched, "No sparks stolen");

  return NULL;
}

// Returns True if any spark pool is non-empty at this moment in time
// The result is only valid for an instant, of course, so in a sense
// is immediately invalid, and should not be relied upon for
// correctness.
rtsBool
anySparks (void)
{
    nat c;

    for (c = 0; c < n_capabilities; c++) {
        if (emptySparkPoolCap(&capabilities[c])) {
            continue;
        }
        return rtsTrue;
    }
    return rtsFalse;
}
#endif
166
167
168

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
// Append 'task' to the tail of cap's queue of Tasks returning from
// foreign calls.  The task must not already be linked into a queue.
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd == NULL) {
        // queue was empty: task becomes the head as well as the tail
        cap->returning_tasks_hd = task;
    } else {
        ASSERT(cap->returning_tasks_tl->return_link == NULL);
        cap->returning_tasks_tl->return_link = task;
    }
    cap->returning_tasks_tl = task;
}

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// Detach and return the Task at the head of cap's returning_tasks
// queue.  The queue must be non-empty.
STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;

    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (cap->returning_tasks_hd == NULL) {
        // queue is now empty: the tail pointer must be cleared too
        cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

204
/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;
    cap->in_gc             = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->wakeup_queue_hd    = END_TSO_QUEUE;
    cap->wakeup_queue_tl    = END_TSO_QUEUE;
    cap->sparks_created     = 0;
    cap->sparks_converted   = 0;
    cap->sparks_pruned      = 0;
#endif

    // Entry points used by compiled code; stored per-Capability so
    // they are reachable via BaseReg.
    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;

    // One mutable list per generation (freed in freeCapability()).
    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
				     RtsFlags.GcFlags.generations,
				     "initCapability");
    // saved_mut_lists entries are filled in by the GC before use, so
    // they are not initialised here.
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
	cap->mut_lists[g] = NULL;
    }

    // Per-capability STM free lists and transaction state.
    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
}

259
/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
	errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
	RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
	capabilities = &MainCapability;
	// THREADED_RTS must work on builds that don't have a mutable
	// BaseReg (eg. unregisterised), so in this case
	// capabilities[0] must coincide with &MainCapability.
    } else {
	capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
				      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
	initCapability(&capabilities[i], i);
    }

    debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);

#else /* !THREADED_RTS */

    // Non-threaded build: exactly one Capability, the static one.
    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task to each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

313
314
315
316
317
318
319
/* ----------------------------------------------------------------------------
 * setContextSwitches: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void setContextSwitches(void)
{
320
321
322
323
    nat i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(&capabilities[i]);
    }
324
325
}

326
/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrater must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrater should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
               cap->no, task->tso ? "bound task" : "worker",
               (void *)task->id);
    // Take task->lock so the wakeup flag and the condition signal are
    // seen atomically by the sleeping Task.
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
358

359
/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 *
 * Requires cap->lock.  The branches below are tried strictly in
 * priority order: returning workers, pending sequential GC, bound
 * thread at the head of the run queue, worker creation, then worker
 * wakeup; only if none applies is the Capability marked free.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap, 
                    rtsBool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
	giveCapabilityToTask(cap,cap->returning_tasks_hd);
	// The Task pops itself from the queue (see waitForReturnCapability())
	return;
    }

    // A sequential GC is pending: leave the Capability free for the
    // GC to pick up rather than waking anyone.
    if (waiting_for_gc == PENDING_GC_SEQ) {
      last_free_capability = cap; // needed?
      debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
      return;
    } 

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
	// Make sure we're not about to try to wake ourselves up
	ASSERT(task != cap->run_queue_hd->bound);
	task = cap->run_queue_hd->bound;
	giveCapabilityToTask(cap,task);
	return;
    }

    if (!cap->spare_workers) {
	// Create a worker thread if we don't have one.  If the system
	// is interrupted, we only create a worker task if there
	// are threads that need to be completed.  If the system is
	// shutting down, we never create a new worker.
	if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
	    debugTrace(DEBUG_sched,
		       "starting new worker on capability %d", cap->no);
	    startWorkerTask(cap, workerStart);
	    return;
	}
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup || 
        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
        !emptySparkPoolCap(cap) || globalWorkToDo()) {
	if (cap->spare_workers) {
	    giveCapabilityToTask(cap,cap->spare_workers);
	    // The worker Task pops itself from the queue;
	    return;
	}
    }

    // Nothing to hand the Capability to: mark it free.
    last_free_capability = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

434
// Public wrapper for releaseCapability_(): takes cap->lock, releases
// the Capability without forcing a worker wakeup.
void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

// Like releaseCapability(), but always wakes a spare worker if one
// exists (passes always_wakeup = rtsTrue to releaseCapability_()).
void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
    RELEASE_LOCK(&cap->lock);
}

// Release the Capability and, if the current Task is a worker, push it
// onto the Capability's spare_workers stack so it can be woken later.
// Called by yieldCapability() before the Task goes to sleep.
static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
	task->next = cap->spare_workers;
	cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);
}
#endif
sof's avatar
sof committed
477

478
/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * On entry *pCap may be NULL (no preferred Capability) or the
 * Capability the Task left; on exit *pCap is the Capability now owned
 * by the Task.
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    // Non-threaded: the single Capability is always available.
    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
	// Try last_free_capability first
	cap = last_free_capability;
	if (cap->running_task) {
	    nat i;
	    // otherwise, search for a free capability
            cap = NULL;
	    for (i = 0; i < n_capabilities; i++) {
		if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
		    break;
		}
	    }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
	}

	// record the Capability as the one this Task is now associated with.
	task->cap = cap;

    } else {
	ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    if (!cap->running_task) {
	// It's free; just grab it
	cap->running_task = task;
	RELEASE_LOCK(&cap->lock);
    } else {
	// Busy: join the returning_tasks queue and sleep until a
	// releaseCapability_() hands the Capability to us.
	newReturningTask(cap,task);
	RELEASE_LOCK(&cap->lock);

	for (;;) {
	    ACQUIRE_LOCK(&task->lock);
	    // task->lock held, cap->lock not held
	    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
	    cap = task->cap;
	    task->wakeup = rtsFalse;
	    RELEASE_LOCK(&task->lock);

	    // now check whether we should wake up...
	    ACQUIRE_LOCK(&cap->lock);
	    if (cap->running_task == NULL) {
		if (cap->returning_tasks_hd != task) {
		    // not our turn yet: pass the Capability on to the
		    // Task at the head of the queue and wait again.
		    giveCapabilityToTask(cap,cap->returning_tasks_hd);
		    RELEASE_LOCK(&cap->lock);
		    continue;
		}
		cap->running_task = task;
		popReturningTask(cap);
		RELEASE_LOCK(&cap->lock);
		break;
	    }
	    RELEASE_LOCK(&cap->lock);
	}

    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 *
 * Give up the Capability, sleep, and reacquire a Capability before
 * returning (the reacquired Capability may differ from the one given
 * up; *pCap is updated).  If a parallel GC is pending, this Task
 * becomes a GC worker thread for the duration instead.
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    if (waiting_for_gc == PENDING_GC_PAR) {
	debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
        postEvent(cap, EVENT_GC_START, 0, 0);
        gcWorkerThread(cap);
        postEvent(cap, EVENT_GC_END, 0, 0);
        return;
    }

	debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

	// We must now release the capability and wait to be woken up
	// again.
	task->wakeup = rtsFalse;
	releaseCapabilityAndQueueWorker(cap);

	for (;;) {
	    ACQUIRE_LOCK(&task->lock);
	    // task->lock held, cap->lock not held
	    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
	    cap = task->cap;
	    task->wakeup = rtsFalse;
	    RELEASE_LOCK(&task->lock);

	    debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

	    ACQUIRE_LOCK(&cap->lock);
	    if (cap->running_task != NULL) {
		// spurious wakeup: someone else owns the Capability now
		debugTrace(DEBUG_sched, 
			   "capability %d is owned by another task", cap->no);
		RELEASE_LOCK(&cap->lock);
		continue;
	    }

	    if (task->tso == NULL) {
		// worker Task: must be popped from spare_workers before
		// taking the Capability.
		ASSERT(cap->spare_workers != NULL);
		// if we're not at the front of the queue, release it
		// again.  This is unlikely to happen.
		if (cap->spare_workers != task) {
		    giveCapabilityToTask(cap,cap->spare_workers);
		    RELEASE_LOCK(&cap->lock);
		    continue;
		}
		cap->spare_workers = task->next;
		task->next = NULL;
	    }
	    cap->running_task = task;
	    RELEASE_LOCK(&cap->lock);
	    break;
	}

	debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
	ASSERT(cap->running_task == task);

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}

642
643
644
645
646
647
648
649
/* ----------------------------------------------------------------------------
 * Wake up a thread on a Capability.
 *
 * This is used when the current Task is running on a Capability and
 * wishes to wake up a thread on a different Capability.
 * ------------------------------------------------------------------------- */

void
wakeupThreadOnCapability (Capability *my_cap, 
                          Capability *other_cap, 
                          StgTSO *tso)
{
    ACQUIRE_LOCK(&other_cap->lock);

    // Migrate the TSO (and its bound Task, if any) to other_cap.
    if (tso->bound) {
	ASSERT(tso->bound->cap == tso->cap);
    	tso->bound->cap = other_cap;
    }
    tso->cap = other_cap;

    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);

    if (other_cap->running_task == NULL) {
	// nobody is running this Capability, we can add our thread
	// directly onto the run queue and start up a Task to run it.

	other_cap->running_task = myTask(); 
            // precond for releaseCapability_() and appendToRunQueue()

	appendToRunQueue(other_cap,tso);

	releaseCapability_(other_cap,rtsFalse);
    } else {
	appendToWakeupQueue(my_cap,other_cap,tso);
        other_cap->context_switch = 1;
	// someone is running on this Capability, so it cannot be
	// freed without first checking the wakeup queue (see
	// releaseCapability_).
    }

    RELEASE_LOCK(&other_cap->lock);
}

686
/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it.  Used to 
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        // cap is free: claim it for 'task', then release it with
        // always_wakeup so a spare worker (if any) is prodded awake.
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
}
703
704
705
706
707
708
709
710
711
712
713
714
715

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
716
 *
717
 * ------------------------------------------------------------------------- */
718
719

void
720
shutdownCapability (Capability *cap, Task *task, rtsBool safe)
721
{
722
723
724
725
    nat i;

    task->cap = cap;

726
727
728
729
730
731
732
    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
Simon Marlow's avatar
Simon Marlow committed
733
734
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

Simon Marlow's avatar
Simon Marlow committed
735
736
	debugTrace(DEBUG_sched, 
		   "shutting down capability %d, attempt %d", cap->no, i);
737
738
739
	ACQUIRE_LOCK(&cap->lock);
	if (cap->running_task) {
	    RELEASE_LOCK(&cap->lock);
Simon Marlow's avatar
Simon Marlow committed
740
	    debugTrace(DEBUG_sched, "not owner, yielding");
741
742
	    yieldThread();
	    continue;
743
	}
744
	cap->running_task = task;
Simon Marlow's avatar
Simon Marlow committed
745
746
747
748
749
750
751
752
753
754
755
756
757

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched, 
758
                               "worker thread %p has died unexpectedly", (void *)t->id);
Simon Marlow's avatar
Simon Marlow committed
759
760
761
762
763
764
765
766
767
768
                        if (!prev) {
                            cap->spare_workers = t->next;
                        } else {
                            prev->next = t->next;
                        }
                        prev = t;
                }
            }
        }

769
	if (!emptyRunQueue(cap) || cap->spare_workers) {
Simon Marlow's avatar
Simon Marlow committed
770
771
	    debugTrace(DEBUG_sched, 
		       "runnable threads or workers still alive, yielding");
772
	    releaseCapability_(cap,rtsFalse); // this will wake up a worker
773
774
775
	    RELEASE_LOCK(&cap->lock);
	    yieldThread();
	    continue;
776
	}
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalling_tasks && safe) {
	    debugTrace(DEBUG_sched, 
		       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
	    RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
            
Simon Marlow's avatar
Simon Marlow committed
793
        postEvent(cap, EVENT_SHUTDOWN, 0, 0);
Simon Marlow's avatar
Simon Marlow committed
794
	debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
795
796
	RELEASE_LOCK(&cap->lock);
	break;
797
    }
798
799
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
800

801
802
803
804
    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to 
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
805
}
806

807
808
809
810
/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 * Returns rtsTrue and sets task->cap on success; rtsFalse without
 * blocking if the Capability is (or becomes) busy.
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    // cheap unlocked check first: bail out without touching the lock
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
	// lost the race between the unlocked check and taking the lock
	RELEASE_LOCK(&cap->lock);
	return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


830
#endif /* THREADED_RTS */
831

832
833
834
/* Free the dynamically-allocated parts of a Capability: both mutable
 * list arrays allocated in initCapability(), plus the spark pool in
 * the threaded RTS. */
static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    // BUGFIX: saved_mut_lists is allocated alongside mut_lists in
    // initCapability() but was never freed here, leaking one array
    // per Capability.
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
}
840

841
842
843
844
845
846
847
848
849
850
851
852
853
/* Release the resources of every Capability currently in use. */
void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat c;

    for (c = 0; c < n_capabilities; c++) {
        freeCapability(&capabilities[c]);
    }
#else
    // Non-threaded build: only the static MainCapability exists.
    freeCapability(&MainCapability);
#endif
}

854
855
856
857
858
859
860
/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, each GC thread marks all Capabilities
   for which (c `mod` n == 0), for Capability c and thread n.
   ------------------------------------------------------------------------ */

void
markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, 
                      rtsBool prune_sparks USED_IF_THREADS)
{
    nat i;
    Capability *cap;
    Task *task;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // or fewer Capabilities as GC threads, but just in case there
    // are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    for (i = i0; i < n_capabilities; i += delta) {
	cap = &capabilities[i];
	// Roots: the run queue, and (threaded) the wakeup queue.
	evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
	evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
	evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
	evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
	// TSOs suspended while making foreign calls are roots too.
	for (task = cap->suspended_ccalling_tasks; task != NULL; 
	     task=task->next) {
	    debugTrace(DEBUG_sched,
		       "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
	    evac(user, (StgClosure **)(void *)&task->suspended_tso);
	}

#if defined(THREADED_RTS)
        if (prune_sparks) {
            pruneSparkQueue (evac, user, cap);
        } else {
            traverseSparkQueue (evac, user, cap);
        }
#endif
    }

#if !defined(THREADED_RTS)
    // Non-threaded build: the global blocked/sleeping queues are
    // additional roots.
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
    evac(user, (StgClosure **)(void *)&sleeping_queue);
#endif 
}

// Mark roots from every Capability in one pass (i0 = 0, delta = 1)
// without pruning spark queues.
void
markCapabilities (evac_fn evac, void *user)
{
    markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}