Capability.c 26.9 KB
Newer Older
sof's avatar
sof committed
1
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2006
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities,
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/
18

sof's avatar
sof committed
19 20 21
#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
22
#include "RtsFlags.h"
23
#include "STM.h"
sof's avatar
sof committed
24
#include "OSThreads.h"
sof's avatar
sof committed
25
#include "Capability.h"
26
#include "Schedule.h"
27
#include "Sparks.h"
Simon Marlow's avatar
Simon Marlow committed
28
#include "Trace.h"
29
#include "GC.h"
sof's avatar
sof committed
30

31 32 33
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;
sof's avatar
sof committed
34

35
nat n_capabilities;
36
Capability *capabilities = NULL;
sof's avatar
sof committed
37

38 39 40 41 42
// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
Capability *last_free_capability;
43

44 45 46
/* GC indicator, in scope for the scheduler, init'ed to false */
volatile StgWord waiting_for_gc = 0;

47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see see the integer-gmp package which calls allocateLocal() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 * */
Capability * rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
  return myTask()->cap;
#else
  return &MainCapability;
#endif
}

62
#if defined(THREADED_RTS)
63 64 65
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
66
    return blackholes_need_checking
67
	|| sched_state >= SCHED_INTERRUPTING
68 69
	;
}
70
#endif
71

72
#if defined(THREADED_RTS)
73
StgClosure *
74
findSpark (Capability *cap)
75
{
76 77
  Capability *robbed;
  StgClosurePtr spark;
78
  rtsBool retry;
79 80
  nat i = 0;

81 82 83 84 85 86 87
  if (!emptyRunQueue(cap)) {
      // If there are other threads, don't try to run any new
      // sparks: sparks might be speculative, we don't want to take
      // resources away from the main computation.
      return 0;
  }

88 89 90 91 92 93 94 95 96
  // first try to get a spark from our own pool.
  // We should be using reclaimSpark(), because it works without
  // needing any atomic instructions:
  //   spark = reclaimSpark(cap->sparks);
  // However, measurements show that this makes at least one benchmark
  // slower (prsa) and doesn't affect the others.
  spark = tryStealSpark(cap);
  if (spark != NULL) {
      cap->sparks_converted++;
97 98 99 100

      // Post event for running a spark from capability's own pool.
      postEvent(cap, EVENT_RUN_SPARK, cap->r.rCurrentTSO->id, 0);

101 102 103 104 105
      return spark;
  }

  if (n_capabilities == 1) { return NULL; } // makes no sense...

106 107 108 109
  debugTrace(DEBUG_sched,
	     "cap %d: Trying to steal work from other capabilities", 
	     cap->no);

110 111
  do {
      retry = rtsFalse;
112

113 114 115 116 117 118
      /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
      start at a random place instead of 0 as well.  */
      for ( i=0 ; i < n_capabilities ; i++ ) {
          robbed = &capabilities[i];
          if (cap == robbed)  // ourselves...
              continue;
119

120 121 122
          if (emptySparkPoolCap(robbed)) // nothing to steal here
              continue;

123
          spark = tryStealSpark(robbed);
124 125 126 127 128 129 130 131
          if (spark == NULL && !emptySparkPoolCap(robbed)) {
              // we conflicted with another thread while trying to steal;
              // try again later.
              retry = rtsTrue;
          }

          if (spark != NULL) {
              debugTrace(DEBUG_sched,
132
		 "cap %d: Stole a spark from capability %d",
133
                         cap->no, robbed->no);
134
              cap->sparks_converted++;
135 136 137 138 139

              postEvent(cap, EVENT_STEAL_SPARK, 
                        cap->r.rCurrentTSO->id, robbed->no);
                        
              
140
              return spark;
141 142 143 144
          }
          // otherwise: no success, try next one
      }
  } while (retry);
145

146
  debugTrace(DEBUG_sched, "No sparks stolen");
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
  return NULL;
}

// Returns True if any spark pool is non-empty at this moment in time
// The result is only valid for an instant, of course, so in a sense
// is immediately invalid, and should not be relied upon for
// correctness.
rtsBool
anySparks (void)
{
    nat i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(&capabilities[i])) {
            return rtsTrue;
        }
    }
    return rtsFalse;
