/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2012
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities,
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
18

sof's avatar
sof committed
19
20
#include "PosixSource.h"
#include "Rts.h"
Simon Marlow's avatar
Simon Marlow committed
21

sof's avatar
sof committed
22
#include "Capability.h"
23
#include "Schedule.h"
24
#include "Sparks.h"
Simon Marlow's avatar
Simon Marlow committed
25
#include "Trace.h"
Simon Marlow's avatar
Simon Marlow committed
26
27
28
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"
29
#include "rts/IOManager.h"
sof's avatar
sof committed
30

31
32
#include <string.h>

33
34
35
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;
sof's avatar
sof committed
36

Simon Marlow's avatar
Simon Marlow committed
37
nat n_capabilities = 0;
38
nat enabled_capabilities = 0;
39
40
41
42
43
44
45

// The array of Capabilities.  It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
// (e.g. threads in waitForReturnCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;
sof's avatar
sof committed
46

47
48
49
50
// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Simon Marlow's avatar
Simon Marlow committed
51
Capability *last_free_capability = NULL;
52

53
54
55
56
57
58
/*
 * Indicates that the RTS wants to synchronise all the Capabilities
 * for some reason.  All Capabilities should stop and return to the
 * scheduler.
 */
volatile StgWord pending_sync = 0;
59

60
61
62
/* Allow foreign code to fetch the current Capability -- assuming one
 * is held!  Unsafe foreign calls run with the current Capability held
 * but are not passed it explicitly; for example, the integer-gmp
 * package's stgAllocForGMP() function (called from gmp) needs it to
 * call allocate().
 * */
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
    return myTask()->cap;
#else
    return &MainCapability;
#endif
}

75
#if defined(THREADED_RTS)
// True if there is RTS-global work that idle Capabilities should wake
// up for: a shutdown/interrupt in progress, or possible deadlock.
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    if (sched_state >= SCHED_INTERRUPTING) {
        return rtsTrue;
    }
    // need to check for deadlock
    return recent_activity == ACTIVITY_INACTIVE;
}
#endif
83

84
#if defined(THREADED_RTS)
// Find a spark for this Capability to run: first from its own pool,
// then by stealing from the other Capabilities.  Returns NULL if no
// spark could be obtained.
StgClosure *
findSpark (Capability *cap)
{
    Capability *victim;
    StgClosurePtr spark;
    rtsBool try_again;
    nat idx = 0;

    if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
        // If there are other threads, don't try to run any new
        // sparks: sparks might be speculative, we don't want to take
        // resources away from the main computation.
        return 0;
    }

    do {
        try_again = rtsFalse;

        // First try our own pool.  We could use reclaimSpark(), which
        // needs no atomic instructions:
        //   spark = reclaimSpark(cap->sparks);
        // but measurements show that makes at least one benchmark
        // slower (prsa) and doesn't affect the others.
        spark = tryStealSpark(cap->sparks);
        while (spark != NULL && fizzledSpark(spark)) {
            cap->spark_stats.fizzled++;
            traceEventSparkFizzle(cap);
            spark = tryStealSpark(cap->sparks);
        }
        if (spark != NULL) {
            cap->spark_stats.converted++;

            // Post event for running a spark from capability's own pool.
            traceEventSparkRun(cap);

            return spark;
        }
        if (!emptySparkPoolCap(cap)) {
            try_again = rtsTrue;
        }

        if (n_capabilities == 1) { return NULL; } // makes no sense...

        debugTrace(DEBUG_sched,
                   "cap %d: Trying to steal work from other capabilities", 
                   cap->no);

        /* Visit capabilities 0..n-1 in sequence until a theft succeeds.
           We could just as well start at a random place. */
        for (idx = 0; idx < n_capabilities; idx++) {
            victim = capabilities[idx];
            if (victim == cap) {            // ourselves...
                continue;
            }
            if (emptySparkPoolCap(victim)) { // nothing to steal here
                continue;
            }

            spark = tryStealSpark(victim->sparks);
            while (spark != NULL && fizzledSpark(spark)) {
                cap->spark_stats.fizzled++;
                traceEventSparkFizzle(cap);
                spark = tryStealSpark(victim->sparks);
            }
            if (spark == NULL && !emptySparkPoolCap(victim)) {
                // we conflicted with another thread while trying to steal;
                // try again later.
                try_again = rtsTrue;
            }

            if (spark != NULL) {
                cap->spark_stats.converted++;
                traceEventSparkSteal(cap, victim->no);
                return spark;
            }
            // otherwise: no success, try next one
        }
    } while (try_again);

    debugTrace(DEBUG_sched, "No sparks stolen");
    return NULL;
}

