/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities;
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

nat n_capabilities = 0;
Capability *capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability = NULL;

/* GC indicator, in scope for the scheduler, init'ed to false */
volatile StgWord waiting_for_gc = 0;
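// waiting_for_gc is set to PENDING_GC_SEQ or PENDING_GC_PAR while a
// GC is being requested (see releaseCapability_() and yieldCapability()
// below); zero means no GC is pending.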

/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package which calls allocateLocal() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 */
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
  return myTask()->cap;
#else
  return &MainCapability;
#endif
}
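
/* A hypothetical usage sketch (the hook name here is illustrative,
 * not part of this file): an allocation hook called during an unsafe
 * foreign call might do
 *
 *     static void * myAllocHook (size_t bytes)
 *     {
 *         Capability *cap = rts_unsafeGetMyCapability();
 *         return allocateLocal(cap, ROUNDUP_BYTES_TO_WDS(bytes));
 *     }
 *
 * This is only safe while the calling OS thread really does hold a
 * Capability, as it does inside an unsafe foreign call.
 */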

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return sched_state >= SCHED_INTERRUPTING
        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif

#if defined(THREADED_RTS)
StgClosure *
findSpark (Capability *cap)
{
  Capability *robbed;
  StgClosurePtr spark;
  rtsBool retry;
  nat i = 0;

  if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
      // If there are other threads, don't try to run any new
      // sparks: sparks might be speculative; we don't want to take
      // resources away from the main computation.
      return 0;
  }

  do {
      retry = rtsFalse;

      // first try to get a spark from our own pool.
      // We should be using reclaimSpark(), because it works without
      // needing any atomic instructions:
      //   spark = reclaimSpark(cap->sparks);
      // However, measurements show that this makes at least one benchmark
      // slower (prsa) and doesn't affect the others.
      spark = tryStealSpark(cap);
      if (spark != NULL) {
          cap->sparks_converted++;

          // Post event for running a spark from capability's own pool.
          traceEventRunSpark(cap, cap->r.rCurrentTSO);

          return spark;
      }
      if (!emptySparkPoolCap(cap)) {
          retry = rtsTrue;
      }

      if (n_capabilities == 1) { return NULL; } // no one to steal from

      debugTrace(DEBUG_sched,
                 "cap %d: Trying to steal work from other capabilities",
                 cap->no);

      /* Visit capabilities 0..n-1 in sequence until a theft succeeds;
         we could equally well start at a random index instead of 0. */
      for ( i=0 ; i < n_capabilities ; i++ ) {
          robbed = &capabilities[i];
          if (cap == robbed)  // ourselves...
              continue;

          if (emptySparkPoolCap(robbed)) // nothing to steal here
              continue;

          spark = tryStealSpark(robbed);
          if (spark == NULL && !emptySparkPoolCap(robbed)) {
              // we conflicted with another thread while trying to steal;
              // try again later.
              retry = rtsTrue;
          }

          if (spark != NULL) {
              cap->sparks_converted++;

              traceEventStealSpark(cap, cap->r.rCurrentTSO, robbed->no);

              return spark;
          }
          // otherwise: no success, try next one
      }
  } while (retry);

  debugTrace(DEBUG_sched, "No sparks stolen");
  return NULL;
}

// Returns True if any spark pool is non-empty at this moment in time.
// The result is only valid for an instant, of course, so in a sense
// it is immediately invalid, and should not be relied upon for
// correctness.
rtsBool
anySparks (void)
{
    nat i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(&capabilities[i])) {
            return rtsTrue;
        }
    }
    return rtsFalse;
}
#endif
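
// Note: sparks_converted, bumped above on each successful steal, is one
// of the per-Capability spark counters (alongside sparks_created,
// sparks_dud, sparks_gcd and sparks_fizzled, initialised in
// initCapability() below) that are assumed to feed the RTS's spark
// statistics.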

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
 *
 * These functions require cap->lock.  The list is a FIFO: tasks are
 * appended at the tail and popped from the head, so returning workers
 * are served in arrival order.
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);
    if (cap->returning_tasks_hd) {
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    } else {
        cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
}

STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    Task *task;
    ASSERT_LOCK_HELD(&cap->lock);
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->next;
    if (!cap->returning_tasks_hd) {
        cap->returning_tasks_tl = NULL;
    }
    task->next = NULL;
    return task;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */

static void
initCapability( Capability *cap, nat i )
{
    nat g;

    cap->no = i;
    cap->in_haskell        = rtsFalse;

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->n_spare_workers   = 0;
    cap->suspended_ccalls  = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
    cap->inbox              = (Message*)END_TSO_QUEUE;
    cap->sparks_created     = 0;
    cap->sparks_dud         = 0;
    cap->sparks_converted   = 0;
    cap->sparks_gcd         = 0;
    cap->sparks_fizzled     = 0;
#endif

    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;

    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
    }

    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
    cap->context_switch = 0;
    cap->pinned_object_block = NULL;
}

/* ---------------------------------------------------------------------------
 * Function:  initCapabilities()
 *
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
 *            we keep a table of them, the size of which is
 *            controlled by the user via the RTS flag -N.
 *
 * ------------------------------------------------------------------------- */
void
initCapabilities( void )
{
#if defined(THREADED_RTS)
    nat i;

#ifndef REG_Base
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
        RtsFlags.ParFlags.nNodes = 1;
    }
#endif

    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
        capabilities = &MainCapability;
        // THREADED_RTS must work on builds that don't have a mutable
        // BaseReg (eg. unregisterised), so in this case
        // capabilities[0] must coincide with &MainCapability.
    } else {
        capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
                                      "initCapabilities");
    }

    for (i = 0; i < n_capabilities; i++) {
        initCapability(&capabilities[i], i);
    }

    debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);

#else /* !THREADED_RTS */

    n_capabilities = 1;
    capabilities = &MainCapability;
    initCapability(&MainCapability, 0);

#endif

    // There are no free capabilities to begin with.  We will start
    // a worker Task on each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
}

/* ----------------------------------------------------------------------------
 * setContextSwitches: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void setContextSwitches(void)
{
    nat i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(&capabilities[i]);
    }
}
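
// contextSwitchCapability() is assumed to set the Capability's
// context_switch flag (initialised to 0 in initCapability() above);
// the running Haskell thread notices the flag at its next heap check
// and returns to the scheduler.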

/* ----------------------------------------------------------------------------
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrator must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrator should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
               cap->no, task->incall->tso ? "bound task" : "worker",
               (void *)task->id);
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already running, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
}
#endif
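
/* The matching wait side of this handshake appears in
 * waitForReturnCapability() and yieldCapability() below; in outline,
 * the sleeping Task does:
 *
 *     ACQUIRE_LOCK(&task->lock);
 *     if (!task->wakeup) waitCondition(&task->cond, &task->lock);
 *     cap = task->cap;          // the Capability may have changed
 *     task->wakeup = rtsFalse;
 *     RELEASE_LOCK(&task->lock);
 *     // ... then re-checks ownership under cap->lock ...
 *
 * Because the wakeup flag is sticky, a signal delivered before the
 * Task actually blocks is not lost.
 */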

/* ----------------------------------------------------------------------------
 * Function:  releaseCapability(Capability*)
 *
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    rtsBool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForReturnCapability())
        return;
    }

    if (waiting_for_gc == PENDING_GC_SEQ) {
        last_free_capability = cap; // needed?
        debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = cap->run_queue_hd->bound->task;
        giveCapabilityToTask(cap,task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue.
            return;
        }
    }

    last_free_capability = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

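/* To summarise, releaseCapability_() passes the Capability on in this
 * priority order: (1) a worker returning from a foreign call; (2) no
 * one, if a sequential GC is pending; (3) the Task bound to the next
 * thread on the run queue; (4) a freshly started worker, if there is
 * nobody spare; (5) a spare worker, if there is work to do; otherwise
 * (6) the Capability is recorded as free.
 */
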
void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
    RELEASE_LOCK(&cap->lock);
}

static void
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the Task is stopped, we shouldn't be yielding, we should
    // be just exiting.
    ASSERT(!task->stopped);

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    if (!isBoundTask(task))
    {
        if (cap->n_spare_workers < MAX_SPARE_WORKERS)
        {
            task->next = cap->spare_workers;
            cap->spare_workers = task;
            cap->n_spare_workers++;
        }
        else
        {
            debugTrace(DEBUG_sched, "%d spare workers already, exiting",
                       cap->n_spare_workers);
            releaseCapability_(cap,rtsFalse);
            // hold the lock until after workerTaskStop; c.f. scheduleWorker()
            workerTaskStop(task);
            RELEASE_LOCK(&cap->lock);
            shutdownThread();
        }
    }
    // Bound tasks just float around attached to their TSOs.

    releaseCapability_(cap,rtsFalse);

    RELEASE_LOCK(&cap->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * waitForReturnCapability( Task *task )
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * ------------------------------------------------------------------------- */
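/*
 * As a rough sketch of the context (not code from this file), a safe
 * foreign call made by a Haskell thread looks like:
 *
 *     token = suspendThread(...);    // gives up the Capability
 *     result = some_c_function(...); // runs without the Capability
 *     resumeThread(token);           // calls waitForReturnCapability()
 *
 * where suspendThread()/resumeThread() live in Schedule.c.
 */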
void
waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    Capability *cap = *pCap;

    if (cap == NULL) {
        // Try last_free_capability first
        cap = last_free_capability;
        if (cap->running_task) {
            nat i;
            // otherwise, search for a free capability
            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
                if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
                    break;
                }
            }
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    ACQUIRE_LOCK(&cap->lock);

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);

        for (;;) {
            ACQUIRE_LOCK(&task->lock);
            // task->lock held, cap->lock not held
            if (!task->wakeup) waitCondition(&task->cond, &task->lock);
            cap = task->cap;
            task->wakeup = rtsFalse;
            RELEASE_LOCK(&task->lock);

            // now check whether we should wake up...
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL) {
                if (cap->returning_tasks_hd != task) {
                    giveCapabilityToTask(cap,cap->returning_tasks_hd);
                    RELEASE_LOCK(&cap->lock);
                    continue;
                }
                cap->running_task = task;
                popReturningTask(cap);
                RELEASE_LOCK(&cap->lock);
                break;
            }
            RELEASE_LOCK(&cap->lock);
        }

    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
 * yieldCapability
 * ------------------------------------------------------------------------- */

void
yieldCapability (Capability** pCap, Task *task)
{
    Capability *cap = *pCap;

    if (waiting_for_gc == PENDING_GC_PAR) {
        traceEventGcStart(cap);
        gcWorkerThread(cap);
        traceEventGcEnd(cap);
        return;
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up
    // again.
    task->wakeup = rtsFalse;
    releaseCapabilityAndQueueWorker(cap);

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = rtsFalse;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again.  This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return;
}

/* ----------------------------------------------------------------------------
 * prodCapability
 *
 * If a Capability is currently idle, wake up a Task on it.  Used to
 * get every Capability into the GC.
 * ------------------------------------------------------------------------- */

void
prodCapability (Capability *cap, Task *task)
{
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
}

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
 *
 * ------------------------------------------------------------------------- */

void
shutdownCapability (Capability *cap, Task *task, rtsBool safe)
{
    nat i;

    task->cap = cap;

    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

        debugTrace(DEBUG_sched,
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task) {
            RELEASE_LOCK(&cap->lock);
            debugTrace(DEBUG_sched, "not owner, yielding");
            yieldThread();
            continue;
        }
        cap->running_task = task;

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched,
                               "worker thread %p has died unexpectedly", (void *)t->id);
                    cap->n_spare_workers--;
                    if (!prev) {
                        cap->spare_workers = t->next;
                    } else {
                        prev->next = t->next;
                    }
                } else {
                    // only advance prev over tasks that remain in the
                    // list; otherwise a run of consecutive dead
                    // workers would be unlinked incorrectly
                    prev = t;
                }
            }
        }

        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched,
                       "runnable threads or workers still alive, yielding");
            releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalls && safe) {
            debugTrace(DEBUG_sched,
                       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
            RELEASE_LOCK(&cap->lock);
            // The IO manager thread might have been slow to start up,
            // so the first attempt to kill it might not have
            // succeeded.  Just in case, try again - the kill message
            // will only be sent once.
            //
            // To reproduce this deadlock: run ffi002(threaded1)
            // repeatedly on a loaded machine.
            ioManagerDie();
            yieldThread();
            continue;
        }

        traceEventShutdown(cap);
        RELEASE_LOCK(&cap->lock);
        break;
    }
    // we now have the Capability; its run queue and spare workers
    // list are both empty.

    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
}

/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
 *
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    // unlocked fast path: if the Capability looks busy, fail cheaply;
    // the check is repeated below under cap->lock
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
        RELEASE_LOCK(&cap->lock);
        return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}

#endif /* THREADED_RTS */

static void
freeCapability (Capability *cap)
{
    stgFree(cap->mut_lists);
    stgFree(cap->saved_mut_lists);
#if defined(THREADED_RTS)
    freeSparkPool(cap->sparks);
#endif
}

void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(&capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
}

/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, each GC thread marks all Capabilities
   for which (c `mod` n == t), for Capability c, GC thread index t,
   and GC thread count n.
   ------------------------------------------------------------------------ */
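
/* A sketch (hypothetical caller, not code from this file) of that
 * striping scheme, as GC thread t of n_gc_threads might apply it:
 *
 *     nat c;
 *     for (c = t; c < n_capabilities; c += n_gc_threads) {
 *         markCapability(evac, user, &capabilities[c], rtsFalse);
 *     }
 */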

void
markCapability (evac_fn evac, void *user, Capability *cap,
                rtsBool no_mark_sparks USED_IF_THREADS)
{
    InCall *incall;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // number of Capabilities as GC threads, or fewer, but just in case
    // there are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
    evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
    for (incall = cap->suspended_ccalls; incall != NULL;
         incall=incall->next) {
        evac(user, (StgClosure **)(void *)&incall->suspended_tso);
    }

#if defined(THREADED_RTS)
    if (!no_mark_sparks) {
        traverseSparkQueue (evac, user, cap);
    }
#endif

    // Free STM structures for this Capability
    stmPreGCHook(cap);
}

void
markCapabilities (evac_fn evac, void *user)
{
    nat n;
    for (n = 0; n < n_capabilities; n++) {
        markCapability(evac, user, &capabilities[n], rtsFalse);
    }
}