165
}
166
#endif
167 168 169

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
170
 *
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->return_link == NULL);
    if (cap->returning_tasks_hd) {
	ASSERT(cap->returning_tasks_tl->return_link == NULL);
	cap->returning_tasks_tl->return_link = task;
    } else {
	cap->returning_tasks_hd = task;
    }
    cap->returning_tasks_tl = task;
187 188
}

189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);
    Task *task;
    task = cap->returning_tasks_hd;
    ASSERT(task);
    cap->returning_tasks_hd = task->return_link;
    if (!cap->returning_tasks_hd) {
	cap->returning_tasks_tl = NULL;
    }
    task->return_link = NULL;
    return task;
}
#endif

205
/* ----------------------------------------------------------------------------
206 207 208 209
 * Initialisation
 *
 * The Capability is initially marked not free.
 * ------------------------------------------------------------------------- */
210 211

static void
212
initCapability( Capability *cap, nat i )
sof's avatar
sof committed
213
{
214
    nat g;
215

216 217
    cap->no = i;
    cap->in_haskell        = rtsFalse;
218
    cap->in_gc             = rtsFalse;
219 220 221 222 223 224 225 226 227 228 229

    cap->run_queue_hd      = END_TSO_QUEUE;
    cap->run_queue_tl      = END_TSO_QUEUE;

#if defined(THREADED_RTS)
    initMutex(&cap->lock);
    cap->running_task      = NULL; // indicates cap is free
    cap->spare_workers     = NULL;
    cap->suspended_ccalling_tasks = NULL;
    cap->returning_tasks_hd = NULL;
    cap->returning_tasks_tl = NULL;
230 231
    cap->wakeup_queue_hd    = END_TSO_QUEUE;
    cap->wakeup_queue_tl    = END_TSO_QUEUE;
232 233 234
    cap->sparks_created     = 0;
    cap->sparks_converted   = 0;
    cap->sparks_pruned      = 0;
235 236
#endif

237
    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
sof's avatar
sof committed
238
    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
239
    cap->f.stgGCFun        = (F_)__stg_gc_fun;
240

241
    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
242 243
				     RtsFlags.GcFlags.generations,
				     "initCapability");
244 245 246
    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
                                          RtsFlags.GcFlags.generations,
                                          "initCapability");
247 248 249

    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
	cap->mut_lists[g] = NULL;
250
    }
251

tharris@microsoft.com's avatar
tharris@microsoft.com committed
252 253
    cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
    cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE;
254 255 256
    cap->free_trec_chunks = END_STM_CHUNK_LIST;
    cap->free_trec_headers = NO_TREC;
    cap->transaction_tokens = 0;
257
    cap->context_switch = 0;
sof's avatar
sof committed
258 259
}

260
/* ---------------------------------------------------------------------------
sof's avatar
sof committed
261 262
 * Function:  initCapabilities()
 *
263
 * Purpose:   set up the Capability handling. For the THREADED_RTS build,
sof's avatar
sof committed
264
 *            we keep a table of them, the size of which is
265
 *            controlled by the user via the RTS flag -N.
sof's avatar
sof committed
266
 *
267
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
268
void
269
initCapabilities( void )
sof's avatar
sof committed
270
{
271 272
#if defined(THREADED_RTS)
    nat i;
273

274
#ifndef REG_Base
Simon Marlow's avatar
Simon Marlow committed
275 276 277 278 279 280 281
    // We can't support multiple CPUs if BaseReg is not a register
    if (RtsFlags.ParFlags.nNodes > 1) {
	errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
	RtsFlags.ParFlags.nNodes = 1;
    }
#endif

282 283 284 285 286 287 288 289 290 291 292
    n_capabilities = RtsFlags.ParFlags.nNodes;

    if (n_capabilities == 1) {
	capabilities = &MainCapability;
	// THREADED_RTS must work on builds that don't have a mutable
	// BaseReg (eg. unregisterised), so in this case
	// capabilities[0] must coincide with &MainCapability.
    } else {
	capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
				      "initCapabilities");
    }
293

294
    for (i = 0; i < n_capabilities; i++) {
295
	initCapability(&capabilities[i], i);
296
    }
297

Simon Marlow's avatar
Simon Marlow committed
298
    debugTrace(DEBUG_sched, "allocated %d capabilities", n_capabilities);
299 300 301

#else /* !THREADED_RTS */