// Returns True if any spark pool is non-empty at this moment in time
// The result is only valid for an instant, of course, so in a sense
// is immediately invalid, and should not be relied upon for
// correctness.
rtsBool
anySparks (void)
{
    nat i;

    for (i=0; i < n_capabilities; i++) {
179
        if (!emptySparkPoolCap(capabilities[i])) {
180
181
182
183
            return rtsTrue;
        }
    }
    return rtsFalse;
184
}
185
#endif
186
187
188

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
// Append a Task to cap's queue of tasks returning from foreign calls.
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd == NULL) {
        cap->returning_tasks_hd = task;
    } else {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    }
    cap->returning_tasks_tl = task;
}

// Remove and return the Task at the head of cap's returning-tasks queue.
STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;

    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (cap->returning_tasks_hd == NULL) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;
    return task;
}
#endif

224
/* ----------------------------------------------------------------------------
225
226
227
228
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */
229
230

static void
231
initCapability( Capability *cap, nat i )
sof's avatar
sof committed
232
{
233
    nat g;
234

235
236
    cap->no = i;
    cap->in_haskell        = rtsFalse;
237
    cap->idle              = 0;
238
    cap->disabled          = rtsFalse;
239
240
241
242
243
244
245
246

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
247
    cap->n_spare_workers   = 0;
248
    cap->suspended_ccalls  = NULL;
249
250
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
251
    cap->inbox              = (Message*)END_TSO_QUEUE;
252
    cap->sparks             = allocSparkPool();
253
254
    cap->spark_stats.created    = 0;
    cap->spark_stats.dud        = 0;
255
    cap->spark_stats.overflowed = 0;
256
257
258
    cap->spark_stats.converted  = 0;
    cap->spark_stats.gcd        = 0;
    cap->spark_stats.fizzled    = 0;
AndreasVoellmy's avatar
AndreasVoellmy committed
259
    cap->io_manager_control_wr_fd = -1;
260
#endif
261
    cap->total_allocated        = 0;
262

263
    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
Simon Marlow's avatar
Simon Marlow committed
264
265
    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;
266

267
    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
268
269
				     RtsFlags.GcFlags.generations,
				     "initCapability");
270
271
272
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");
273
274
275

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
	cap->mut_lists[g] = NULL;
276
    }
277

278
279
    cap->weak_ptr_list_hd = NULL;
    cap->weak_ptr_list_tl = NULL;
tharris@microsoft.com's avatar
tharris@microsoft.com committed
280
281
    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
282
283
284
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
285
    cap->context_switch = 0;
286
    cap->pinned_object_block = NULL;
287
    cap->pinned_object_blocks = NULL;
288

289
290
#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
291
292
#else
    cap->r.rCCCS = NULL;
293
294
#endif

295
    traceCapCreate(cap);
296
    traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
297
    traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
Duncan Coutts's avatar
Duncan Coutts committed
298
299
300
#if defined(THREADED_RTS)
    traceSparkCounters(cap);
#endif
sof's avatar
sof committed
301
302
}

303
/* ---------------------------------------------------------------------------
sof's avatar
sof committed
304
305
 * Function:  initCapabilities()
 *
306
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
sof's avatar
sof committed
307
 *            we keep a table of them, the size of which is
308
 *            controlled by the user via the RTS flag -N.
sof's avatar
sof committed
309
 *
310
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
311
void
312
initCapabilities( void )
sof's avatar
sof committed
313
{
314
315
    /* Declare a couple capability sets representing the process and
       clock domain. Each capability will get added to these capsets. */
