From 3f6e8f4257fd6b0b775794cc9d229d46ecf15f57 Mon Sep 17 00:00:00 2001 From: Ben Gamari <ben@smart-cactus.org> Date: Fri, 4 Aug 2023 14:53:18 -0400 Subject: [PATCH] nonmoving: Refactor management of mark thread Here we refactor the treatment of the worker thread used by the nonmoving GC for concurrent marking, avoiding creating a new thread with every major GC cycle. As well, the new scheme is considerably easier to reason about, consolidating all state in one place, accessed via a small set of accessors with clear semantics. --- rts/RtsAPI.c | 4 +- rts/Schedule.c | 1 - rts/sm/GC.c | 2 +- rts/sm/NonMoving.c | 266 +++++++++++++++++++++++++++++---------------- rts/sm/NonMoving.h | 9 +- rts/sm/Sanity.c | 10 +- 6 files changed, 178 insertions(+), 114 deletions(-) diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c index c5fd84ee316c..d1e352cb85d4 100644 --- a/rts/RtsAPI.c +++ b/rts/RtsAPI.c @@ -716,7 +716,7 @@ PauseToken *rts_pause (void) // so pausing the mutator while a collection is ongoing might lead to deadlock or // capabilities being prematurely re-awoken. if (RtsFlags.GcFlags.useNonmoving) { - ACQUIRE_LOCK(&nonmoving_collection_mutex); + nonmovingBlockConcurrentMark(true); } @@ -784,7 +784,7 @@ void rts_resume (PauseToken *pauseToken) stgFree(pauseToken); if (RtsFlags.GcFlags.useNonmoving) { - RELEASE_LOCK(&nonmoving_collection_mutex); + nonmovingUnblockConcurrentMark(); } } diff --git a/rts/Schedule.c b/rts/Schedule.c index cf3eb328f469..59208492fd95 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -2786,7 +2786,6 @@ exitScheduler (bool wait_foreign USED_IF_THREADS) // If we haven't killed all the threads yet, do it now. 
if (getSchedState() < SCHED_SHUTTING_DOWN) { setSchedState(SCHED_INTERRUPTING); - nonmovingStop(); Capability *cap = task->cap; waitForCapability(&cap,task); scheduleDoGC(&cap,task,true,false,false,true); diff --git a/rts/sm/GC.c b/rts/sm/GC.c index 20d80a4d8e9c..51f8b32e4b00 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -354,7 +354,7 @@ GarbageCollect (struct GcConfig config, deadlock_detect_gc = config.deadlock_detect; #if defined(THREADED_RTS) - if (major_gc && RtsFlags.GcFlags.useNonmoving && RELAXED_LOAD(&concurrent_coll_running)) { + if (major_gc && RtsFlags.GcFlags.useNonmoving && nonmovingConcurrentMarkIsRunning()) { /* If there is already a concurrent major collection running then * there is no benefit to starting another. * TODO: Catch heap-size runaway. diff --git a/rts/sm/NonMoving.c b/rts/sm/NonMoving.c index 05fa902c89ae..5d58d162b768 100644 --- a/rts/sm/NonMoving.c +++ b/rts/sm/NonMoving.c @@ -38,18 +38,6 @@ static void nonmovingBumpEpoch(void) { nonmovingMarkEpoch = nonmovingMarkEpoch == 1 ? 2 : 1; } -#if defined(THREADED_RTS) -/* - * This mutex ensures that only one non-moving collection is active at a time. - */ -Mutex nonmoving_collection_mutex; - -OSThreadId mark_thread; -bool concurrent_coll_running = false; -Condition concurrent_coll_finished; -Mutex concurrent_coll_finished_lock; -#endif - /* * Note [Non-moving garbage collector] * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -546,11 +534,12 @@ memcount nonmoving_segment_live_words = 0; // See Note [Sync phase marking budget]. MarkBudget sync_phase_marking_budget = 200000; -#if defined(THREADED_RTS) -static void* nonmovingConcurrentMark(void *mark_queue); -#endif static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO **resurrected_threads, bool concurrent); +static void nonmovingInitConcurrentWorker(void); +static void nonmovingStartConcurrentMark(MarkQueue *roots); +static void nonmovingExitConcurrentWorker(void); + // Add a segment to the free list. 
void nonmovingPushFreeSegment(struct NonmovingSegment *seg) { @@ -594,44 +583,14 @@ unsigned int nonmovingBlockCountFromSize(uint8_t log_block_size) void nonmovingInit(void) { if (! RtsFlags.GcFlags.useNonmoving) return; -#if defined(THREADED_RTS) - initMutex(&nonmoving_collection_mutex); - initCondition(&concurrent_coll_finished); - initMutex(&concurrent_coll_finished_lock); -#endif + nonmovingInitConcurrentWorker(); nonmovingMarkInit(); } -// Stop any nonmoving collection in preparation for RTS shutdown. -void nonmovingStop(void) -{ - if (! RtsFlags.GcFlags.useNonmoving) return; -#if defined(THREADED_RTS) - if (RELAXED_LOAD(&mark_thread)) { - debugTrace(DEBUG_nonmoving_gc, - "waiting for nonmoving collector thread to terminate"); - ACQUIRE_LOCK(&concurrent_coll_finished_lock); - waitCondition(&concurrent_coll_finished, &concurrent_coll_finished_lock); - RELEASE_LOCK(&concurrent_coll_finished_lock); - } -#endif -} - void nonmovingExit(void) { if (! RtsFlags.GcFlags.useNonmoving) return; - - // First make sure collector is stopped before we tear things down. - nonmovingStop(); - -#if defined(THREADED_RTS) - ACQUIRE_LOCK(&nonmoving_collection_mutex); - RELEASE_LOCK(&nonmoving_collection_mutex); - - closeMutex(&concurrent_coll_finished_lock); - closeCondition(&concurrent_coll_finished); - closeMutex(&nonmoving_collection_mutex); -#endif + nonmovingExitConcurrentWorker(); } /* Prepare the heap bitmaps and snapshot metadata for a mark */ @@ -715,14 +674,19 @@ static void nonmovingPrepareMark(void) #endif } -void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads, bool concurrent STG_UNUSED) +void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads, bool concurrent) { #if defined(THREADED_RTS) - // We can't start a new collection until the old one has finished + // We can't start a new collection until the old one has finished. 
// We also don't run in final GC - if (RELAXED_LOAD(&concurrent_coll_running) || getSchedState() > SCHED_RUNNING) { + if (nonmovingConcurrentMarkIsRunning()) { + trace(TRACE_nonmoving_gc, "Aborted nonmoving collection due to on-going collection"); + } else if (getSchedState() > SCHED_RUNNING) { + trace(TRACE_nonmoving_gc, "Aborted nonmoving collection due to on-going shutdown"); return; } +#else + concurrent = false; #endif trace(TRACE_nonmoving_gc, "Starting nonmoving GC preparation"); @@ -819,28 +783,15 @@ void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads, bool c concurrent = false; } -#if defined(THREADED_RTS) if (concurrent) { - RELAXED_STORE(&concurrent_coll_running, true); - nonmoving_write_barrier_enabled = true; - debugTrace(DEBUG_nonmoving_gc, "Starting concurrent mark thread"); - OSThreadId thread; - if (createOSThread(&thread, "nonmoving-mark", - nonmovingConcurrentMark, mark_queue) != 0) { - barf("nonmovingCollect: failed to spawn mark thread: %s", strerror(errno)); - } - RELAXED_STORE(&mark_thread, thread); - return; + nonmovingStartConcurrentMark(mark_queue); } else { RELEASE_SM_LOCK; - } -#endif - // Use the weak and thread lists from the preparation for any new weaks and - // threads found to be dead in mark. - nonmovingMark_(mark_queue, dead_weaks, resurrected_threads, false); + // Use the weak and thread lists from the preparation for any new weaks and + // threads found to be dead in mark. 
+ nonmovingMark_(mark_queue, dead_weaks, resurrected_threads, false); - if (!concurrent) { ACQUIRE_SM_LOCK; } } @@ -868,14 +819,155 @@ static bool nonmovingMarkThreadsWeaks(MarkBudget *budget, MarkQueue *mark_queue) } } -#if defined(THREADED_RTS) -static void* nonmovingConcurrentMark(void *data) +#if !defined(THREADED_RTS) + +static void nonmovingInitConcurrentWorker(void) {} +static void nonmovingExitConcurrentWorker(void) {} + +static void STG_NORETURN nonmovingStartConcurrentMark(MarkQueue *roots STG_UNUSED) { - MarkQueue *mark_queue = (MarkQueue*)data; - StgWeak *dead_weaks = NULL; - StgTSO *resurrected_threads = (StgTSO*)&stg_END_TSO_QUEUE_closure; - nonmovingMark_(mark_queue, &dead_weaks, &resurrected_threads, true); - return NULL; + barf("nonmovingStartConcurrentMark: Not supported in non-threaded RTS"); +} + +bool nonmovingConcurrentMarkIsRunning(void) +{ + return false; +} + +bool nonmovingBlockConcurrentMark(bool wait STG_UNUSED) { return true; } +void nonmovingUnblockConcurrentMark(void) {} + +#else + +enum ConcurrentWorkerState { + CONCURRENT_WORKER_IDLE, + CONCURRENT_WORKER_RUNNING, + CONCURRENT_WORKER_STOPPED, +}; + +Mutex concurrent_coll_lock; +MarkQueue *concurrent_mark_roots; +Condition start_concurrent_mark_cond; +Condition concurrent_coll_finished_cond; +enum ConcurrentWorkerState concurrent_worker_state; +bool stop_concurrent_worker; +OSThreadId concurrent_worker_thread; + +static void* nonmovingConcurrentMarkWorker(void *data STG_UNUSED) +{ + newBoundTask(); + + ACQUIRE_LOCK(&concurrent_coll_lock); + while (true) { + concurrent_worker_state = CONCURRENT_WORKER_IDLE; + waitCondition(&start_concurrent_mark_cond, &concurrent_coll_lock); + if (stop_concurrent_worker) { + concurrent_worker_state = CONCURRENT_WORKER_STOPPED; + concurrent_worker_thread = 0; + broadcastCondition(&concurrent_coll_finished_cond); + RELEASE_LOCK(&concurrent_coll_lock); + return NULL; + } + + CHECK(concurrent_worker_state == CONCURRENT_WORKER_RUNNING); + MarkQueue 
*mark_queue = concurrent_mark_roots; + concurrent_mark_roots = NULL; + RELEASE_LOCK(&concurrent_coll_lock); + + StgWeak *dead_weaks = NULL; + StgTSO *resurrected_threads = (StgTSO*)&stg_END_TSO_QUEUE_closure; + nonmovingMark_(mark_queue, &dead_weaks, &resurrected_threads, true); + + ACQUIRE_LOCK(&concurrent_coll_lock); + broadcastCondition(&concurrent_coll_finished_cond); + } +} + +static void nonmovingInitConcurrentWorker(void) +{ + debugTrace(DEBUG_nonmoving_gc, "Starting concurrent mark thread"); + initMutex(&concurrent_coll_lock); + ACQUIRE_LOCK(&concurrent_coll_lock); + initCondition(&start_concurrent_mark_cond); + initCondition(&concurrent_coll_finished_cond); + stop_concurrent_worker = false; + concurrent_worker_state = CONCURRENT_WORKER_IDLE; + concurrent_mark_roots = NULL; + + if (createOSThread(&concurrent_worker_thread, "nonmoving-mark", + nonmovingConcurrentMarkWorker, NULL) != 0) { + barf("nonmovingInitConcurrentWorker: failed to spawn mark thread: %s", strerror(errno)); + } + RELEASE_LOCK(&concurrent_coll_lock); +} + +static void nonmovingExitConcurrentWorker(void) +{ + debugTrace(DEBUG_nonmoving_gc, + "waiting for nonmoving collector thread to terminate"); + ACQUIRE_LOCK(&concurrent_coll_lock); + while (concurrent_worker_state != CONCURRENT_WORKER_STOPPED) { + stop_concurrent_worker = true; + signalCondition(&start_concurrent_mark_cond); + waitCondition(&concurrent_coll_finished_cond, &concurrent_coll_lock); + } + RELEASE_LOCK(&concurrent_coll_lock); + + closeMutex(&concurrent_coll_lock); + closeCondition(&start_concurrent_mark_cond); + closeCondition(&concurrent_coll_finished_cond); +} + +static void nonmovingStartConcurrentMark(MarkQueue *roots) +{ + ACQUIRE_LOCK(&concurrent_coll_lock); + CHECK(concurrent_worker_state != CONCURRENT_WORKER_RUNNING); + concurrent_worker_state = CONCURRENT_WORKER_RUNNING; + concurrent_mark_roots = roots; + RELAXED_STORE(&nonmoving_write_barrier_enabled, true); + signalCondition(&start_concurrent_mark_cond); + 
RELEASE_LOCK(&concurrent_coll_lock); +} + +bool nonmovingConcurrentMarkIsRunning(void) +{ + ACQUIRE_LOCK(&concurrent_coll_lock); + bool running = concurrent_worker_state == CONCURRENT_WORKER_RUNNING; + RELEASE_LOCK(&concurrent_coll_lock); + return running; +} + +// Prevent the initiation of concurrent marking. Used by the sanity checker to +// avoid racing with the concurrent mark thread. +// If `wait` then wait until on-going marking has finished. +// Returns true if successfully blocked, false if mark is running. +bool nonmovingBlockConcurrentMark(bool wait) +{ + if (!RtsFlags.GcFlags.useNonmoving) { + return true; + } + ACQUIRE_LOCK(&concurrent_coll_lock); + if (wait) { + while (concurrent_worker_state == CONCURRENT_WORKER_RUNNING) { + waitCondition(&concurrent_coll_finished_cond, &concurrent_coll_lock); + } + } + bool running = concurrent_worker_state == CONCURRENT_WORKER_RUNNING; + // N.B. We don't release concurrent_coll_lock to block marking. + if (running) { + RELEASE_LOCK(&concurrent_coll_lock); + return false; + } else { + return true; + } +} + +void nonmovingUnblockConcurrentMark(void) +{ + if (!RtsFlags.GcFlags.useNonmoving) { + return; + } + RELEASE_LOCK(&concurrent_coll_lock); } // Append w2 to the end of w1. 
@@ -893,7 +985,6 @@ static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO * #if !defined(THREADED_RTS) ASSERT(!concurrent); #endif - ACQUIRE_LOCK(&nonmoving_collection_mutex); debugTrace(DEBUG_nonmoving_gc, "Starting mark..."); stat_startNonmovingGc(); @@ -933,10 +1024,7 @@ concurrent_marking: } #if defined(THREADED_RTS) - Task *task = NULL; if (concurrent) { - task = newBoundTask(); - // If at this point if we've decided to exit then just return if (getSchedState() > SCHED_RUNNING) { // Note that we break our invariants here and leave segments in @@ -952,7 +1040,7 @@ concurrent_marking: } // We're still running, request a sync - nonmovingBeginFlush(task); + nonmovingBeginFlush(myTask()); bool all_caps_syncd; MarkBudget sync_marking_budget = sync_phase_marking_budget; @@ -963,7 +1051,7 @@ concurrent_marking: // See Note [Sync phase marking budget]. traceConcSyncEnd(); stat_endNonmovingGcSync(); - releaseAllCapabilities(n_capabilities, NULL, task); + releaseAllCapabilities(n_capabilities, NULL, myTask()); goto concurrent_marking; } } while (!all_caps_syncd); @@ -1045,7 +1133,7 @@ concurrent_marking: #if !defined(NONCONCURRENT_SWEEP) if (concurrent) { nonmoving_write_barrier_enabled = false; - nonmovingFinishFlush(task); + nonmovingFinishFlush(myTask()); } #endif #endif @@ -1097,24 +1185,10 @@ concurrent_marking: } #endif - // TODO: Remainder of things done by GarbageCollect (update stats) - #if defined(THREADED_RTS) finish: - if (concurrent) { - exitMyTask(); - - // We are done... 
- RELAXED_STORE(&mark_thread, 0); - stat_endNonmovingGc(); - } - - // Signal that the concurrent collection is finished, allowing the next - // non-moving collection to proceed - RELAXED_STORE(&concurrent_coll_running, false); - signalCondition(&concurrent_coll_finished); - RELEASE_LOCK(&nonmoving_collection_mutex); #endif + stat_endNonmovingGc(); } #if defined(DEBUG) diff --git a/rts/sm/NonMoving.h b/rts/sm/NonMoving.h index 870e5fa9e42c..308525610266 100644 --- a/rts/sm/NonMoving.h +++ b/rts/sm/NonMoving.h @@ -124,15 +124,12 @@ extern struct NonmovingHeap nonmovingHeap; extern memcount nonmoving_segment_live_words; -#if defined(THREADED_RTS) -extern bool concurrent_coll_running; -extern Mutex nonmoving_collection_mutex; -#endif - void nonmovingInit(void); -void nonmovingStop(void); void nonmovingExit(void); +bool nonmovingConcurrentMarkIsRunning(void); +bool nonmovingBlockConcurrentMark(bool wait); +void nonmovingUnblockConcurrentMark(void); // dead_weaks and resurrected_threads lists are used for two things: // diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index 2ad0c185b6aa..667f70633b5a 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -1255,8 +1255,7 @@ memInventory (bool show) #if defined(THREADED_RTS) // We need to be careful not to race with the nonmoving collector. // If a nonmoving collection is on-going we simply abort the inventory. - if (RtsFlags.GcFlags.useNonmoving){ - if(TRY_ACQUIRE_LOCK(&nonmoving_collection_mutex)) + if (RtsFlags.GcFlags.useNonmoving && !nonmovingBlockConcurrentMark(false)) { return; } #endif @@ -1363,12 +1362,7 @@ memInventory (bool show) ASSERT(n_alloc_blocks == live_blocks); ASSERT(!leak); -#if defined(THREADED_RTS) - if (RtsFlags.GcFlags.useNonmoving){ - RELEASE_LOCK(&nonmoving_collection_mutex); - } -#endif - + nonmovingUnblockConcurrentMark(); } #endif /* DEBUG */ -- GitLab