302
    n_capabilities = 1;
303
    capabilities = &MainCapability;
304
    initCapability(&MainCapability, 0);
305

306 307
#endif

308 309 310 311
    // There are no free capabilities to begin with.  We will start
    // a worker Task to each Capability, which will quickly put the
    // Capability on the free list when it finds nothing to do.
    last_free_capability = &capabilities[0];
sof's avatar
sof committed
312 313
}

314 315 316 317 318 319 320
/* ----------------------------------------------------------------------------
 * setContextSwitches: cause all capabilities to context switch as
 * soon as possible.
 * ------------------------------------------------------------------------- */

void setContextSwitches(void)
{
321 322 323 324
    nat i;
    for (i=0; i < n_capabilities; i++) {
        contextSwitchCapability(&capabilities[i]);
    }
325 326
}

327
/* ----------------------------------------------------------------------------
328 329 330 331 332 333 334 335 336 337
 * Give a Capability to a Task.  The task must currently be sleeping
 * on its condition variable.
 *
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrater must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrater should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
338 339
 *
 * ------------------------------------------------------------------------- */
340 341 342

#if defined(THREADED_RTS)
STATIC_INLINE void
343
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
344
{
345 346
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
Simon Marlow's avatar
Simon Marlow committed
347 348 349
    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
               cap->no, task->tso ? "bound task" : "worker",
               (void *)task->id);
350 351 352 353 354 355 356
    ACQUIRE_LOCK(&task->lock);
    task->wakeup = rtsTrue;
    // the wakeup flag is needed because signalCondition() doesn't
    // flag the condition if the thread is already runniing, but we want
    // it to be sticky.
    signalCondition(&task->cond);
    RELEASE_LOCK(&task->lock);
357
}
358
#endif
359

360
/* ----------------------------------------------------------------------------
sof's avatar
sof committed
361 362
 * Function:  releaseCapability(Capability*)
 *
sof's avatar
sof committed
363 364 365
 * Purpose:   Letting go of a capability. Causes a
 *            'returning worker' thread or a 'waiting worker'
 *            to wake up, in that order.
366 367
 * ------------------------------------------------------------------------- */

368
#if defined(THREADED_RTS)
369
void
370 371
releaseCapability_ (Capability* cap, 
                    rtsBool always_wakeup)
372
{
373 374 375 376
    Task *task;

    task = cap->running_task;

377
    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
378 379

    cap->running_task = NULL;
380

381 382
    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
383 384 385 386
    if (cap->returning_tasks_hd != NULL) {
	giveCapabilityToTask(cap,cap->returning_tasks_hd);
	// The Task pops itself from the queue (see waitForReturnCapability())
	return;
387
    }
388

389
    if (waiting_for_gc == PENDING_GC_SEQ) {
390
      last_free_capability = cap; // needed?
Simon Marlow's avatar
Simon Marlow committed
391
      debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
392 393 394 395
      return;
    } 


396 397 398 399 400 401 402 403
    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
	// Make sure we're not about to try to wake ourselves up
	ASSERT(task != cap->run_queue_hd->bound);
	task = cap->run_queue_hd->bound;
	giveCapabilityToTask(cap,task);
	return;
404
    }
405

406
    if (!cap->spare_workers) {
407 408 409 410
	// Create a worker thread if we don't have one.  If the system
	// is interrupted, we only create a worker task if there
	// are threads that need to be completed.  If the system is
	// shutting down, we never create a new worker.
411
	if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
Simon Marlow's avatar
Simon Marlow committed
412 413
	    debugTrace(DEBUG_sched,
		       "starting new worker on capability %d", cap->no);
414 415 416
	    startWorkerTask(cap, workerStart);
	    return;
	}
417
    }
418

419 420
    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