316
    traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
317
    traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);
318

319
#if defined(THREADED_RTS)
320

321
#ifndef REG_Base
Simon Marlow's avatar
Simon Marlow committed
322
323
324
325
326
327
328
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
	errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
	RtsFlags.ParFlags.nNodes = 1;
    }
#endif

329
330
    n_capabilities = 0;
    moreCapabilities(0, RtsFlags.ParFlags.nNodes);
331
332
333
334
    n_capabilities = RtsFlags.ParFlags.nNodes;

#else /* !THREADED_RTS */

335
    n_capabilities = 1;
336
337
    capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
    capabilities[0] = &MainCapability;
338
    initCapability(&MainCapability, 0);
339

340
341
#endif

342
343
    enabled_capabilities = n_capabilities;

344
345
346
    // There are no free capabilities to begin with.  We will start
    // a worker Task to each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
347
    last_free_capability = capabilities[0];
sof's avatar
sof committed
348
349
}

350
void
351
352
353
354
moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;
355
356
357
    Capability **old_capabilities = capabilities;

    capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
358
359
360
361
362

    if (to == 1) {
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
	// capabilities[0] must coincide with &MainCapability.
363
        capabilities[0] = &MainCapability;
364
        initCapability(&MainCapability, 0);
365
    }
366
367
368
369
370
371
372
373
374
375
    else
    {
        for (i = 0; i < to; i++) {
            if (i < from) {
                capabilities[i] = old_capabilities[i];
            } else {
                capabilities[i] = stgMallocBytes(sizeof(Capability),
                                                 "moreCapabilities");
                initCapability(capabilities[i], i);
            }
376
        }
377
378
379
380
    }

    debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);

381
382
    if (old_capabilities != NULL) {
        stgFree(old_capabilities);
383
384
385
386
    }
#endif
}

