Commit 70e20631 authored by Edward Z. Yang

Implement atomicReadMVar, fixing #4001.

We add an invariant to the MVar blocked-threads queue: threads
blocked on an atomic read always sit at the front of the queue.
This invariant is easy to maintain, since takers are only ever
added to the end of the queue.
Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
parent ca9a4314
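
For context: the readMVar exported from Control.Concurrent.MVar before this commit was a non-atomic take-then-put pair, so the MVar was briefly empty during a read and another thread's putMVar could slip into the gap. A minimal Haskell sketch of the difference (atomicReadMVar is the name assumed for the eventual wrapper around the new primop):

    import Control.Concurrent.MVar (MVar, takeMVar, putMVar)

    -- The old, non-atomic read: between the takeMVar and the putMVar
    -- the MVar is empty, so a concurrent writer can intervene and
    -- readers race with takers for the value.
    readMVarNonAtomic :: MVar a -> IO a
    readMVarNonAtomic m = do
      a <- takeMVar m
      putMVar m a      -- the MVar was empty until this put succeeds
      return a

    -- The primop added below instead blocks until the MVar is full and
    -- then reads the value without ever emptying the MVar, so no other
    -- thread can intervene mid-read.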
@@ -1717,6 +1717,15 @@ primop TryPutMVarOp "tryPutMVar#" GenPrimOp
    out_of_line = True
    has_side_effects = True
 
+primop AtomicReadMVarOp "atomicReadMVar#" GenPrimOp
+   MVar# s a -> State# s -> (# State# s, a #)
+   {If {\tt MVar\#} is empty, block until it becomes full.
+   Then read its contents without modifying the MVar, without possibility
+   of intervention from other threads.}
+   with
+   out_of_line = True
+   has_side_effects = True
+
 primop SameMVarOp "sameMVar#" GenPrimOp
    MVar# s a -> MVar# s a -> Bool
......
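
At the source level the new primop would be exposed through a thin IO wrapper in the style of takeMVar in GHC.MVar. A sketch, assuming a GHC built with this patch that re-exports atomicReadMVar# from GHC.Exts (the wrapper name and placement are illustrative, not part of this diff):

    {-# LANGUAGE MagicHash, UnboxedTuples #-}

    import GHC.Exts (atomicReadMVar#)  -- assumed export from this patch
    import GHC.IO (IO (..))
    import GHC.MVar (MVar (..))

    -- Hypothetical user-facing wrapper, mirroring takeMVar's definition.
    atomicReadMVar :: MVar a -> IO a
    atomicReadMVar (MVar m#) = IO $ \s# -> atomicReadMVar# m# s#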
@@ -202,31 +202,32 @@
  */
 #define NotBlocked          0
 #define BlockedOnMVar       1
-#define BlockedOnBlackHole  2
-#define BlockedOnRead       3
-#define BlockedOnWrite      4
-#define BlockedOnDelay      5
-#define BlockedOnSTM        6
+#define BlockedOnMVarRead   2
+#define BlockedOnBlackHole  3
+#define BlockedOnRead       4
+#define BlockedOnWrite      5
+#define BlockedOnDelay      6
+#define BlockedOnSTM        7
 
 /* Win32 only: */
-#define BlockedOnDoProc     7
+#define BlockedOnDoProc     8
 
 /* Only relevant for PAR: */
   /* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA         8
+#define BlockedOnGA         9
   /* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend  9
+#define BlockedOnGA_NoSend  10
 
 /* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall      10
-#define BlockedOnCCall_Interruptible 11
+#define BlockedOnCCall      11
+#define BlockedOnCCall_Interruptible 12
 /* same as above but permit killing the worker thread */
 
 /* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgThrowTo 12
+#define BlockedOnMsgThrowTo 13
 
 /* The thread is not on any run queues, but can be woken up
    by tryWakeupThread() */
-#define ThreadMigrating     13
+#define ThreadMigrating     14
 
 /*
  * These constants are returned to the scheduler by a thread that has
......
@@ -293,7 +293,9 @@ RTS_FUN_DECL(stg_block_noregs);
 RTS_FUN_DECL(stg_block_blackhole);
 RTS_FUN_DECL(stg_block_blackhole_finally);
 RTS_FUN_DECL(stg_block_takemvar);
+RTS_FUN_DECL(stg_block_atomicreadmvar);
 RTS_RET(stg_block_takemvar);
+RTS_RET(stg_block_atomicreadmvar);
 RTS_FUN_DECL(stg_block_putmvar);
 RTS_RET(stg_block_putmvar);
 #ifdef mingw32_HOST_OS
@@ -376,6 +378,7 @@ RTS_FUN_DECL(stg_isEmptyMVarzh);
 RTS_FUN_DECL(stg_newMVarzh);
 RTS_FUN_DECL(stg_takeMVarzh);
 RTS_FUN_DECL(stg_putMVarzh);
+RTS_FUN_DECL(stg_atomicReadMVarzh);
 RTS_FUN_DECL(stg_tryTakeMVarzh);
 RTS_FUN_DECL(stg_tryPutMVarzh);
......
@@ -487,11 +487,11 @@ stg_block_noregs
 /* -----------------------------------------------------------------------------
  * takeMVar/putMVar-specific blocks
  *
- * Stack layout for a thread blocked in takeMVar:
+ * Stack layout for a thread blocked in takeMVar/atomicReadMVar:
  *
  *       ret. addr
  *       ptr to MVar   (R1)
- *       stg_block_takemvar_info
+ *       stg_block_takemvar_info (or stg_block_atomicreadmvar_info)
  *
  * Stack layout for a thread blocked in putMVar:
  *
@@ -531,6 +531,33 @@ stg_block_takemvar /* mvar passed in R1 */
     BLOCK_BUT_FIRST(stg_block_takemvar_finally);
 }
 
+INFO_TABLE_RET ( stg_block_atomicreadmvar, RET_SMALL, W_ info_ptr, P_ mvar )
+    return ()
+{
+    jump stg_atomicReadMVarzh(mvar);
+}
+
+// code fragment executed just before we return to the scheduler
+stg_block_atomicreadmvar_finally
+{
+    W_ r1, r3;
+    r1 = R1;
+    r3 = R3;
+    unlockClosure(R3, stg_MVAR_DIRTY_info);
+    R1 = r1;
+    R3 = r3;
+    jump StgReturn [R1];
+}
+
+stg_block_atomicreadmvar /* mvar passed in R1 */
+{
+    Sp_adj(-2);
+    Sp(1) = R1;
+    Sp(0) = stg_block_atomicreadmvar_info;
+    R3 = R1; // mvar communicated to stg_block_atomicreadmvar_finally in R3
+    BLOCK_BUT_FIRST(stg_block_atomicreadmvar_finally);
+}
+
 INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, W_ info_ptr,
                 P_ mvar, P_ val )
     return ()
......
......@@ -1058,6 +1058,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_yield_to_interpreter) \
SymI_HasProto(stg_block_noregs) \
SymI_HasProto(stg_block_takemvar) \
SymI_HasProto(stg_block_atomicreadmvar) \
SymI_HasProto(stg_block_putmvar) \
MAIN_CAP_SYM \
SymI_HasProto(MallocFailHook) \
......@@ -1314,6 +1315,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
SymI_HasProto(stg_atomicReadMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
SymI_HasProto(stg_tryPutMVarzh) \
SymI_HasProto(stg_tryTakeMVarzh) \
......
@@ -1433,7 +1433,7 @@ loop:
         goto loop;
     }
 
-    // There are takeMVar(s) waiting: wake up the first one
+    // There are atomicReadMVar/takeMVar(s) waiting: wake up the first one
     tso = StgMVarTSOQueue_tso(q);
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
@@ -1441,8 +1441,11 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
 
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
     // actually perform the takeMVar
     W_ stack;
@@ -1458,6 +1461,15 @@ loop:
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
+    // If it was an atomicReadMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
     unlockClosure(mvar, info);
     return ();
 }
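
Both this wake-up loop and the identical one in tryPutMVar below rely on the queue invariant from the commit message: blocked readers sit in front of blocked takers, so a single put services every leading reader and then hands the value to at most one taker. A pure Haskell model of that discipline (hypothetical Blocked type, for illustration only, not RTS code):

    data Blocked = Reader | Taker
      deriving (Eq, Show)

    -- One put wakes every reader at the front of the queue (each sees
    -- the new value and leaves the MVar full), then at most one taker,
    -- which consumes the value.
    -- Returns (readers woken, value consumed, remaining queue).
    wakeOnPut :: [Blocked] -> (Int, Bool, [Blocked])
    wakeOnPut q = case rest of
        Taker : rest' -> (length readers, True, rest')
        _             -> (length readers, False, rest)
      where
        (readers, rest) = span (== Reader) q

    -- Example: wakeOnPut [Reader, Reader, Taker, Taker]
    --            == (2, True, [Taker])
    -- Because readers are always pushed onto the front (see
    -- stg_atomicReadMVarzh below), no reader can ever be stuck behind
    -- a taker, which is why the "goto loop" above always terminates
    -- with every pending reader served.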
@@ -1512,8 +1524,11 @@ loop:
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
 
+    // save why_blocked here, because waking up the thread destroys
+    // this information
+    W_ why_blocked;
+    why_blocked = TO_W_(StgTSO_why_blocked(tso));
     // actually perform the takeMVar
     W_ stack;
@@ -1529,10 +1544,68 @@ loop:
 
     ccall tryWakeupThread(MyCapability() "ptr", tso);
 
+    // If it was an atomicReadMVar, then we can still do work,
+    // so loop back. (XXX: This could take a while)
+    if (why_blocked == BlockedOnMVarRead) {
+        q = StgMVarTSOQueue_link(q);
+        goto loop;
+    }
+
+    ASSERT(why_blocked == BlockedOnMVar);
+
     unlockClosure(mvar, info);
     return (1);
 }
 
+stg_atomicReadMVarzh ( P_ mvar, /* :: MVar a */ )
+{
+    W_ val, info, tso, q;
+
+#if defined(THREADED_RTS)
+    ("ptr" info) = ccall lockClosure(mvar "ptr");
+#else
+    info = GET_INFO(mvar);
+#endif
+
+    if (info == stg_MVAR_CLEAN_info) {
+        ccall dirty_MVAR(BaseReg "ptr", mvar "ptr");
+    }
+
+    /* If the MVar is empty, put ourselves on the blocked readers
+     * list and wait until we're woken up.
+     */
+    if (StgMVar_value(mvar) == stg_END_TSO_QUEUE_closure) {
+
+        ALLOC_PRIM_WITH_CUSTOM_FAILURE
+            (SIZEOF_StgMVarTSOQueue,
+             unlockClosure(mvar, stg_MVAR_DIRTY_info);
+             GC_PRIM_P(stg_atomicReadMVarzh, mvar));
+
+        q = Hp - SIZEOF_StgMVarTSOQueue + WDS(1);
+
+        // readMVars are pushed to the front of the queue, so
+        // they get handled immediately
+        SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
+        StgMVarTSOQueue_link(q) = StgMVar_head(mvar);
+        StgMVarTSOQueue_tso(q)  = CurrentTSO;
+
+        StgTSO__link(CurrentTSO)       = q;
+        StgTSO_block_info(CurrentTSO)  = mvar;
+        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
+        StgMVar_head(mvar) = q;
+
+        if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
+            StgMVar_tail(mvar) = q;
+        }
+
+        jump stg_block_atomicreadmvar(mvar);
+    }
+
+    val = StgMVar_value(mvar);
+
+    unlockClosure(mvar, stg_MVAR_DIRTY_info);
+    return (val);
+}
+
 /* -----------------------------------------------------------------------------
    Stable pointer primitives
......
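
Observable behaviour at the Haskell level: readers block on an empty MVar, a single put wakes them all, and the MVar stays full afterwards. A small runnable demo of those semantics (today's readMVar stands in for the atomic version; the old take/put implementation behaves the same in this uncontended demo but is not atomic under concurrent writers):

    import Control.Concurrent
    import Control.Monad (replicateM_)

    main :: IO ()
    main = do
      m    <- newEmptyMVar
      done <- newEmptyMVar
      -- five readers block on the empty MVar
      replicateM_ 5 $ forkIO $ readMVar m >>= putMVar done
      threadDelay 10000                        -- let the readers block
      putMVar m (42 :: Int)                    -- one put wakes all of them
      replicateM_ 5 (takeMVar done >>= print)  -- prints 42 five times
      print =<< takeMVar m                     -- MVar still full: 42 again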
@@ -294,6 +294,7 @@ check_target:
     }
 
     case BlockedOnMVar:
+    case BlockedOnMVarRead:
     {
         /*
            To establish ownership of this TSO, we need to acquire a
@@ -318,7 +319,7 @@ check_target:
         // we have the MVar, let's check whether the thread
         // is still blocked on the same MVar.
-        if (target->why_blocked != BlockedOnMVar
+        if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
             || (StgMVar *)target->block_info.closure != mvar) {
             unlockClosure((StgClosure *)mvar, info);
             goto retry;
@@ -637,6 +638,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
       goto done;
 
   case BlockedOnMVar:
+  case BlockedOnMVarRead:
       removeFromMVarBlockedQueue(tso);
       goto done;
......
@@ -49,6 +49,7 @@ interruptible(StgTSO *t)
 {
     switch (t->why_blocked) {
     case BlockedOnMVar:
+    case BlockedOnMVarRead:
     case BlockedOnMsgThrowTo:
     case BlockedOnRead:
     case BlockedOnWrite:
......
@@ -1672,6 +1672,7 @@ inner_loop:
     retainClosure(tso->bq, c, c_child_r);
     retainClosure(tso->trec, c, c_child_r);
     if (   tso->why_blocked == BlockedOnMVar
+        || tso->why_blocked == BlockedOnMVarRead
         || tso->why_blocked == BlockedOnBlackHole
         || tso->why_blocked == BlockedOnMsgThrowTo
        ) {
......
@@ -947,6 +947,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
             case BlockedOnBlackHole:
             case BlockedOnMsgThrowTo:
             case BlockedOnMVar:
+            case BlockedOnMVarRead:
                 throwToSingleThreaded(cap, task->incall->tso,
                                       (StgClosure *)nonTermination_closure);
                 return;
@@ -2843,6 +2844,7 @@ resurrectThreads (StgTSO *threads)
         switch (tso->why_blocked) {
         case BlockedOnMVar:
+        case BlockedOnMVarRead:
             /* Called by GC - sched_mutex lock is currently held. */
             throwToSingleThreaded(cap, tso,
                                   (StgClosure *)blockedIndefinitelyOnMVar_closure);
......
@@ -255,6 +255,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
     switch (tso->why_blocked)
     {
     case BlockedOnMVar:
+    case BlockedOnMVarRead:
     {
         if (tso->_link == END_TSO_QUEUE) {
             tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
@@ -734,6 +735,9 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnMVar:
     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
+  case BlockedOnMVarRead:
+    debugBelch("is blocked on atomic MVar read @ %p", tso->block_info.closure);
+    break;
   case BlockedOnBlackHole:
     debugBelch("is blocked on a black hole %p",
                ((StgBlockingQueue*)tso->block_info.bh->bh));
......
@@ -179,6 +179,7 @@ static char *thread_stop_reasons[] = {
     [ThreadFinished] = "finished",
     [THREAD_SUSPENDED_FOREIGN_CALL] = "suspended while making a foreign call",
     [6 + BlockedOnMVar] = "blocked on an MVar",
+    [6 + BlockedOnMVarRead] = "blocked on an atomic MVar read",
     [6 + BlockedOnBlackHole] = "blocked on a black hole",
     [6 + BlockedOnRead] = "blocked on a read operation",
     [6 + BlockedOnWrite] = "blocked on a write operation",
......
@@ -442,6 +442,7 @@ thread_TSO (StgTSO *tso)
     thread_(&tso->global_link);
 
     if (   tso->why_blocked == BlockedOnMVar
+        || tso->why_blocked == BlockedOnMVarRead
         || tso->why_blocked == BlockedOnBlackHole
         || tso->why_blocked == BlockedOnMsgThrowTo
         || tso->why_blocked == NotBlocked
......
@@ -519,6 +519,7 @@ checkTSO(StgTSO *tso)
            info == &stg_WHITEHOLE_info); // happens due to STM doing lockTSO()
 
     if (   tso->why_blocked == BlockedOnMVar
+        || tso->why_blocked == BlockedOnMVarRead
         || tso->why_blocked == BlockedOnBlackHole
         || tso->why_blocked == BlockedOnMsgThrowTo
         || tso->why_blocked == NotBlocked
......
@@ -71,6 +71,7 @@ scavengeTSO (StgTSO *tso)
     evacuate((StgClosure **)&tso->_link);
 
     if (   tso->why_blocked == BlockedOnMVar
+        || tso->why_blocked == BlockedOnMVarRead
         || tso->why_blocked == BlockedOnBlackHole
         || tso->why_blocked == BlockedOnMsgThrowTo
         || tso->why_blocked == NotBlocked
......