421 422 423
    if (always_wakeup || 
        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
        !emptySparkPoolCap(cap) || globalWorkToDo()) {
424 425 426 427 428 429 430
	if (cap->spare_workers) {
	    giveCapabilityToTask(cap,cap->spare_workers);
	    // The worker Task pops itself from the queue;
	    return;
	}
    }

431
    last_free_capability = cap;
Simon Marlow's avatar
Simon Marlow committed
432
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
sof's avatar
sof committed
433 434
}

435
void
436
releaseCapability (Capability* cap USED_IF_THREADS)
437 438
{
    ACQUIRE_LOCK(&cap->lock);
439 440 441 442 443 444 445 446 447
    releaseCapability_(cap, rtsFalse);
    RELEASE_LOCK(&cap->lock);
}

void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, rtsTrue);
448 449 450 451
    RELEASE_LOCK(&cap->lock);
}

static void
452
releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
{
    Task *task;

    ACQUIRE_LOCK(&cap->lock);

    task = cap->running_task;

    // If the current task is a worker, save it on the spare_workers
    // list of this Capability.  A worker can mark itself as stopped,
    // in which case it is not replaced on the spare_worker queue.
    // This happens when the system is shutting down (see
    // Schedule.c:workerStart()).
    // Also, be careful to check that this task hasn't just exited
    // Haskell to do a foreign call (task->suspended_tso).
    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
	task->next = cap->spare_workers;
	cap->spare_workers = task;
    }
    // Bound tasks just float around attached to their TSOs.

473
    releaseCapability_(cap,rtsFalse);
474 475 476 477

    RELEASE_LOCK(&cap->lock);
}
#endif
sof's avatar
sof committed
478

479
/* ----------------------------------------------------------------------------
480
 * waitForReturnCapability( Task *task )
sof's avatar
sof committed
481 482
 *
 * Purpose:  when an OS thread returns from an external call,
483 484
 * it calls waitForReturnCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
sof's avatar
sof committed
485
 * result of the external call back to the Haskell thread that
sof's avatar
sof committed
486 487
 * made it.
 *
488
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
489
void
490
waitForReturnCapability (Capability **pCap, Task *task)
sof's avatar
sof committed
491
{
492
#if !defined(THREADED_RTS)
493

494 495 496
    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;
497

498
#else
499 500 501 502 503 504 505 506
    Capability *cap = *pCap;

    if (cap == NULL) {
	// Try last_free_capability first
	cap = last_free_capability;
	if (!cap->running_task) {
	    nat i;
	    // otherwise, search for a free capability
507
            cap = NULL;
508
	    for (i = 0; i < n_capabilities; i++) {
509 510
		if (!capabilities[i].running_task) {
                    cap = &capabilities[i];
511 512 513
		    break;
		}
	    }
514 515 516 517
            if (cap == NULL) {
                // Can't find a free one, use last_free_capability.
                cap = last_free_capability;
            }
518 519 520 521 522
	}

	// record the Capability as the one this Task is now assocated with.
	task->cap = cap;

523
    } else {
524
	ASSERT(task->cap == cap);
525 526
    }

527
    ACQUIRE_LOCK(&cap->lock);
sof's avatar
sof committed
528

Simon Marlow's avatar
Simon Marlow committed
529
    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
sof's avatar
sof committed
530

531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
    if (!cap->running_task) {
	// It's free; just grab it
	cap->running_task = task;
	RELEASE_LOCK(&cap->lock);
    } else {
	newReturningTask(cap,task);
	RELEASE_LOCK(&cap->lock);

	for (;;) {
	    ACQUIRE_LOCK(&task->lock);
	    // task->lock held, cap->lock not held
	    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
	    cap = task->cap;
	    task->wakeup = rtsFalse;
	    RELEASE_LOCK(&task->lock);

	    // now check whether we should wake up...
	    ACQUIRE_LOCK(&cap->lock);
	    if (cap->running_task == NULL) {
		if (cap->returning_tasks_hd != task) {
		    giveCapabilityToTask(cap,cap->returning_tasks_hd);
		    RELEASE_LOCK(&cap->lock);
		    continue;
		}
		cap->running_task = task;
		popReturningTask(cap);
		RELEASE_LOCK(&cap->lock);
		break;
	    }
	    RELEASE_LOCK(&cap->lock);
	}

    }

565
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
566

Simon Marlow's avatar
Simon Marlow committed
567
    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
568 569 570 571 572 573

    *pCap = cap;
#endif
}

#if defined(THREADED_RTS)
574
/* ----------------------------------------------------------------------------
575
 * yieldCapability
576
 * ------------------------------------------------------------------------- */