387
388
389
390
391
/* ----------------------------------------------------------------------------
 * setContextSwitches: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

392
void contextSwitchAllCapabilities(void)
393
{
394
395
    nat i;
    for (i=0; i < n_capabilities; i++) {
396
        contextSwitchCapability(capabilities[i]);
397
    }
398
399
}

400
401
402
403
void interruptAllCapabilities(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
404
        interruptCapability(capabilities[i]);
405
406
407
    }
}

408
/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrater must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrater should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
               cap->no, task->incall->tso ? "bound task" : "worker",
               serialisableTaskId(task));
    ACQUIRE_LOCK(&task->lock);
    if (task->wakeup == rtsFalse) {
        // The wakeup flag is needed because signalCondition() doesn't
        // flag the condition if the thread is already running, but we
        // want it to be sticky.
        task->wakeup = rtsTrue;
        signalCondition(&task->cond);
    }
    RELEASE_LOCK(&task->lock);
}
#endif
442

443
/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap, 
                    rtsBool always_wakeup)
{
    Task *task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If there is a pending sync, then we should just leave the
    // Capability free.  The thread trying to sync will be about to
    // call waitForReturnCapability().
    if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
        last_free_capability = cap; // needed?
        debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = peekRunQueue(cap)->bound->task;
        giveCapabilityToTask(cap, task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup || 
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap, cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

#ifdef PROFILING
    cap->r.rCCCS = CCS_IDLE;
#endif
    last_free_capability = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

526
void
527
releaseCapability (Capability* cap USED_IF_THREADS)
528
529
{
    ACQUIRE_LOCK(&cap->lock);
530
531
532
533
534
535
536
537
538
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

// Release a capability, always waking up a waiting worker if one exists.
void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
    RELEASE_LOCK(&cap->lock);
}

static void
543
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
544
545
546
547
548
549
550
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

551
552
553
554
    // If the Task is stopped, we shouldn't be yielding, we should
    // be just exiting.
    ASSERT(!task->stopped);

555
556
557
558
559
    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
560
    if (!isBoundTask(task))
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
    {
        if (cap->n_spare_workers < MAX_SPARE_WORKERS)
        {
            task->next = cap->spare_workers;
            cap->spare_workers = task;
            cap->n_spare_workers++;
        }
        else
        {
            debugTrace(DEBUG_sched, "%d spare workers already, exiting",
                       cap->n_spare_workers);
            releaseCapability_(cap,rtsFalse);
            // hold the lock until after workerTaskStop; c.f. scheduleWorker()
            workerTaskStop(task);
            RELEASE_LOCK(&cap->lock);
            shutdownThread();
        }
578
579
580
    }
    // Bound tasks just float around attached to their TSOs.

581
    releaseCapability_(cap,rtsFalse);
582
583
584
585

    RELEASE_LOCK(&cap->lock);
}
#endif
sof's avatar
sof committed
586

587
/* ----------------------------------------------------------------------------
588
 * waitForReturnCapability (Capability **pCap, Task *task)
sof's avatar
sof committed
589
590
 *
 * Purpose:  when an OS thread returns from an external call,
591
592
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
sof's avatar
sof committed
593
 * result of the external call back to the Haskell thread that
sof's avatar
sof committed
594
595
 * made it.
 *
596
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
597
void
598
waitForReturnCapability (Capability **pCap, Task *task)
sof's avatar
sof committed
599
{
600
#if !defined(THREADED_RTS)
601

602
603
604
    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;
605

606
#else
607
608
609
610
611
    Capability *cap = *pCap;

    if (cap == NULL) {
	// Try last_free_capability first
	cap = last_free_capability;
612
	if (cap->running_task) {
613
614
	    nat i;
	    // otherwise, search for a free capability
615
            cap = NULL;
616
	    for (i = 0; i < n_capabilities; i++) {
617
618
                if (!capabilities[i]->running_task) {
                    cap = capabilities[i];
619
620
621
		    break;
		}
	    }
622
623
624
625
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
626
627
628
629
630
	}

	// record the Capability as the one this Task is now assocated with.
	task->cap = cap;

631
    } else {
632
	ASSERT(task->cap == cap);
633
634
    }

635
    ACQUIRE_LOCK(&cap->lock);
sof's avatar
sof committed
636

Simon Marlow's avatar
Simon Marlow committed
637
    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
sof's avatar
sof committed
638

639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
    if (!cap->running_task) {
	// It's free; just grab it
	cap->running_task = task;
	RELEASE_LOCK(&cap->lock);
    } else {
	newReturningTask(cap,task);
	RELEASE_LOCK(&cap->lock);

	for (;;) {
	    ACQUIRE_LOCK(&task->lock);
	    // task->lock held, cap->lock not held
	    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
	    cap = task->cap;
	    task->wakeup = rtsFalse;
	    RELEASE_LOCK(&task->lock);

	    // now check whether we should wake up...
	    ACQUIRE_LOCK(&cap->lock);
	    if (cap->running_task == NULL) {
		if (cap->returning_tasks_hd != task) {
		    giveCapabilityToTask(cap,cap->returning_tasks_hd);
		    RELEASE_LOCK(&cap->lock);
		    continue;
		}
		cap->running_task = task;
		popReturningTask(cap);
		RELEASE_LOCK(&cap->lock);
		break;
	    }
	    RELEASE_LOCK(&cap->lock);
	}

    }

673
674
675
#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif
676

Gabor Greif's avatar
typo    
Gabor Greif committed
677
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
678

Simon Marlow's avatar
Simon Marlow committed
679
    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
680
681
682
683
684
685

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
   and return the rtsBool */
