Commit 677c6345 authored by simonmar

[project @ 2005-10-27 15:26:06 by simonmar]

- Very simple work-sharing amongst Capabilities: whenever a Capability
  detects that it has more than one thread in its run queue, it runs
  around looking for empty Capabilities, and shares the threads on its
  run queue equally with the free Capabilities it finds (sketched below).

- unlock the garbage collector's mutable lists, by having private
  mutable lists per capability (and per generation).  The private
  mutable lists are moved onto the main mutable lists at each GC.
  This pulls the old-generation update code out of the storage manager
  mutex, which is one of the last remaining causes of (alleged) contention.

- Fix some problems with synchronising when a GC is required.  We
  should now synchronise more quickly.
parent f9f5235f
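
The work-sharing test in the first bullet is deliberately cheap: a
Capability only goes looking for idle peers when its run queue holds at
least two threads.  A minimal sketch of that predicate, assuming the
run-queue fields used in this diff ('shouldPushWork' is an invented
name; the real code inlines this test at the top of schedulePushWork()
in the scheduler hunks below):

    /* Hypothetical predicate, not part of this commit. */
    static rtsBool
    shouldPushWork (Capability *cap)
    {
        // at least two threads: a non-empty queue whose head has a successor
        return !emptyRunQueue(cap)
            && cap->run_queue_hd->link != END_TSO_QUEUE;
    }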
@@ -142,6 +142,10 @@ initCapability( Capability *cap, nat i )
     cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
     cap->f.stgGCFun    = (F_)__stg_gc_fun;
+
+    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
+                                    RtsFlags.GcFlags.generations,
+                                    "initCapability");
 }

 /* ---------------------------------------------------------------------------
@@ -572,6 +576,29 @@ shutdownCapability (Capability *cap, Task *task)
     // list are both empty.
 }

+/* ----------------------------------------------------------------------------
+ * tryGrabCapability
+ *
+ * Attempt to gain control of a Capability if it is free.
+ *
+ * ------------------------------------------------------------------------- */
+
+rtsBool
+tryGrabCapability (Capability *cap, Task *task)
+{
+    if (cap->running_task != NULL) return rtsFalse;
+    ACQUIRE_LOCK(&cap->lock);
+    if (cap->running_task != NULL) {
+        RELEASE_LOCK(&cap->lock);
+        return rtsFalse;
+    }
+    task->cap = cap;
+    cap->running_task = task;
+    RELEASE_LOCK(&cap->lock);
+    return rtsTrue;
+}
+
 #endif /* THREADED_RTS */
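
As a usage sketch (hypothetical: the only caller this commit adds is
schedulePushWork() in the scheduler hunks below), a Task could scan for
any free Capability like this:

    /* Hypothetical helper, not part of this commit: return the first
     * free Capability, or NULL if they are all busy.  On success,
     * tryGrabCapability() has already pointed task->cap at it. */
    static Capability *
    findFreeCapability (Task *task)
    {
        nat i;
        for (i = 0; i < n_capabilities; i++) {
            if (tryGrabCapability(&capabilities[i], task)) {
                return &capabilities[i];
            }
        }
        return NULL;
    }

Note the shape of tryGrabCapability() itself: an unlocked fast-path test
of cap->running_task, then a re-test under cap->lock, so a busy
Capability is skipped without ever touching its lock.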
@@ -60,6 +60,12 @@ struct Capability_ {
     // this list.
     Task *suspended_ccalling_tasks;

+    // One mutable list per generation, so we don't need to take any
+    // locks when updating an old-generation thunk.  These
+    // mini-mut-lists are moved onto the respective gen->mut_list at
+    // each GC.
+    bdescr **mut_lists;
+
 #if defined(THREADED_RTS)
     // Worker Tasks waiting in the wings.  Singly-linked.
     Task *spare_workers;
@@ -146,6 +152,8 @@ extern Capability *last_free_capability;
 //
 void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);

+INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+
 #if defined(THREADED_RTS)
 // Gives up the current capability IFF there is a higher-priority
@@ -181,6 +189,10 @@ void prodAllCapabilities (void);
 //
 void shutdownCapability (Capability *cap, Task *task);

+// Attempt to gain control of a Capability if it is free.
+//
+rtsBool tryGrabCapability (Capability *cap, Task *task);
+
 #else // !THREADED_RTS

 // Grab a capability.  (Only in the non-threaded RTS; in the threaded
@@ -190,4 +202,24 @@ extern void grabCapability (Capability **pCap);

 #endif /* !THREADED_RTS */

+/* -----------------------------------------------------------------------------
+ * INLINE functions... private below here
+ * -------------------------------------------------------------------------- */
+
+INLINE_HEADER void
+recordMutableCap (StgClosure *p, Capability *cap, nat gen)
+{
+    bdescr *bd;
+
+    bd = cap->mut_lists[gen];
+    if (bd->free >= bd->start + BLOCK_SIZE_W) {
+        bdescr *new_bd;
+        new_bd = allocBlock_lock();
+        new_bd->link = bd;
+        bd = new_bd;
+        cap->mut_lists[gen] = bd;
+    }
+    *bd->free++ = (StgWord)p;
+}
+
 #endif /* CAPABILITY_H */
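
As a hedged illustration of the intended call pattern (the wrapper
below is invented; the commit's real call sites are the GC and
scheduler hunks that follow, plus the Cmm macro at the end of this
diff), a writer into an old-generation object would do something like:

    /* Hypothetical wrapper, not part of this commit.  Bdescr() is the
     * standard RTS macro mapping a heap pointer to its block descriptor. */
    static void
    recordOldGenMutable (Capability *cap, StgClosure *p)
    {
        bdescr *bd = Bdescr((StgPtr)p);
        if (bd->gen_no > 0) {
            // no lock needed: the list is private to this Capability
            recordMutableCap(p, cap, bd->gen_no);
        }
    }

The lock only appears in the overflow path, where allocBlock_lock()
takes the storage-manager mutex to grab a fresh block; the common case
is a plain bump-and-store into the current block.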
@@ -334,7 +334,7 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc )
   step *stp;
   lnat live, allocated, collected = 0, copied = 0, scavd_copied = 0;
   lnat oldgen_saved_blocks = 0;
-  nat g, s;
+  nat g, s, i;

   ACQUIRE_SM_LOCK;
@@ -439,6 +439,10 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc )
       if (g != 0) {
           freeChain(generations[g].mut_list);
           generations[g].mut_list = allocBlock();
+          for (i = 0; i < n_capabilities; i++) {
+              freeChain(capabilities[i].mut_lists[g]);
+              capabilities[i].mut_lists[g] = allocBlock();
+          }
       }

       for (s = 0; s < generations[g].n_steps; s++) {
@@ -541,6 +545,19 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc )
           stp->scavenged_large_objects = NULL;
           stp->n_scavenged_large_blocks = 0;
       }
+
+      /* Move the private mutable lists from each capability onto the
+       * main mutable list for the generation.
+       */
+      for (i = 0; i < n_capabilities; i++) {
+          for (bd = capabilities[i].mut_lists[g];
+               bd->link != NULL; bd = bd->link) {
+              /* nothing */
+          }
+          bd->link = generations[g].mut_list;
+          generations[g].mut_list = capabilities[i].mut_lists[g];
+          capabilities[i].mut_lists[g] = allocBlock();
+      }
   }

   /* Allocate a mark stack if we're doing a major collection.
...
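
The splice above finds the tail of each private list by walking it,
which is linear in the number of blocks on the list.  The private lists
only grow by whole blocks (see recordMutableCap() above), so between
two GCs they should stay short; presumably that makes maintaining a
tail pointer on every mutator write not worth the bookkeeping.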
@@ -210,6 +210,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+static void schedulePushWork (Capability *cap, Task *task);
 static void scheduleStartSignalHandlers (void);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
@@ -386,6 +387,10 @@ schedule (Capability *initialCapability, Task *task)
     }
 #endif

+#ifdef SMP
+    schedulePushWork(cap,task);
+#endif
+
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -599,6 +604,10 @@ run_thread:
     cap->in_haskell = rtsFalse;

+    // The TSO might have moved, eg. if it re-entered the RTS and a GC
+    // happened.  So find the new location:
+    t = cap->r.rCurrentTSO;
+
 #ifdef SMP
     // If ret is ThreadBlocked, and this Task is bound to the TSO that
     // blocked, we are in limbo - the TSO is now owned by whatever it
@@ -606,15 +615,16 @@ run_thread:
     // perhaps even on a different Capability.  It may be the case
     // that task->cap != cap.  We had better yield this Capability
     // immediately and return to normality.
-    if (ret == ThreadBlocked) continue;
+    if (ret == ThreadBlocked) {
+        IF_DEBUG(scheduler,
+                 debugBelch("--<< thread %d (%s) stopped: blocked\n",
+                            t->id, whatNext_strs[t->what_next]));
+        continue;
+    }
 #endif

     ASSERT_CAPABILITY_INVARIANTS(cap,task);

-    // The TSO might have moved, eg. if it re-entered the RTS and a GC
-    // happened.  So find the new location:
-    t = cap->r.rCurrentTSO;
-
     // And save the current errno in this thread.
     t->saved_errno = errno;
@@ -665,6 +675,7 @@ run_thread:
     case ThreadFinished:
         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+        ASSERT_CAPABILITY_INVARIANTS(cap,task);
         break;

     default:
@@ -705,6 +716,87 @@ schedulePreLoop(void)
 #endif
 }

+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+static void
+schedulePushWork(Capability *cap, Task *task)
+{
+#ifdef SMP
+    Capability *free_caps[n_capabilities], *cap0;
+    nat i, n_free_caps;
+
+    // Check whether we have more threads on our run queue than we
+    // could hand to another Capability.
+    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
+        return;
+    }
+
+    // First grab as many free Capabilities as we can.
+    for (i = 0, n_free_caps = 0; i < n_capabilities; i++) {
+        cap0 = &capabilities[i];
+        if (cap != cap0 && tryGrabCapability(cap0,task)) {
+            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
+                // it already has some work, we just grabbed it at
+                // the wrong moment.  Or maybe it's deadlocked!
+                releaseCapability(cap0);
+            } else {
+                free_caps[n_free_caps++] = cap0;
+            }
+        }
+    }
+
+    // We now have n_free_caps free capabilities stashed in
+    // free_caps[].  Share our run queue equally with them.  This is
+    // probably the simplest thing we could do; improvements we might
+    // want to do include:
+    //
+    //   - giving high priority to moving relatively new threads, on
+    //     the grounds that they haven't had time to build up a
+    //     working set in the cache on this CPU/Capability.
+    //
+    //   - giving low priority to moving long-lived threads
+
+    if (n_free_caps > 0) {
+        StgTSO *prev, *t, *next;
+
+        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+        prev = cap->run_queue_hd;
+        t = prev->link;
+        prev->link = END_TSO_QUEUE;
+        i = 0;
+        for (; t != END_TSO_QUEUE; t = next) {
+            next = t->link;
+            t->link = END_TSO_QUEUE;
+            if (t->what_next == ThreadRelocated) {
+                prev->link = t;
+                prev = t;
+            } else if (i == n_free_caps) {
+                i = 0;
+                // keep one for us
+                prev->link = t;
+                prev = t;
+            } else {
+                appendToRunQueue(free_caps[i],t);
+                if (t->bound) { t->bound->cap = free_caps[i]; }
+                i++;
+            }
+        }
+        cap->run_queue_tl = prev;
+
+        // release the capabilities
+        for (i = 0; i < n_free_caps; i++) {
+            task->cap = free_caps[i];
+            releaseCapability(free_caps[i]);
+        }
+    }
+    task->cap = cap; // reset to point to our Capability.
+#endif
+}
+
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ------------------------------------------------------------------------- */
@@ -1775,7 +1867,13 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
     //
     was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) return;
+    if (was_waiting) {
+        do {
+            IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+            yieldCapability(&cap,task);
+        } while (waiting_for_gc);
+        return;
+    }

     for (i = 0; i < n_capabilities; i++) {
         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
@@ -1787,6 +1885,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
             // all the Capabilities, but even so it's a slightly
             // unsavoury invariant.
             task->cap = pcap;
+            context_switch = 1;
             waitForReturnCapability(&pcap, task);
             if (pcap != &capabilities[i]) {
                 barf("scheduleDoGC: got the wrong capability");
@@ -3986,25 +4085,37 @@ printThreadBlockage(StgTSO *tso)
   }
 }

-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
 {
-  switch (tso->what_next) {
-  case ThreadKilled:
-    debugBelch("has been killed");
-    break;
-  case ThreadComplete:
-    debugBelch("has completed");
-    break;
-  default:
-    printThreadBlockage(tso);
-  }
+  debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+  {
+    void *label = lookupThreadLabel(t->id);
+    if (label) debugBelch("[\"%s\"] ",(char *)label);
+  }
+  if (t->what_next == ThreadRelocated) {
+    debugBelch("has been relocated...\n");
+  } else {
+    switch (t->what_next) {
+    case ThreadKilled:
+      debugBelch("has been killed");
+      break;
+    case ThreadComplete:
+      debugBelch("has completed");
+      break;
+    default:
+      printThreadBlockage(t);
+    }
+    debugBelch("\n");
+  }
 }

 void
 printAllThreads(void)
 {
-  StgTSO *t;
+  StgTSO *t, *next;
+  nat i;
+  Capability *cap;

 # if defined(GRAN)
   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
@@ -4022,20 +4133,23 @@ printAllThreads(void)
   debugBelch("all threads:\n");
 # endif

-  for (t = all_threads; t != END_TSO_QUEUE; ) {
-    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
-    {
-      void *label = lookupThreadLabel(t->id);
-      if (label) debugBelch("[\"%s\"] ",(char *)label);
-    }
-    if (t->what_next == ThreadRelocated) {
-      debugBelch("has been relocated...\n");
-      t = t->link;
-    } else {
-      printThreadStatus(t);
-      debugBelch("\n");
-      t = t->global_link;
-    }
+  for (i = 0; i < n_capabilities; i++) {
+    cap = &capabilities[i];
+    debugBelch("threads on capability %d:\n", cap->no);
+    for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+      printThreadStatus(t);
+    }
+  }
+
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+    if (t->why_blocked != NotBlocked) {
+      printThreadStatus(t);
+    }
+    if (t->what_next == ThreadRelocated) {
+      next = t->link;
+    } else {
+      next = t->global_link;
+    }
   }
 }
@@ -4045,13 +4159,7 @@ printThreadQueue(StgTSO *t)
 {
   nat i = 0;
   for (; t != END_TSO_QUEUE; t = t->link) {
-    debugBelch("\tthread %d @ %p ", t->id, (void *)t);
-    if (t->what_next == ThreadRelocated) {
-      debugBelch("has been relocated...\n");
-    } else {
-      printThreadStatus(t);
-      debugBelch("\n");
-    }
+    printThreadStatus(t);
     i++;
   }
   debugBelch("%d threads on queue\n", i);
...
@@ -980,6 +980,11 @@ memInventory(void)
   /* count the blocks we currently have */

   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+      for (i = 0; i < n_capabilities; i++) {
+          for (bd = capabilities[i].mut_lists[g]; bd != NULL; bd = bd->link) {
+              total_blocks += bd->blocks;
+          }
+      }
       for (bd = generations[g].mut_list; bd != NULL; bd = bd->link) {
           total_blocks += bd->blocks;
       }
...
@@ -280,8 +280,9 @@ DEBUG_FILL_SLOP(StgClosure *p)
         and_then;                                                       \
     } else {                                                            \
         DEBUG_FILL_SLOP(p1);                                            \
-        foreign "C" recordMutableGenLock(p1 "ptr",                      \
-                    generation(TO_W_(bdescr_gen_no(bd))) "ptr");        \
+        foreign "C" recordMutableCap(p1 "ptr",                          \
+                                     MyCapability() "ptr",              \
+                                     bdescr_gen_no(bd));                \
         StgInd_indirectee(p1) = p2;                                     \
         SET_INFO(p1, stg_IND_OLDGEN_info);                              \
         LDV_RECORD_CREATE(p1);                                          \
...