sof's avatar
sof committed
577

sof's avatar
sof committed
578
void
579
yieldCapability (Capability** pCap, Task *task)
sof's avatar
sof committed
580
{
581 582
    Capability *cap = *pCap;

583 584
    if (waiting_for_gc == PENDING_GC_PAR) {
	debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
Simon Marlow's avatar
Simon Marlow committed
585
        postEvent(cap, EVENT_GC_START, 0, 0);
586
        gcWorkerThread(cap);
Simon Marlow's avatar
Simon Marlow committed
587
        postEvent(cap, EVENT_GC_END, 0, 0);
588 589 590
        return;
    }

Simon Marlow's avatar
Simon Marlow committed
591
	debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
592 593

	// We must now release the capability and wait to be woken up
594
	// again.
595
	task->wakeup = rtsFalse;
596 597 598 599 600 601 602 603 604 605
	releaseCapabilityAndQueueWorker(cap);

	for (;;) {
	    ACQUIRE_LOCK(&task->lock);
	    // task->lock held, cap->lock not held
	    if (!task->wakeup) waitCondition(&task->cond, &task->lock);
	    cap = task->cap;
	    task->wakeup = rtsFalse;
	    RELEASE_LOCK(&task->lock);

Simon Marlow's avatar
Simon Marlow committed
606 607
	    debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

608 609
	    ACQUIRE_LOCK(&cap->lock);
	    if (cap->running_task != NULL) {
Simon Marlow's avatar
Simon Marlow committed
610 611
		debugTrace(DEBUG_sched, 
			   "capability %d is owned by another task", cap->no);
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632
		RELEASE_LOCK(&cap->lock);
		continue;
	    }

	    if (task->tso == NULL) {
		ASSERT(cap->spare_workers != NULL);
		// if we're not at the front of the queue, release it
		// again.  This is unlikely to happen.
		if (cap->spare_workers != task) {
		    giveCapabilityToTask(cap,cap->spare_workers);
		    RELEASE_LOCK(&cap->lock);
		    continue;
		}
		cap->spare_workers = task->next;
		task->next = NULL;
	    }
	    cap->running_task = task;
	    RELEASE_LOCK(&cap->lock);
	    break;
	}

Simon Marlow's avatar
Simon Marlow committed
633
	debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
634
	ASSERT(cap->running_task == task);
635

636
    *pCap = cap;
637

638
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
639

640
    return;
sof's avatar
sof committed
641 642
}

643 644 645 646 647 648 649 650
/* ----------------------------------------------------------------------------
 * Wake up a thread on a Capability.
 *
 * This is used when the current Task is running on a Capability and
 * wishes to wake up a thread on a different Capability.
 * ------------------------------------------------------------------------- */

void
651 652 653
wakeupThreadOnCapability (Capability *my_cap, 
                          Capability *other_cap, 
                          StgTSO *tso)
654
{
655
    ACQUIRE_LOCK(&other_cap->lock);
656

657 658 659 660 661 662 663 664
    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
    if (tso->bound) {
	ASSERT(tso->bound->cap == tso->cap);
    	tso->bound->cap = other_cap;
    }
    tso->cap = other_cap;

    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
665

666
    if (other_cap->running_task == NULL) {
667 668 669
	// nobody is running this Capability, we can add our thread
	// directly onto the run queue and start up a Task to run it.

670 671 672 673 674
	other_cap->running_task = myTask(); 
            // precond for releaseCapability_() and appendToRunQueue()

	appendToRunQueue(other_cap,tso);

675
	releaseCapability_(other_cap,rtsFalse);
676
    } else {
677
	appendToWakeupQueue(my_cap,other_cap,tso);
678
        other_cap->context_switch = 1;
679 680 681 682
	// someone is running on this Capability, so it cannot be
	// freed without first checking the wakeup queue (see
	// releaseCapability_).
    }
683

684
    RELEASE_LOCK(&other_cap->lock);
685 686
}

