From 515eb33d4fcc9382032a9e6b1757b9e2b00a145c Mon Sep 17 00:00:00 2001 From: Ben Gamari <ben@smart-cactus.org> Date: Thu, 14 Dec 2023 12:27:50 -0500 Subject: [PATCH] rts: Fix synchronization on thread blocking state We now use a release barrier whenever we update a thread's blocking state. This required widening StgTSO.why_blocked as AArch64 does not support atomic writes on 16-bit values. --- rts/Exception.cmm | 2 +- rts/PrimOps.cmm | 80 ++++++++++++++++------------------- rts/RaiseAsync.c | 4 +- rts/STM.c | 2 +- rts/Schedule.c | 3 +- rts/StgMiscClosures.cmm | 2 +- rts/Threads.c | 11 ++--- rts/TraverseHeap.c | 2 +- rts/include/rts/storage/TSO.h | 9 +++- rts/include/stg/SMP.h | 9 ++++ rts/posix/Select.c | 4 +- rts/sm/Compact.c | 2 +- rts/sm/NonMovingMark.c | 2 +- rts/sm/Scav.c | 2 +- rts/win32/AsyncMIO.c | 2 +- 15 files changed, 73 insertions(+), 63 deletions(-) diff --git a/rts/Exception.cmm b/rts/Exception.cmm index cc1e4cf3274d..838d43807b46 100644 --- a/rts/Exception.cmm +++ b/rts/Exception.cmm @@ -351,9 +351,9 @@ stg_killThreadzh (P_ target, P_ exception) if (msg == NULL) { return (); } else { - StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo; updateRemembSetPushPtr(StgTSO_block_info(CurrentTSO)); StgTSO_block_info(CurrentTSO) = msg; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo; // we must block, and unlock the message before returning jump stg_block_throwto (target, exception); } diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index 11117c0ed82d..ae277ec12f7a 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1714,21 +1714,17 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ ) StgMVarTSOQueue_link(q) = END_TSO_QUEUE; StgMVarTSOQueue_tso(q) = CurrentTSO; SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM); - // Write barrier before we make the new MVAR_TSO_QUEUE - // visible to other cores. 
-    // See Note [Heap memory barriers]
-    RELEASE_FENCE;
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-        StgMVar_head(mvar) = q;
+        %release StgMVar_head(mvar) = q;
     } else {
-        StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+        %release StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
         ccall recordClosureMutated(MyCapability() "ptr", StgMVar_tail(mvar));
     }
     StgTSO__link(CurrentTSO)       = q;
     StgTSO_block_info(CurrentTSO)  = mvar;
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I32;
     StgMVar_tail(mvar) = q;
 
     jump stg_block_takemvar(mvar);
@@ -1769,7 +1765,7 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I32);
     ASSERT(StgTSO_block_info(tso) == mvar);
 
     // actually perform the putMVar for the thread that we just woke up
@@ -1837,7 +1833,7 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I32);
     ASSERT(StgTSO_block_info(tso) == mvar);
 
     // actually perform the putMVar for the thread that we just woke up
@@ -1883,19 +1879,17 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
     StgMVarTSOQueue_tso(q)  = CurrentTSO;
     SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
 
-    // See Note [Heap memory barriers]
-    RELEASE_FENCE;
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-        StgMVar_head(mvar) = q;
+        %release StgMVar_head(mvar) = q;
     } else {
-        StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+        %release StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
         ccall recordClosureMutated(MyCapability() "ptr", StgMVar_tail(mvar));
     }
     StgTSO__link(CurrentTSO)       = q;
     StgTSO_block_info(CurrentTSO)  = mvar;
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I32;
     StgMVar_tail(mvar) = q;
 
     jump stg_block_putmvar(mvar,val);
@@ -2032,11 +2026,11 @@ loop:
         }
     }
 
-    ASSERT(StgTSO_block_info(tso) == mvar);
     // save why_blocked here, because waking up the thread destroys
     // this information
     W_ why_blocked;
-    why_blocked = TO_W_(StgTSO_why_blocked(tso));
+    why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing barrier
+    ASSERT(StgTSO_block_info(tso) == mvar);
 
     // actually perform the takeMVar
     W_ stack;
@@ -2092,13 +2086,11 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
     StgMVarTSOQueue_tso(q)  = CurrentTSO;
     SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
 
-    // See Note [Heap memory barriers]
-    RELEASE_FENCE;
+    %release StgMVar_head(mvar) = q;
 
     StgTSO__link(CurrentTSO)       = q;
     StgTSO_block_info(CurrentTSO)  = mvar;
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
-    StgMVar_head(mvar) = q;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I32;
 
     if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
         StgMVar_tail(mvar) = q;
@@ -2225,17 +2217,16 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
     StgMVarTSOQueue_tso(q)  = CurrentTSO;
     SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
 
-    // See Note [Heap memory barriers]
-    RELEASE_FENCE;
-    StgMVar_head(ioport) = q;
+    %release StgMVar_head(ioport) = q;
 
     StgTSO__link(CurrentTSO) = q;
     StgTSO_block_info(CurrentTSO) = ioport;
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+
+    // See Note [Heap memory barriers]
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I32;
 
     //Unlocks the closure as well
     jump stg_block_readmvar(ioport);
-
 }
 
 //This way we can check if there has been a read already.
@@ -2313,11 +2304,11 @@ loop:
     // next element in the waiting list here, as there can only ever
     // be one thread blocked on a port.
 
-    ASSERT(StgTSO_block_info(tso) == ioport);
     // save why_blocked here, because waking up the thread destroys
     // this information
    W_ why_blocked;
-    why_blocked = TO_W_(StgTSO_why_blocked(tso));
+    why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing acquire
+    ASSERT(StgTSO_block_info(tso) == ioport);
 
     // actually perform the takeMVar
     W_ stack;
@@ -2558,9 +2549,9 @@ stg_waitReadzh ( W_ fd )
     ccall barf("waitRead# on threaded RTS") never returns;
 #else
 
-    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
+    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
     StgTSO_block_info(CurrentTSO) = fd;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I32;
     // No locking - we're not going to use this interface in the
     // threaded RTS anyway.
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2574,9 +2565,9 @@ stg_waitWritezh ( W_ fd )
     ccall barf("waitWrite# on threaded RTS") never returns;
 #else
 
-    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
+    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
     StgTSO_block_info(CurrentTSO) = fd;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I32;
     // No locking - we're not going to use this interface in the
     // threaded RTS anyway.
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2597,8 +2588,7 @@ stg_delayzh ( W_ us_delay )
     ccall barf("delay# on threaded RTS") never returns;
 #else
 
-    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
+    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
 
 #if defined(mingw32_HOST_OS)
 
@@ -2615,12 +2605,13 @@ stg_delayzh ( W_ us_delay )
      * simplifies matters, so change the status to OnDoProc and put the
      * delayed thread on the blocked_queue.
*/ - StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16; + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I32; ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async_void(); #else + %relaxed StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I32; (target) = ccall getDelayTarget(us_delay); StgTSO_block_info(CurrentTSO) = target; @@ -2642,9 +2633,6 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf ) ccall barf("asyncRead# on threaded RTS") never returns; #else - ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; - /* could probably allocate this on the heap instead */ ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult, "stg_asyncReadzh"); @@ -2653,6 +2641,10 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; + + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32); + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I32; + ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async(); #endif @@ -2667,9 +2659,6 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf ) ccall barf("asyncWrite# on threaded RTS") never returns; #else - ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; - ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult, "stg_asyncWritezh"); (reqID) = ccall addIORequest(fd, 1/*TRUE*/,is_sock,len,buf "ptr"); @@ -2678,6 +2667,10 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; + + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32); + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I32; + ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async(); #endif @@ -2692,9 +2685,6 @@ stg_asyncDoProczh ( W_ proc, W_ param ) ccall barf("asyncDoProc# on threaded RTS") never returns; #else - ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); - StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16; - /* could probably allocate this on the heap instead */ ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult, "stg_asyncDoProczh"); @@ -2703,6 +2693,10 @@ stg_asyncDoProczh ( W_ proc, W_ param ) StgAsyncIOResult_len(ares) = 0; StgAsyncIOResult_errCode(ares) = 0; StgTSO_block_info(CurrentTSO) = ares; + + ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32); + %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I32; + ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr"); jump stg_block_async(); #endif diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index 9c3e6dd853fd..6601a929c657 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -266,7 +266,7 @@ check_target: return THROWTO_BLOCKED; } - status = target->why_blocked; + status = ACQUIRE_LOAD(&target->why_blocked); switch (status) { case NotBlocked: @@ -728,7 +728,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - tso->why_blocked = NotBlocked; + RELAXED_STORE(&tso->why_blocked, NotBlocked); appendToRunQueue(cap, tso); } diff --git a/rts/STM.c b/rts/STM.c index 105e0992038a..805091303389 100644 --- a/rts/STM.c +++ b/rts/STM.c @@ -340,8 +340,8 @@ static StgBool cond_lock_tvar(Capability *cap, static void park_tso(StgTSO *tso) { ASSERT(tso -> why_blocked == NotBlocked); - tso -> 
why_blocked = BlockedOnSTM;
   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
+  RELEASE_STORE(&tso -> why_blocked, BlockedOnSTM);
   TRACE("park_tso on tso=%p", tso);
 }
 
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 0f10b5170ef4..65ce9b2a6496 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -512,7 +512,8 @@ run_thread:
 #endif
 
     if (ret == ThreadBlocked) {
-        if (t->why_blocked == BlockedOnBlackHole) {
+        uint32_t why_blocked = ACQUIRE_LOAD(&t->why_blocked);
+        if (why_blocked == BlockedOnBlackHole) {
             StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
             traceEventStopThread(cap, t, t->why_blocked + 6,
                                  owner != NULL ? owner->id : 0);
diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm
index bd899d27711e..80dc84692c54 100644
--- a/rts/StgMiscClosures.cmm
+++ b/rts/StgMiscClosures.cmm
@@ -606,8 +606,8 @@ retry:
     if (r == 0) {
         goto retry;
     } else {
-        StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
         StgTSO_block_info(CurrentTSO) = msg;
+        %release StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I32;
         jump stg_block_blackhole(node);
     }
 }
diff --git a/rts/Threads.c b/rts/Threads.c
index fc6544ad358a..ef1f5c790af4 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -94,8 +94,8 @@ createThread(Capability *cap, W_ size)
 
     // Always start with the compiled code evaluator
     tso->what_next = ThreadRunGHC;
-    tso->why_blocked = NotBlocked;
     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+    tso->why_blocked = NotBlocked;
     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
     tso->flags = 0;
@@ -286,7 +286,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
     }
 #endif
 
-    switch (tso->why_blocked)
+    switch (ACQUIRE_LOAD(&tso->why_blocked))
     {
     case BlockedOnMVar:
     case BlockedOnMVarRead:
@@ -826,10 +826,11 @@ loop:
         }
     }
 
-    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
     // save why_blocked here, because waking up the thread destroys
     // this information
-    StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked);
+    StgWord why_blocked = ACQUIRE_LOAD(&tso->why_blocked);
+    ASSERT(why_blocked == BlockedOnMVarRead || why_blocked == BlockedOnMVar);
+    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
 
     // actually perform the takeMVar
     StgStack* stack = tso->stackobj;
@@ -903,7 +904,7 @@ StgMutArrPtrs *listThreads(Capability *cap)
 void
 printThreadBlockage(StgTSO *tso)
 {
-  switch (tso->why_blocked) {
+  switch (ACQUIRE_LOAD(&tso->why_blocked)) {
 #if defined(mingw32_HOST_OS)
   case BlockedOnDoProc:
     debugBelch("is blocked on proc (request: %u)",
               tso->block_info.async_result->reqID);
diff --git a/rts/TraverseHeap.c b/rts/TraverseHeap.c
index e3d3c8390cae..ed26e60948e9 100644
--- a/rts/TraverseHeap.c
+++ b/rts/TraverseHeap.c
@@ -1239,7 +1239,7 @@ inner_loop:
         traversePushClosure(ts, (StgClosure *) tso->blocked_exceptions, c, sep, child_data);
         traversePushClosure(ts, (StgClosure *) tso->bq, c, sep, child_data);
         traversePushClosure(ts, (StgClosure *) tso->trec, c, sep, child_data);
-        switch (tso->why_blocked) {
+        switch (ACQUIRE_LOAD(&tso->why_blocked)) {
         case BlockedOnMVar:
         case BlockedOnMVarRead:
         case BlockedOnBlackHole:
diff --git a/rts/include/rts/storage/TSO.h b/rts/include/rts/storage/TSO.h
index 4ca19853d789..c65b97be8514 100644
--- a/rts/include/rts/storage/TSO.h
+++ b/rts/include/rts/storage/TSO.h
@@ -126,9 +126,14 @@ typedef struct StgTSO_ {
     */
     StgWord16               what_next;      // Values defined in Constants.h
-    StgWord16               why_blocked;    // Values defined in Constants.h
     StgWord32               flags;          // Values defined in Constants.h
-    StgTSOBlockInfo         block_info;
+
+    /*
+     * N.B. why_blocked has only a handful of values but must be updated
+     * atomically; the smallest width AArch64 can store atomically is 32 bits.
+     */
+    StgWord32               why_blocked;    // Values defined in Constants.h
+    StgTSOBlockInfo         block_info;     // Barrier provided by why_blocked
     StgThreadID             id;
     StgWord32               saved_errno;
     StgWord32               dirty;          /* non-zero => dirty */
diff --git a/rts/include/stg/SMP.h b/rts/include/stg/SMP.h
index 897d19665999..d7216386e3d6 100644
--- a/rts/include/stg/SMP.h
+++ b/rts/include/stg/SMP.h
@@ -177,6 +177,7 @@ EXTERN_INLINE void busy_wait_nop(void);
  *  - StgSmallMutArrPtrs: payload
  *  - StgThunk although this is a somewhat special case; see below
  *  - StgInd: indirectee
+ *  - StgTSO: block_info
  *
  * Finally, non-pointer fields can be safely mutated without barriers as
  * they do not refer to other memory locations. Technically, concurrent
@@ -305,6 +306,14 @@ EXTERN_INLINE void busy_wait_nop(void);
  * the capability-local mut_list. Consequently this does not require any memory
  * barrier.
  *
+ * Barriers in thread blocking
+ * ---------------------------
+ * When a thread blocks (e.g. on an MVar) it will typically allocate a heap
+ * object recording its blocked state (e.g. a StgMVarTSOQueue), expose this via
+ * StgTSO.block_info, and finally release-store StgTSO.why_blocked to record
+ * why it is blocked. Readers must acquire-load why_blocked before reading
+ * block_info; this pairing is what guarantees that block_info is visible.
+ *
  * Barriers in thread migration
  * ----------------------------
  * When a thread is migrated from one capability to another we must take care
diff --git a/rts/posix/Select.c b/rts/posix/Select.c
index 89a46fd76354..848739a90429 100644
--- a/rts/posix/Select.c
+++ b/rts/posix/Select.c
@@ -105,7 +105,7 @@ static bool wakeUpSleepingThreads (Capability *cap, LowResTime now)
             break;
         }
         iomgr->sleeping_queue = tso->_link;
-        tso->why_blocked = NotBlocked;
+        RELAXED_STORE(&tso->why_blocked, NotBlocked);
         tso->_link = END_TSO_QUEUE;
         IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %"
                                        FMT_StgThreadID "\n", tso->id));
@@ -268,7 +268,7 @@ awaitEvent(Capability *cap, bool wait)
              * So the (int) cast should be removed across the code base once
              * GHC requires a version of FreeBSD that has that change in it.
*/ - switch (tso->why_blocked) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnRead: { int fd = tso->block_info.fd; diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index 92ac86779ac9..c1b445ac88ee 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -463,7 +463,7 @@ thread_TSO (StgTSO *tso) thread_(&tso->_link); thread_(&tso->global_link); - switch (tso->why_blocked) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnMVar: case BlockedOnMVarRead: case BlockedOnBlackHole: diff --git a/rts/sm/NonMovingMark.c b/rts/sm/NonMovingMark.c index cfa506303695..9d633c345e8a 100644 --- a/rts/sm/NonMovingMark.c +++ b/rts/sm/NonMovingMark.c @@ -1052,7 +1052,7 @@ trace_tso (MarkQueue *queue, StgTSO *tso) if (tso->label != NULL) { markQueuePushClosure_(queue, (StgClosure *) tso->label); } - switch (tso->why_blocked) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnMVar: case BlockedOnMVarRead: case BlockedOnBlackHole: diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 29012f8ed3dd..a09130a0912a 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -137,7 +137,7 @@ scavengeTSO (StgTSO *tso) evacuate((StgClosure **)&tso->label); } - switch (tso->why_blocked) { + switch (ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnMVar: case BlockedOnMVarRead: case BlockedOnBlackHole: diff --git a/rts/win32/AsyncMIO.c b/rts/win32/AsyncMIO.c index 32465d682a52..a581c555abe9 100644 --- a/rts/win32/AsyncMIO.c +++ b/rts/win32/AsyncMIO.c @@ -294,7 +294,7 @@ start: for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE; tso = tso->_link) { - switch(tso->why_blocked) { + switch(ACQUIRE_LOAD(&tso->why_blocked)) { case BlockedOnRead: case BlockedOnWrite: case BlockedOnDoProc: -- GitLab
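
The change above is a textbook release/acquire publication handshake: block_info (and the queue node it points to) is written first, and the release-store to why_blocked publishes it, so a reader that acquire-loads why_blocked is guaranteed to see a coherent block_info. A minimal stand-alone sketch of the same pattern in C11 atomics follows. This is illustrative only: the RTS expresses it with its own RELEASE_STORE/ACQUIRE_LOAD macros and Cmm %release stores, and the tso_t, block_on, and blocked_object names here are invented for the example.

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stddef.h>

    /* Toy stand-ins for StgTSO.why_blocked / StgTSO.block_info. */
    enum { NotBlocked = 0, BlockedOnMVar = 1 };

    typedef struct {
        void            *block_info;   /* plain field, published by the release   */
        _Atomic uint32_t why_blocked;  /* 32 bits wide so it can be stored atomically */
    } tso_t;

    /* Blocking side: write block_info first, then release-store why_blocked.
     * The release guarantees that any thread which acquire-loads why_blocked
     * and observes BlockedOnMVar also observes the block_info write. */
    static void block_on(tso_t *tso, void *mvar)
    {
        tso->block_info = mvar;
        atomic_store_explicit(&tso->why_blocked, BlockedOnMVar,
                              memory_order_release);
    }

    /* Waking side: acquire-load why_blocked *before* touching block_info;
     * a plain load here would reintroduce the race this patch fixes. */
    static void *blocked_object(tso_t *tso)
    {
        uint32_t why = atomic_load_explicit(&tso->why_blocked,
                                            memory_order_acquire);
        return why == NotBlocked ? NULL : tso->block_info;
    }

The one deliberate exception in the patch is the non-mingw32 branch of stg_delayzh, which uses a %relaxed store: that code path is unreachable in the threaded RTS (it sits behind the barf("delay# on threaded RTS") guard), so no other core can observe the TSO concurrently there.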