rtsBool /* Did we GC? */
yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
{
    Capability *cap = *pCap;

    if ((pending_sync == SYNC_GC_PAR) && gcAllowed) {
        traceEventGcStart(cap);
        gcWorkerThread(cap);
        traceEventGcEnd(cap);
        traceSparkCounters(cap);
        // See Note [migrated bound threads 2]
        if (task->cap == cap) {
            return rtsTrue;
        }
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up
    // again.
    task->wakeup = rtsFalse;
    releaseCapabilityAndQueueWorker(cap);

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = rtsFalse;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched, 
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->cap != cap) {
            // see Note [migrated bound threads]
            debugTrace(DEBUG_sched,
                       "task has been migrated to cap %d", task->cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again.  This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }

        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return rtsFalse;
}

774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
// Note [migrated bound threads]
//
// There's a tricky case where:
//    - cap A is running an unbound thread T1
//    - there is a bound thread T2 at the head of the run queue on cap A
//    - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
//    - T1 returns quickly grabbing A again (T2 is still waking up on A)
//    - T1 blocks, the scheduler migrates T2 to cap B
//    - the task bound to T2 wakes up on cap B
//
// We take advantage of the following invariant:
//
//  - A bound thread can only be migrated by the holder of the
//    Capability on which the bound thread currently lives.  So, if we
//    hold Capability C, and task->cap == C, then task cannot be
//    migrated under our feet.

791
792
793
794
795
796
797
798
799
800
801
// Note [migrated bound threads 2]
//
// Second tricky case;
//   - A bound Task becomes a GC thread
//   - scheduleDoGC() migrates the thread belonging to this Task,
//     because the Capability it is on is disabled
//   - after GC, gcWorkerThread() returns, but now we are
//     holding a Capability that is not the same as task->cap
//   - Hence we must check for this case and immediately give up the
//     cap we hold.

802
/* ----------------------------------------------------------------------------
803
 * prodCapability
804
 *
805
806
 * If a Capability is currently idle, wake up a Task on it.  Used to 
 * get every Capability into the GC.
807
 * ------------------------------------------------------------------------- */
808

809
void
810
prodCapability (Capability *cap, Task *task)
811
{
812
813
814
815
816
817
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
818
}
819

820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    // Cheap unlocked check first to avoid taking the lock when busy.
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    // Re-check under the lock: it may have been grabbed meanwhile.
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


#endif /* THREADED_RTS */

845
846
847
848
849
850
851
852
/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
853
 * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
854
855
856
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
857
 *
858
 * ------------------------------------------------------------------------- */
859
860

void
861
shutdownCapability (Capability *cap USED_IF_THREADS,
862
863
                    Task *task USED_IF_THREADS,
                    rtsBool safe USED_IF_THREADS)
864
{
865
#if defined(THREADED_RTS)
866
867
868
869
    nat i;

    task->cap = cap;

870
871
872
873
874
875
876
    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
Simon Marlow's avatar
Simon Marlow committed
877
878
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

Simon Marlow's avatar
Simon Marlow committed
879
880
	debugTrace(DEBUG_sched, 
		   "shutting down capability %d, attempt %d", cap->no, i);
881
882
883
	ACQUIRE_LOCK(&cap->lock);
	if (cap->running_task) {
	    RELEASE_LOCK(&cap->lock);
Simon Marlow's avatar
Simon Marlow committed
884
	    debugTrace(DEBUG_sched, "not owner, yielding");
885
886
	    yieldThread();
	    continue;
887
	}
888
	cap->running_task = task;
Simon Marlow's avatar
Simon Marlow committed
889
890
891
892
893
894
895
896
897
898
899
900
901

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched, 
Ian Lynagh's avatar
Ian Lynagh committed
902
                               "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
903
904
905
906
907
908
909
                    cap->n_spare_workers--;
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                    prev = t;
Simon Marlow's avatar
Simon Marlow committed
910
911
912
913
                }
            }
        }

914
	if (!emptyRunQueue(cap) || cap->spare_workers) {
Simon Marlow's avatar
Simon Marlow committed
915
916
	    debugTrace(DEBUG_sched, 
		       "runnable threads or workers still alive, yielding");
917
	    releaseCapability_(cap,rtsFalse); // this will wake up a worker
918
919
920
	    RELEASE_LOCK(&cap->lock);
	    yieldThread();
	    continue;
921
	}