687
/* ----------------------------------------------------------------------------
688
 * prodCapability
689
 *
690 691
 * If a Capability is currently idle, wake up a Task on it.  Used to 
 * get every Capability into the GC.
692
 * ------------------------------------------------------------------------- */
693

694
void
695
prodCapability (Capability *cap, Task *task)
696
{
697 698 699 700 701 702
    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        cap->running_task = task;
        releaseCapability_(cap,rtsTrue);
    }
    RELEASE_LOCK(&cap->lock);
703
}
704 705 706 707 708 709 710 711 712 713 714 715 716

/* ----------------------------------------------------------------------------
 * shutdownCapability
 *
 * At shutdown time, we want to let everything exit as cleanly as
 * possible.  For each capability, we let its run queue drain, and
 * allow the workers to stop.
 *
 * This function should be called when interrupted and
 * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
 * will exit the scheduler and call taskStop(), and any bound thread
 * that wakes up will return to its caller.  Runnable threads are
 * killed.
717
 *
718
 * ------------------------------------------------------------------------- */
719 720

void
721
shutdownCapability (Capability *cap, Task *task, rtsBool safe)
722
{
723 724 725 726
    nat i;

    task->cap = cap;

727 728 729 730 731 732 733
    // Loop indefinitely until all the workers have exited and there
    // are no Haskell threads left.  We used to bail out after 50
    // iterations of this loop, but that occasionally left a worker
    // running which caused problems later (the closeMutex() below
    // isn't safe, for one thing).

    for (i = 0; /* i < 50 */; i++) {
Simon Marlow's avatar
Simon Marlow committed
734 735
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);

Simon Marlow's avatar
Simon Marlow committed
736 737
	debugTrace(DEBUG_sched, 
		   "shutting down capability %d, attempt %d", cap->no, i);
738 739 740
	ACQUIRE_LOCK(&cap->lock);
	if (cap->running_task) {
	    RELEASE_LOCK(&cap->lock);
Simon Marlow's avatar
Simon Marlow committed
741
	    debugTrace(DEBUG_sched, "not owner, yielding");
742 743
	    yieldThread();
	    continue;
744
	}
745
	cap->running_task = task;
Simon Marlow's avatar
Simon Marlow committed
746 747 748 749 750 751 752 753 754 755 756 757 758

        if (cap->spare_workers) {
            // Look for workers that have died without removing
            // themselves from the list; this could happen if the OS
            // summarily killed the thread, for example.  This
            // actually happens on Windows when the system is
            // terminating the program, and the RTS is running in a
            // DLL.
            Task *t, *prev;
            prev = NULL;
            for (t = cap->spare_workers; t != NULL; t = t->next) {
                if (!osThreadIsAlive(t->id)) {
                    debugTrace(DEBUG_sched, 
759
                               "worker thread %p has died unexpectedly", (void *)t->id);
Simon Marlow's avatar
Simon Marlow committed
760 761 762 763 764 765 766 767 768 769
                        if (!prev) {
                            cap->spare_workers = t->next;
                        } else {
                            prev->next = t->next;
                        }
                        prev = t;
                }
            }
        }

770
	if (!emptyRunQueue(cap) || cap->spare_workers) {
Simon Marlow's avatar
Simon Marlow committed
771 772
	    debugTrace(DEBUG_sched, 
		       "runnable threads or workers still alive, yielding");
773
	    releaseCapability_(cap,rtsFalse); // this will wake up a worker
774 775 776
	    RELEASE_LOCK(&cap->lock);
	    yieldThread();
	    continue;
777
	}
778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793

        // If "safe", then busy-wait for any threads currently doing
        // foreign calls.  If we're about to unload this DLL, for
        // example, we need to be sure that there are no OS threads
        // that will try to return to code that has been unloaded.
        // We can be a bit more relaxed when this is a standalone
        // program that is about to terminate, and let safe=false.
        if (cap->suspended_ccalling_tasks && safe) {
	    debugTrace(DEBUG_sched, 
		       "thread(s) are involved in foreign calls, yielding");
            cap->running_task = NULL;
	    RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
        }
            
Simon Marlow's avatar
Simon Marlow committed
794
        postEvent(cap, EVENT_SHUTDOWN, 0, 0);
Simon Marlow's avatar
Simon Marlow committed
795
	debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
796 797
	RELEASE_LOCK(&cap->lock);
	break;
798
    }
