Commit 200c73fd authored by simonmarhaskell@gmail.com

Don't traverse the entire list of threads on every GC (phase 1)

Instead of keeping a single list of all threads, keep one per step
and only look at the threads belonging to steps that we are
collecting.
parent 233a4687
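
In outline, the change replaces the single global all_threads list with one threads list per step, so the GC walks only the lists belonging to the steps it is actually collecting. Below is a minimal, self-contained C sketch of that traversal pattern; it is not part of the commit, and the TSO/step types and helper names are simplified stand-ins for the real RTS structures.

#include <stddef.h>
#include <stdio.h>

#define END_TSO_QUEUE NULL   /* stand-in for the RTS sentinel */

typedef struct TSO_ {
    int id;
    struct TSO_ *global_link;      /* next thread on the owning step's list */
} TSO;

typedef struct step_ {
    TSO *threads;                  /* threads whose TSOs live in this step */
} step;

/* Visit only the threads of the first n_collected steps, instead of
 * walking one global list containing every thread in the system. */
static void visit_collected_threads(step *steps, size_t n_collected)
{
    size_t s;
    TSO *t;
    for (s = 0; s < n_collected; s++) {
        for (t = steps[s].threads; t != END_TSO_QUEUE; t = t->global_link) {
            printf("step %zu: thread %d\n", s, t->id);
        }
    }
}

int main(void)
{
    TSO young = { 1, END_TSO_QUEUE };
    TSO old   = { 2, END_TSO_QUEUE };
    step steps[2] = { { &young }, { &old } };
    visit_collected_threads(steps, 1);   /* a minor GC would look at step 0 only */
    return 0;
}

With the old scheme, the equivalent loop ran over every thread in the system regardless of which generations were being collected.
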
@@ -69,6 +69,8 @@ typedef struct step_ {
    bdescr * large_objects;       // large objects (doubly linked)
    unsigned int n_large_blocks;  // no. of blocks used by large objs
+   StgTSO * threads;             // threads in this step
+                                 // linked via global_link
    // ------------------------------------
    // Fields below are used during GC only
@@ -100,6 +102,7 @@ typedef struct step_ {
    bdescr * bitmap;              // bitmap for compacting collection
+   StgTSO * old_threads;
} step;
......
@@ -781,13 +781,17 @@ checkThreadQsSanity (rtsBool check_TSO_too)
void
checkGlobalTSOList (rtsBool checkTSOs)
{
-   extern StgTSO *all_threads;
    StgTSO *tso;
-   for (tso=all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
-       ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso));
-       ASSERT(get_itbl(tso)->type == TSO);
-       if (checkTSOs)
-           checkTSO(tso);
-   }
+   nat s;
+   for (s = 0; s < total_steps; s++) {
+       for (tso=all_steps[s].threads; tso != END_TSO_QUEUE;
+            tso = tso->global_link) {
+           ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso));
+           ASSERT(get_itbl(tso)->type == TSO);
+           if (checkTSOs)
+               checkTSO(tso);
+       }
+   }
}
......
@@ -118,12 +118,6 @@ StgTSO *blackhole_queue = NULL;
 */
rtsBool blackholes_need_checking = rtsFalse;
- /* Linked list of all threads.
-  * Used for detecting garbage collected threads.
-  * LOCK: sched_mutex+capability, or all capabilities
-  */
- StgTSO *all_threads = NULL;
/* flag set by signal handler to precipitate a context switch
 * LOCK: none (just an advisory flag)
 */
@@ -1898,7 +1892,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
    // point where we can deal with this. Leaving it on the run
    // queue also ensures that the garbage collector knows about
    // this thread and its return value (it gets dropped from the
-   // all_threads list so there's no other way to find it).
+   // step->threads list so there's no other way to find it).
    appendToRunQueue(cap,t);
    return rtsFalse;
#else
@@ -2016,8 +2010,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     */
    {
        StgTSO *next;
+       nat s;
-       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+       for (s = 0; s < total_steps; s++) {
+         for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
@@ -2052,6 +2048,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
                }
            }
          }
+       }
        }
    }
@@ -2133,6 +2130,7 @@ forkProcess(HsStablePtr *entry
    pid_t pid;
    StgTSO* t,*next;
    Capability *cap;
+   nat s;
#if defined(THREADED_RTS)
    if (RtsFlags.ParFlags.nNodes > 1) {
@@ -2180,7 +2178,8 @@ forkProcess(HsStablePtr *entry
        // all Tasks, because they correspond to OS threads that are
        // now gone.
-       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+       for (s = 0; s < total_steps; s++) {
+         for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
@@ -2190,6 +2189,7 @@ forkProcess(HsStablePtr *entry
                // threads may be evaluating thunks that we need later.
                deleteThread_(cap,t);
            }
          }
+       }
// Empty the run queue. It seems tempting to let all the
@@ -2203,9 +2203,11 @@ forkProcess(HsStablePtr *entry
        // don't exist now:
        cap->suspended_ccalling_tasks = NULL;
-       // Empty the all_threads list. Otherwise, the garbage
+       // Empty the threads lists. Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
-       all_threads = END_TSO_QUEUE;
+       for (s = 0; s < total_steps; s++) {
+           all_steps[s].threads = END_TSO_QUEUE;
+       }
        // Wipe the task list, except the current Task.
        ACQUIRE_LOCK(&sched_mutex);
@@ -2255,14 +2257,18 @@ deleteAllThreads ( Capability *cap )
    // NOTE: only safe to call if we own all capabilities.
    StgTSO* t, *next;
+   nat s;
    debugTrace(DEBUG_sched,"deleting all threads");
-   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+   for (s = 0; s < total_steps; s++) {
+     for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->_link;
        } else {
            next = t->global_link;
            deleteThread(cap,t);
        }
      }
+   }
// The run queue now contains a bunch of ThreadKilled threads. We
@@ -2572,7 +2578,6 @@ initScheduler(void)
#endif
    blackhole_queue = END_TSO_QUEUE;
-   all_threads = END_TSO_QUEUE;
    context_switch = 0;
    sched_state = SCHED_RUNNING;
@@ -3143,11 +3148,15 @@ resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;
+   step *step;
    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
-       tso->global_link = all_threads;
-       all_threads = tso;
+       step = Bdescr((P_)tso)->step;
+       tso->global_link = step->threads;
+       step->threads = tso;
        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
        // Wake up the thread on the Capability it was last on
......
@@ -133,11 +133,6 @@ extern StgTSO *RTS_VAR(sleeping_queue);
#endif
#endif
- /* Linked list of all threads.
-  * Locks required : sched_mutex
-  */
- extern StgTSO *RTS_VAR(all_threads);
/* Set to rtsTrue if there are threads on the blackhole_queue, and
 * it is possible that one or more of them may be available to run.
 * This flag is set to rtsFalse after we've checked the queue, and
......
@@ -145,8 +145,8 @@ createThread(Capability *cap, nat size)
     */
    ACQUIRE_LOCK(&sched_mutex);
    tso->id = next_thread_id++;  // while we have the mutex
-   tso->global_link = all_threads;
-   all_threads = tso;
+   tso->global_link = g0s0->threads;
+   g0s0->threads = tso;
    RELEASE_LOCK(&sched_mutex);
#if defined(DIST)
@@ -771,7 +771,7 @@ void
printAllThreads(void)
{
  StgTSO *t, *next;
- nat i;
+ nat i, s;
  Capability *cap;
# if defined(GRAN)
@@ -799,7 +799,8 @@ printAllThreads(void)
  }
  debugBelch("other threads:\n");
- for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ for (s = 0; s < total_steps; s++) {
+   for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
      if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
      }
@@ -808,6 +809,7 @@ printAllThreads(void)
      } else {
          next = t->global_link;
      }
    }
+ }
}
......
@@ -986,7 +986,9 @@ compact(StgClosure *static_objects)
    }
    // the global thread list
-   thread((void *)&all_threads);
+   for (s = 0; s < total_steps; s++) {
+       thread((void *)&all_steps[s].threads);
+   }
    // any threads resurrected during this GC
    thread((void *)&resurrected_threads);
......
@@ -1078,14 +1078,19 @@ init_collected_gen (nat g, nat n_threads)
    for (s = 0; s < generations[g].n_steps; s++) {
+       stp = &generations[g].steps[s];
+       ASSERT(stp->gen_no == g);
+       // we'll construct a new list of threads in this step
+       // during GC, throw away the current list.
+       stp->old_threads = stp->threads;
+       stp->threads = END_TSO_QUEUE;
        // generation 0, step 0 doesn't need to-space
        if (g == 0 && s == 0 && RtsFlags.GcFlags.generations > 1) {
            continue;
        }
-       stp = &generations[g].steps[s];
-       ASSERT(stp->gen_no == g);
        // deprecate the existing blocks
        stp->old_blocks = stp->blocks;
        stp->n_old_blocks = stp->n_blocks;
......
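
The hunk above is where each collected step parks its current thread list: threads is emptied and the old list is kept in old_threads, so that traverseWeakPtrList can later decide, thread by thread, which ones survive. A tiny sketch of that swap (not from the commit; the types are simplified stand-ins for the real RTS ones):

typedef struct TSO_ { struct TSO_ *global_link; } TSO;
typedef struct step_ { TSO *threads; TSO *old_threads; } step;
#define END_TSO_QUEUE NULL

/* At the start of a GC of this step: stash the current list and start
 * an empty one; live threads get re-linked onto 'threads' as the GC
 * discovers they are reachable. */
static void begin_gc_for_step(step *stp)
{
    stp->old_threads = stp->threads;
    stp->threads     = END_TSO_QUEUE;
}
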
@@ -77,7 +77,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list
/* List of all threads during GC
 */
StgTSO *resurrected_threads;
- static StgTSO *old_all_threads;
void
initWeakForGC(void)
@@ -85,12 +84,6 @@ initWeakForGC(void)
    old_weak_ptr_list = weak_ptr_list;
    weak_ptr_list = NULL;
    weak_stage = WeakPtrs;
-   /* The all_threads list is like the weak_ptr_list.
-    * See traverseWeakPtrList() for the details.
-    */
-   old_all_threads = all_threads;
-   all_threads = END_TSO_QUEUE;
    resurrected_threads = END_TSO_QUEUE;
}
@@ -185,53 +178,67 @@ traverseWeakPtrList(void)
     * the weak ptr list. If we discover any threads that are about to
     * become garbage, we wake them up and administer an exception.
     */
-   {
-       StgTSO *t, *tmp, *next, **prev;
-       prev = &old_all_threads;
-       for (t = old_all_threads; t != END_TSO_QUEUE; t = next) {
-           tmp = (StgTSO *)isAlive((StgClosure *)t);
-           if (tmp != NULL) {
-               t = tmp;
-           }
-           ASSERT(get_itbl(t)->type == TSO);
-           switch (t->what_next) {
-           case ThreadRelocated:
-               next = t->_link;
-               *prev = next;
-               continue;
-           case ThreadKilled:
-           case ThreadComplete:
-               // finished or died. The thread might still be alive, but we
-               // don't keep it on the all_threads list. Don't forget to
-               // stub out its global_link field.
-               next = t->global_link;
-               t->global_link = END_TSO_QUEUE;
-               *prev = next;
-               continue;
-           default:
-               ;
-           }
-           if (tmp == NULL) {
-               // not alive (yet): leave this thread on the
-               // old_all_threads list.
-               prev = &(t->global_link);
-               next = t->global_link;
-           }
-           else {
-               // alive: move this thread onto the all_threads list.
-               next = t->global_link;
-               t->global_link = all_threads;
-               all_threads = t;
-               *prev = next;
-           }
-       }
-   }
+   {
+       StgTSO *t, *tmp, *next, **prev;
+       nat g, s;
+       step *stp;
+       // Traverse thread lists for generations we collected...
+       for (g = 0; g <= N; g++) {
+           for (s = 0; s < generations[g].n_steps; s++) {
+               stp = &generations[g].steps[s];
+               prev = &stp->old_threads;
+               for (t = stp->old_threads; t != END_TSO_QUEUE; t = next) {
+                   tmp = (StgTSO *)isAlive((StgClosure *)t);
+                   if (tmp != NULL) {
+                       t = tmp;
+                   }
+                   ASSERT(get_itbl(t)->type == TSO);
+                   switch (t->what_next) {
+                   case ThreadRelocated:
+                       next = t->_link;
+                       *prev = next;
+                       continue;
+                   case ThreadKilled:
+                   case ThreadComplete:
+                       // finished or died. The thread might still
+                       // be alive, but we don't keep it on the
+                       // all_threads list. Don't forget to
+                       // stub out its global_link field.
+                       next = t->global_link;
+                       t->global_link = END_TSO_QUEUE;
+                       *prev = next;
+                       continue;
+                   default:
+                       ;
+                   }
+                   if (tmp == NULL) {
+                       // not alive (yet): leave this thread on the
+                       // old_all_threads list.
+                       prev = &(t->global_link);
+                       next = t->global_link;
+                   }
+                   else {
+                       step *new_step;
+                       // alive: move this thread onto the correct
+                       // threads list.
+                       next = t->global_link;
+                       new_step = Bdescr((P_)t)->step;
+                       t->global_link = new_step->threads;
+                       new_step->threads = t;
+                       *prev = next;
+                   }
+               }
+           }
+       }
+   }
/* If we evacuated any threads, we need to go back to the scavenger.
*/
if (flag) return rtsTrue;
@@ -239,14 +246,23 @@ traverseWeakPtrList(void)
    /* And resurrect any threads which were about to become garbage.
     */
    {
+       nat g, s;
+       step *stp;
        StgTSO *t, *tmp, *next;
-       for (t = old_all_threads; t != END_TSO_QUEUE; t = next) {
-           next = t->global_link;
-           tmp = t;
-           evacuate((StgClosure **)&tmp);
-           tmp->global_link = resurrected_threads;
-           resurrected_threads = tmp;
-       }
+       for (g = 0; g <= N; g++) {
+           for (s = 0; s < generations[g].n_steps; s++) {
+               stp = &generations[g].steps[s];
+               for (t = stp->old_threads; t != END_TSO_QUEUE; t = next) {
+                   next = t->global_link;
+                   tmp = t;
+                   evacuate((StgClosure **)&tmp);
+                   tmp->global_link = resurrected_threads;
+                   resurrected_threads = tmp;
+               }
+           }
+       }
    }
/* Finally, we can update the blackhole_queue. This queue
......
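
When a thread on an old_threads list turns out to be alive (or is resurrected), the hunks above ask the block descriptor of the evacuated TSO which step it now lives in, via Bdescr((P_)t)->step, and push it onto that step's threads list. A hedged sketch of that re-linking follows; it is not part of the commit, and step_of() is a hypothetical stand-in for the Bdescr lookup:

typedef struct step_ step;

typedef struct TSO_ {
    struct TSO_ *global_link;
    step *current_step;            /* stand-in: where the TSO was evacuated to */
} TSO;

struct step_ { TSO *threads; };

/* Stand-in for Bdescr((P_)tso)->step: the step that owns the block the
 * TSO's storage now lives in. */
static step *step_of(TSO *tso) { return tso->current_step; }

/* Put a surviving thread on the thread list of whichever step its
 * storage ended up in after evacuation. */
static void relink_thread(TSO *tso)
{
    step *stp = step_of(tso);
    tso->global_link = stp->threads;
    stp->threads = tso;
}
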
@@ -101,6 +101,8 @@ initStep (step *stp, int g, int s)
    initSpinLock(&stp->sync_todo);
    initSpinLock(&stp->sync_large_objects);
#endif
+   stp->threads = END_TSO_QUEUE;
+   stp->old_threads = END_TSO_QUEUE;
}
void
......