922
923
924
925
926
927
928

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
929
        if (cap->suspended_ccalls && safe) {
930
931
932
933
	    debugTrace(DEBUG_sched, 
		       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
	    RELEASE_LOCK(&cap->lock);
934
935
936
937
938
939
940
941
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded.  Just in case, try again - the kill message
            // will only be sent once.
            //
            // To reproduce this deadlock: run ffi002(threaded1)
            // repeatedly on a loaded machine.
            ioManagerDie();
942
943
944
            yieldThread();
            continue;
        }
945

946
        traceSparkCounters(cap);
947
948
	RELEASE_LOCK(&cap->lock);
	break;
949
    }
950
951
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
952

953
954
955
956
    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to 
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
957
958
#endif
}
959

960
961
// Shut down every Capability in turn, letting each one's run queue
// drain and its workers exit.  'safe' is forwarded to
// shutdownCapability() (busy-wait for outstanding foreign calls).
void
shutdownCapabilities(Task *task, rtsBool safe)
{
    nat n;
    for (n = 0; n < n_capabilities; n++) {
        // The calling Task must not be running a Haskell thread.
        ASSERT(task->incall->tso == NULL);
        shutdownCapability(capabilities[n], task, safe);
    }
#if defined(THREADED_RTS)
    ASSERT(checkSparkCountInvariant());
#endif
}

973
974
975
// Release the resources owned by a single Capability: its mutable
// lists, (threaded RTS only) its spark pool, and its entries in the
// tracing capsets.  Does not free the Capability structure itself.
static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
    // Remove this capability from the trace capsets before deleting it.
    traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
    traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
    traceCapDelete(cap);
}
985

986
987
988
989
990
991
void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat i;
    for (i=0; i < n_capabilities; i++) {
992
        freeCapability(capabilities[i]);
993
994
        if (capabilities[i] != &MainCapability)
            stgFree(capabilities[i]);
995
996
997
998
    }
#else
    freeCapability(&MainCapability);
#endif
999
    stgFree(capabilities);
1000
1001
    traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
    traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
1002
1003
}

1004
1005
1006
1007
1008
1009
1010
/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, each GC thread marks all Capabilities
   for which (c `mod` n == 0), for Capability c and thread n.
   ------------------------------------------------------------------------ */

void
Simon Marlow's avatar
Simon Marlow committed
1011
1012
markCapability (evac_fn evac, void *user, Capability *cap,
                rtsBool no_mark_sparks USED_IF_THREADS)
1013
{
1014
    InCall *incall;
1015
1016
1017
1018
1019
1020

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // or fewer Capabilities as GC threads, but just in case there
    // are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
Simon Marlow's avatar
Simon Marlow committed
1021
1022
    evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
    evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
1023
#if defined(THREADED_RTS)
Simon Marlow's avatar
Simon Marlow committed
1024
    evac(user, (StgClosure **)(void *)&cap->inbox);
1025
#endif
Simon Marlow's avatar
Simon Marlow committed
1026
1027
1028
1029
    for (incall = cap->suspended_ccalls; incall != NULL;
         incall=incall->next) {
        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
    }
1030
1031

#if defined(THREADED_RTS)
Simon Marlow's avatar
Simon Marlow committed
1032
1033
    if (!no_mark_sparks) {
        traverseSparkQueue (evac, user, cap);
1034
    }
Simon Marlow's avatar
Simon Marlow committed
1035
#endif
1036

Simon Marlow's avatar
Simon Marlow committed
1037
1038
    // Free STM structures for this Capability
    stmPreGCHook(cap);
1039
1040
1041
1042
1043
}

// Mark the roots of every Capability (single GC thread case: sparks
// are marked too, hence rtsFalse for no_mark_sparks).
void
markCapabilities (evac_fn evac, void *user)
{
    nat i;
    for (i = 0; i < n_capabilities; i++) {
        markCapability(evac, user, capabilities[i], rtsFalse);
    }
}
1049
1050
1051
1052
1053
1054