799 800
    // we now have the Capability, its run queue and spare workers
    // list are both empty.
801

802 803 804 805
    // ToDo: we can't drop this mutex, because there might still be
    // threads performing foreign calls that will eventually try to 
    // return via resumeThread() and attempt to grab cap->lock.
    // closeMutex(&cap->lock);
806
}
807

808 809 810 811
/* ----------------------------------------------------------------------------
 * tryGrabCapability
 *
 * Attempt to gain control of a Capability if it is free.
812
 *
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830
 * ------------------------------------------------------------------------- */

rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
    if (cap->running_task != NULL) return rtsFalse;
    ACQUIRE_LOCK(&cap->lock);
    if (cap->running_task != NULL) {
	RELEASE_LOCK(&cap->lock);
	return rtsFalse;
    }
    task->cap = cap;
    cap->running_task = task;
    RELEASE_LOCK(&cap->lock);
    return rtsTrue;
}


831
#endif /* THREADED_RTS */
832

833 834 835
static void
freeCapability (Capability *cap)
{
Ian Lynagh's avatar
Ian Lynagh committed
836
    stgFree(cap->mut_lists);
Simon Marlow's avatar
Simon Marlow committed
837
#if defined(THREADED_RTS)
838
    freeSparkPool(cap->sparks);
Ian Lynagh's avatar
Ian Lynagh committed
839 840
#endif
}
841

842 843 844 845 846 847 848 849 850 851 852 853 854
void
freeCapabilities (void)
{
#if defined(THREADED_RTS)
    nat i;
    for (i=0; i < n_capabilities; i++) {
        freeCapability(&capabilities[i]);
    }
#else
    freeCapability(&MainCapability);
#endif
}

855 856 857 858 859 860 861
/* ---------------------------------------------------------------------------
   Mark everything directly reachable from the Capabilities.  When
   using multiple GC threads, each GC thread marks all Capabilities
   for which (c `mod` n == 0), for Capability c and thread n.
   ------------------------------------------------------------------------ */

void
862 863
markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, 
                      rtsBool prune_sparks USED_IF_THREADS)
864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887
{
    nat i;
    Capability *cap;
    Task *task;

    // Each GC thread is responsible for following roots from the
    // Capability of the same number.  There will usually be the same
    // or fewer Capabilities as GC threads, but just in case there
    // are more, we mark every Capability whose number is the GC
    // thread's index plus a multiple of the number of GC threads.
    for (i = i0; i < n_capabilities; i += delta) {
	cap = &capabilities[i];
	evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
	evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
	evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
	evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
#endif
	for (task = cap->suspended_ccalling_tasks; task != NULL; 
	     task=task->next) {
	    debugTrace(DEBUG_sched,
		       "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
	    evac(user, (StgClosure **)(void *)&task->suspended_tso);
	}
888 889

#if defined(THREADED_RTS)
890 891 892 893 894
        if (prune_sparks) {
            pruneSparkQueue (evac, user, cap);
        } else {
            traverseSparkQueue (evac, user, cap);
        }
895
#endif
896
    }
897

898 899 900 901 902 903 904 905 906 907
#if !defined(THREADED_RTS)
    evac(user, (StgClosure **)(void *)&blocked_queue_hd);
    evac(user, (StgClosure **)(void *)&blocked_queue_tl);
    evac(user, (StgClosure **)(void *)&sleeping_queue);
#endif 
}

void
markCapabilities (evac_fn evac, void *user)
{
908
    markSomeCapabilities(evac, user, 0, 1, rtsFalse);
909
}