/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2012
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"

#include <string.h>

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities = 0;
nat enabled_capabilities = 0;

// The array of Capabilities.  It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
// (e.g. threads in waitForReturnCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability = NULL;

/*
 * Indicates that the RTS wants to synchronise all the Capabilities
 * for some reason.  All Capabilities should stop and return to the
 * scheduler.
 */
volatile StgWord pending_sync = 0;

/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package which calls allocate() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 */
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
  return myTask()->cap;
#else
  return &MainCapability;
#endif
}
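/* For illustration, the kind of foreign code this supports might look like
 * the following (a hypothetical helper; allocate() is the RTS heap
 * allocator):
 *
 *   StgPtr unsafeCallAlloc (W_ n)
 *   {
 *       Capability *cap = rts_unsafeGetMyCapability();
 *       return allocate(cap, n);  // we hold cap for the whole unsafe call
 *   }
 */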

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return sched_state >= SCHED_INTERRUPTING
        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif

#if defined(THREADED_RTS)
StgClosure *
findSpark (Capability *cap)
{
  Capability *robbed;
  StgClosurePtr spark;
  rtsBool retry;
  nat i = 0;

  if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
      // If there are other threads, don't try to run any new
      // sparks: sparks might be speculative, we don't want to take
      // resources away from the main computation.
      return 0;
  }

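  // Sweep our own pool first, then every other capability's pool in
  // turn; 'retry' is set when a non-empty pool fails to yield a spark
  // (e.g. a steal conflicted with another thief), in which case we go
  // around the loop again.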
  do {
      retry = rtsFalse;

      // first try to get a spark from our own pool.
      // We should be using reclaimSpark(), because it works without
      // needing any atomic instructions:
      //   spark = reclaimSpark(cap->sparks);
      // However, measurements show that this makes at least one benchmark
      // slower (prsa) and doesn't affect the others.
      spark = tryStealSpark(cap->sparks);
      while (spark != NULL && fizzledSpark(spark)) {
          cap->spark_stats.fizzled++;
          traceEventSparkFizzle(cap);
          spark = tryStealSpark(cap->sparks);
      }
      if (spark != NULL) {
          cap->spark_stats.converted++;

          // Post event for running a spark from capability's own pool.
          traceEventSparkRun(cap);

          return spark;
      }
      if (!emptySparkPoolCap(cap)) {
          retry = rtsTrue;
      }

      if (n_capabilities == 1) { return NULL; } // makes no sense...

      debugTrace(DEBUG_sched,
                 "cap %d: Trying to steal work from other capabilities",
                 cap->no);

      /* visit caps 0..n-1 in sequence until a theft succeeds. We could
         start at a random place instead of 0 as well.  */
      for ( i=0 ; i < n_capabilities ; i++ ) {
          robbed = capabilities[i];
          if (cap == robbed)  // ourselves...
              continue;

          if (emptySparkPoolCap(robbed)) // nothing to steal here
              continue;

          spark = tryStealSpark(robbed->sparks);
          while (spark != NULL && fizzledSpark(spark)) {
              cap->spark_stats.fizzled++;
              traceEventSparkFizzle(cap);
              spark = tryStealSpark(robbed->sparks);
          }
          if (spark == NULL && !emptySparkPoolCap(robbed)) {
              // we conflicted with another thread while trying to steal;
              // try again later.
              retry = rtsTrue;
          }

          if (spark != NULL) {
              cap->spark_stats.converted++;
              traceEventSparkSteal(cap, robbed->no);
              
              return spark;
          }
          // otherwise: no success, try next one
      }
  } while (retry);

  debugTrace(DEBUG_sched, "No sparks stolen");
  return NULL;
}

// Returns True if any spark pool is non-empty at this moment in time
// The result is only valid for an instant, of course, so in a sense
// is immediately invalid, and should not be relied upon for
// correctness.
rtsBool
anySparks (void)
{
    nat i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(capabilities[i])) {
            return rtsTrue;
        }
    }
    return rtsFalse;
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
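 * The queue is a singly-linked list threaded through task->next, with
 * head (returning_tasks_hd) and tail (returning_tasks_tl) pointers; the
 * list is empty iff returning_tasks_hd is NULL.
 *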
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;
    cap->idle              = 0;
    cap->disabled          = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->n_spare_workers   = 0;
    cap->suspended_ccalls  = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->inbox              = (Message*)END_TSO_QUEUE;
    cap->sparks             = allocSparkPool();
    cap->spark_stats.created    = 0;
    cap->spark_stats.dud        = 0;
    cap->spark_stats.overflowed = 0;
    cap->spark_stats.converted  = 0;
    cap->spark_stats.gcd        = 0;
    cap->spark_stats.fizzled    = 0;
#endif
    cap->total_allocated        = 0;

    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->weak_ptr_list_hd = NULL;
    cap->weak_ptr_list_tl = NULL;
    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
    cap->pinned_object_block = NULL;
    cap->pinned_object_blocks = NULL;

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#else
    cap->r.rCCCS = NULL;
#endif

    traceCapCreate(cap);
    traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i);
    traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i);
#if defined(THREADED_RTS)
    traceSparkCounters(cap);
#endif
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
    /* Declare a couple of capability sets representing the process and
       clock domain. Each capability will get added to these capsets. */
    traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess);
    traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain);

#if defined(THREADED_RTS)

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = 0;
    moreCapabilities(0, RtsFlags.ParFlags.nNodes);
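    // n_capabilities itself is only set to its final value below, once
    // moreCapabilities() has finished building the new table.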
    n_capabilities = RtsFlags.ParFlags.nNodes;

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = stgMallocBytes(sizeof(Capability*), "initCapabilities");
    capabilities[0] = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    enabled_capabilities = n_capabilities;

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = capabilities[0];
}

void
moreCapabilities (nat from USED_IF_THREADS, nat to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;
    Capability **old_capabilities = capabilities;

    capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");

    if (to == 1) {
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
        capabilities[0] = &MainCapability;
        initCapability(&MainCapability, 0);
    }
    else
    {
        for (i = 0; i < to; i++) {
            if (i < from) {
                capabilities[i] = old_capabilities[i];
            } else {
                capabilities[i] = stgMallocBytes(sizeof(Capability),
                                                 "moreCapabilities");
                initCapability(capabilities[i], i);
            }
        }
    }

    debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);

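    // Freeing the old array is safe: it only held pointers, and the
    // Capability structs themselves are still reachable via the new
    // array (see the comment on 'capabilities' above and #8209).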
    if (old_capabilities != NULL) {
        stgFree(old_capabilities);
    }
#endif
}

/* ----------------------------------------------------------------------------
 * contextSwitchAllCapabilities: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void contextSwitchAllCapabilities(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(capabilities[i]);
    }
}

void interruptAllCapabilities(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        interruptCapability(capabilities[i]);
    }
}

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
               cap->no, task->incall->tso ? "bound task" : "worker",
               serialisableTaskId(task));
    ACQUIRE_LOCK(&task->lock);
    if (task->wakeup == rtsFalse) {
        task->wakeup = rtsTrue;
        // the wakeup flag is needed because signalCondition() doesn't
        // flag the condition if the thread is already running, but we want
        // it to be sticky.
        signalCondition(&task->cond);
    }
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
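 *
 *            The order tried is: a task returning from a foreign call
 *            first, then the Task bound to the thread at the head of
 *            the run queue, then a spare worker (the last only if
 *            there is work to do, or always_wakeup was passed).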
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    rtsBool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    // If there is a pending sync, then we should just leave the
    // Capability free.  The thread trying to sync will be about to
    // call waitForReturnCapability().
    if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
        last_free_capability = cap; // needed?
        debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = peekRunQueue(cap)->bound->task;
        giveCapabilityToTask(cap, task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap, cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

#ifdef PROFILING
    cap->r.rCCCS = CCS_IDLE;
#endif
    last_free_capability = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the Task is stopped, we shouldn't be yielding, we should
    // be just exiting.
    ASSERT(!task->stopped);

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    if (!isBoundTask(task))
    {
        if (cap->n_spare_workers < MAX_SPARE_WORKERS)
        {
            task->next = cap->spare_workers;
            cap->spare_workers = task;
            cap->n_spare_workers++;
        }
        else
        {
            debugTrace(DEBUG_sched, "%d spare workers already, exiting",
                       cap->n_spare_workers);
            releaseCapability_(cap,rtsFalse);
            // hold the lock until after workerTaskStop; c.f. scheduleWorker()
            workerTaskStop(task);
            RELEASE_LOCK(&cap->lock);
            shutdownThread();
        }
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability (Capability **pCap, Task *task)
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i]->running_task) {
                    cap = capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

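        // Sleep-then-recheck handshake: each time we are signalled we
        // retake cap->lock and check that this task really is the one
        // the Capability is being returned to; otherwise we hand it on
        // and wait again.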
        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
   and return the rtsBool */
rtsBool /* Did we GC? */
yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
{
    Capability *cap = *pCap;

    if ((pending_sync == SYNC_GC_PAR) && gcAllowed) {
        traceEventGcStart(cap);
        gcWorkerThread(cap);
        traceEventGcEnd(cap);
        traceSparkCounters(cap);
        // See Note [migrated bound threads 2]
        if (task->cap == cap) {
            return rtsTrue;
        }
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up
    // again.
    task->wakeup = rtsFalse;
    releaseCapabilityAndQueueWorker(cap);

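    // Wait to be woken, then work out why we were woken: the Capability
    // may meanwhile be owned by another task, our Task may have been
    // migrated to a different Capability, or (for a worker) we may not
    // yet be at the front of the spare_workers queue.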
    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = rtsFalse;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->cap != cap) {
            // see Note [migrated bound threads]
            debugTrace(DEBUG_sched,
                       "task has been migrated to cap %d", task->cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again.  This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }

        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return rtsFalse;
}

// Note [migrated bound threads]
//
// There's a tricky case where:
//    - cap A is running an unbound thread T1
//    - there is a bound thread T2 at the head of the run queue on cap A
//    - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
//    - T1 returns quickly grabbing A again (T2 is still waking up on A)
//    - T1 blocks, the scheduler migrates T2 to cap B
//    - the task bound to T2 wakes up on cap B
//
// We take advantage of the following invariant:
//
//  - A bound thread can only be migrated by the holder of the
//    Capability on which the bound thread currently lives.  So, if we
//    hold Capability C, and task->cap == C, then task cannot be
//    migrated under our feet.

// Note [migrated bound threads 2]
//
// Second tricky case:
//   - A bound Task becomes a GC thread
//   - scheduleDoGC() migrates the thread belonging to this Task,
//     because the Capability it is on is disabled
//   - after GC, gcWorkerThread() returns, but now we are
//     holding a Capability that is not the same as task->cap
//   - Hence we must check for this case and immediately give up the
//     cap we hold.

/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it.  Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
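    // Unlocked fast path first: if somebody else owns the Capability we
    // fail immediately; otherwise we re-check under cap->lock before
    // actually claiming it.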
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap USED_IF_THREADS,
                    Task *task USED_IF_THREADS,
                    rtsBool safe USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    nat i;

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)(size_t)t->id);
                    cap->n_spare_workers--;
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                } else {
                    prev = t;
                }
            }
        }

        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded.  Just in case, try again - the kill message
            // will only be sent once.
            //
            // To reproduce this deadlock: run ffi002(threaded1)
            // repeatedly on a loaded machine.
            ioManagerDie();
            yieldThread();
            continue;
        }

        traceSparkCounters(cap);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // We now have the Capability; its run queue and spare workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
#endif
}

void
shutdownCapabilities(Task *task, rtsBool safe)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        ASSERT(task->incall->tso == NULL);
        shutdownCapability(capabilities[i], task, safe);
    }
#if defined(THREADED_RTS)
    ASSERT(checkSparkCountInvariant());
#endif
}

static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
    traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no);
    traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no);
    traceCapDelete(cap);
}

void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(capabilities[i]);
        if (capabilities[i] != &MainCapability)
            stgFree(capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
    stgFree(capabilities);
    traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
    traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT);
}

/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, each GC thread marks all Capabilities
   for which (c `mod` n == 0), for Capability c and thread n.
   ------------------------------------------------------------------------ */

void
markCapability (evac_fn evac, void *user, Capability *cap,
                rtsBool no_mark_sparks USED_IF_THREADS)
{
    InCall *incall;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // or fewer Capabilities as GC threads, but just in case there
    // are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
    evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
    for (incall = cap->suspended_ccalls; incall != NULL;
         incall=incall->next) {
        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
    }

#if defined(THREADED_RTS)
    if (!no_mark_sparks) {
        traverseSparkQueue (evac, user, cap);
    }
#endif

    // Free STM structures for this Capability
    stmPreGCHook(cap);
}

void
markCapabilities (evac_fn evac, void *user)
{
    nat n;
    for (n = 0; n < n_capabilities; n++) {
        markCapability(evac, user, capabilities[n], rtsFalse);
    }
}

#if defined(THREADED_RTS)
rtsBool checkSparkCountInvariant (void)
{
    SparkCounters sparks = { 0, 0, 0, 0, 0, 0 };
    StgWord64 remaining = 0;
    nat i;

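    // Sum the per-capability spark counters, and count the sparks still
    // sitting in the pools.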
    for (i = 0; i < n_capabilities; i++) {
        sparks.created   += capabilities[i]->spark_stats.created;
        sparks.dud       += capabilities[i]->spark_stats.dud;
        sparks.overflowed+= capabilities[i]->spark_stats.overflowed;
        sparks.converted += capabilities[i]->spark_stats.converted;
        sparks.gcd       += capabilities[i]->spark_stats.gcd;
        sparks.fizzled   += capabilities[i]->spark_stats.fizzled;
        remaining        += sparkPoolSize(capabilities[i]->sparks);
    }
    
    /* The invariant is
     *   created = converted + remaining + gcd + fizzled
     */
    debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld "
                            "(created == converted + remaining + gcd + fizzled)",
                            sparks.created, sparks.converted, remaining,
                            sparks.gcd, sparks.fizzled);

    return (sparks.created ==
              sparks.converted + remaining + sparks.gcd + sparks.fizzled);

}
#endif

// Local Variables:
// mode: C
// fill-column: 80
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// End: