Commit 7408b392 authored by Simon Marlow

Use message-passing to implement throwTo in the RTS

This replaces some complicated locking schemes with message-passing
in the implementation of throwTo. The benefits are

 - previously it was impossible to guarantee that a throwTo from
   a thread running on one CPU to a thread running on another CPU
   would be noticed, and we had to rely on the GC to pick up these
   forgotten exceptions. This no longer happens.

 - the locking regime is simpler (though the code is about the same
   size)

 - threads can now be unblocked from a blocked_exceptions queue
   without having to traverse the whole queue.  It's a rare case, but
   it replaces an O(n) operation with an O(1) one.

 - generally we move in the direction of sharing less between
   Capabilities (aka HECs), which will become important with other
   changes we have planned.

Also in this patch I replaced several STM-specific closure types with
a generic MUT_PRIM closure type, which allowed a lot of code in the GC
and other places to go away, hence the line-count reduction.  The
message-passing changes resulted in about a net zero line-count
difference.
parent 12cfec94
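
To make the new scheme concrete before the diff: each Capability now owns an inbox of Message closures, and a cross-Capability throwTo or wakeup becomes a push onto the target's inbox rather than a direct mutation of the target TSO under a shared lock. A minimal sketch of the pattern in portable C, using pthreads and a hypothetical send_message() rather than the real RTS API:

    #include <pthread.h>
    #include <stddef.h>

    /* Simplified stand-ins for the RTS types; the real definitions
       appear below (Message in Closures.h, inbox in Capability.h). */
    typedef struct Message_ {
        struct Message_ *link;      /* intrusive inbox chain */
        int              kind;      /* wakeup, throwto, ...  */
    } Message;

    typedef struct Capability_ {
        pthread_mutex_t  lock;
        Message         *inbox;     /* newest message first  */
    } Capability;

    /* Producer side, mirroring sendMessage() in Capability.c:
       push under the target's lock, then prod it to look. */
    static void send_message(Capability *cap, Message *msg)
    {
        pthread_mutex_lock(&cap->lock);
        msg->link  = cap->inbox;
        cap->inbox = msg;
        pthread_mutex_unlock(&cap->lock);
        /* the RTS additionally wakes the target Capability or
           requests a context switch so the message is seen soon */
    }

Only the owning Capability ever acts on its own threads; other Capabilities merely append to its inbox and are guaranteed the message will be looked at, which is why a thrown exception can no longer be forgotten.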
@@ -208,25 +208,27 @@
 #define NotBlocked          0
 #define BlockedOnMVar       1
 #define BlockedOnBlackHole  2
-#define BlockedOnException  3
-#define BlockedOnRead       4
-#define BlockedOnWrite      5
-#define BlockedOnDelay      6
-#define BlockedOnSTM        7
+#define BlockedOnRead       3
+#define BlockedOnWrite      4
+#define BlockedOnDelay      5
+#define BlockedOnSTM        6

 /* Win32 only: */
-#define BlockedOnDoProc     8
+#define BlockedOnDoProc     7

 /* Only relevant for PAR: */
 /* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA         9
+#define BlockedOnGA         8
 /* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend  10
+#define BlockedOnGA_NoSend  9

 /* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall      11
-#define BlockedOnCCall_NoUnblockExc 12
+#define BlockedOnCCall      10
+#define BlockedOnCCall_NoUnblockExc 11
 /* same as above but don't unblock async exceptions in resumeThread() */

+/* Involved in a message sent to tso->msg_cap */
+#define BlockedOnMsgWakeup  12
+#define BlockedOnMsgThrowTo 13
+
 /*
  * These constants are returned to the scheduler by a thread that has
  * stopped for one reason or another.  See typedef StgThreadReturnCode
......
@@ -335,18 +335,8 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info)
         return tso_sizeW((StgTSO *)p);
     case BCO:
         return bco_sizeW((StgBCO *)p);
-    case TVAR_WATCH_QUEUE:
-        return sizeofW(StgTVarWatchQueue);
-    case TVAR:
-        return sizeofW(StgTVar);
     case TREC_CHUNK:
         return sizeofW(StgTRecChunk);
-    case TREC_HEADER:
-        return sizeofW(StgTRecHeader);
-    case ATOMIC_INVARIANT:
-        return sizeofW(StgAtomicInvariant);
-    case INVARIANT_CHECK_QUEUE:
-        return sizeofW(StgInvariantCheckQueue);
     default:
         return sizeW_fromITBL(info);
     }
......
@@ -74,18 +74,14 @@
 #define MUT_VAR_CLEAN          50
 #define MUT_VAR_DIRTY          51
 #define WEAK                   52
-#define STABLE_NAME            53
-#define TSO                    54
-#define TVAR_WATCH_QUEUE       55
-#define INVARIANT_CHECK_QUEUE  56
-#define ATOMIC_INVARIANT       57
-#define TVAR                   58
-#define TREC_CHUNK             59
-#define TREC_HEADER            60
-#define ATOMICALLY_FRAME       61
-#define CATCH_RETRY_FRAME      62
-#define CATCH_STM_FRAME        63
-#define WHITEHOLE              64
-#define N_CLOSURE_TYPES        65
+#define PRIM                   53
+#define MUT_PRIM               54
+#define TSO                    55
+#define TREC_CHUNK             56
+#define ATOMICALLY_FRAME       57
+#define CATCH_RETRY_FRAME      58
+#define CATCH_STM_FRAME        59
+#define WHITEHOLE              60
+#define N_CLOSURE_TYPES        61

 #endif /* RTS_STORAGE_CLOSURETYPES_H */
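
The PRIM/MUT_PRIM consolidation is what lets per-type code in the GC go away: an info table already records how many pointer and non-pointer words a closure carries, so one generic case can scavenge all of the former STM closure types. A sketch of such a generic case, assuming the usual info-table accessors (illustrative, not the patch's actual GC code); the same pointers-first layout rule is presumably why the StgTRecHeader hunk just below moves the non-pointer state field to the end of the struct:

    /* Sketch: one generic case replaces the per-type STM cases. */
    static void scavenge_mut_prim(StgClosure *c)
    {
        StgInfoTable *info = get_itbl(c);
        StgPtr p   = (StgPtr)c + sizeofW(StgHeader);
        StgPtr end = p + info->layout.payload.ptrs;

        for (; p < end; p++) {
            evacuate((StgClosure **)p);   /* pointer fields come first */
        }
        /* the info->layout.payload.nptrs non-pointer words that
           follow need no GC work */
    }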
@@ -390,10 +390,10 @@ typedef struct StgInvariantCheckQueue_ {
 struct StgTRecHeader_ {
   StgHeader                  header;
-  TRecState                  state;
   struct StgTRecHeader_     *enclosing_trec;
   StgTRecChunk              *current_chunk;
   StgInvariantCheckQueue    *invariants_to_check;
+  TRecState                  state;
 };

 typedef struct {
@@ -416,4 +416,27 @@ typedef struct {
     StgClosure *alt_code;
 } StgCatchRetryFrame;

+/* ----------------------------------------------------------------------------
+   Messages
+   ------------------------------------------------------------------------- */
+
+typedef struct Message_ {
+    StgHeader        header;
+    struct Message_ *link;
+} Message;
+
+typedef struct MessageWakeup_ {
+    StgHeader header;
+    Message  *link;
+    StgTSO   *tso;
+} MessageWakeup;
+
+typedef struct MessageThrowTo_ {
+    StgHeader   header;
+    Message    *link;
+    StgTSO     *source;
+    StgTSO     *target;
+    StgClosure *exception;
+} MessageThrowTo;
+
 #endif /* RTS_STORAGE_CLOSURES_H */
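
These messages are ordinary heap closures: the header gives them an info table, so a message can be locked or revoked like any other closure, and link chains them onto a Capability's inbox. The allocation pattern, mirroring what wakeupThreadOnCapability does later in this patch (newWakeupMessage is a made-up helper name):

    /* Sketch: building a wakeup message, as in Capability.c below. */
    static MessageWakeup *newWakeupMessage(Capability *cap, StgTSO *tso)
    {
        MessageWakeup *msg;
        msg = (MessageWakeup *) allocate(cap, sizeofW(MessageWakeup));
        msg->header.info = &stg_MSG_WAKEUP_info;
        msg->tso = tso;
        return msg;
    }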
@@ -18,6 +18,7 @@
 #else

 EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p);
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p);
 EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info);

 #if defined(THREADED_RTS)
@@ -43,11 +44,15 @@ EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
     } while (1);
 }

-EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p)
 {
-    // This is a strictly ordered write, so we need a write_barrier():
-    write_barrier();
-    p->header.info = info;
+    StgWord info;
+    info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
+    if (info != (W_)&stg_WHITEHOLE_info) {
+        return (StgInfoTable *)info;
+    } else {
+        return NULL;
+    }
 }

 #else /* !THREADED_RTS */
@@ -56,12 +61,19 @@ EXTERN_INLINE StgInfoTable *
 lockClosure(StgClosure *p)
 { return (StgInfoTable *)p->header.info; }

-EXTERN_INLINE void
-unlockClosure(StgClosure *p STG_UNUSED, const StgInfoTable *info STG_UNUSED)
-{ /* nothing */ }
+EXTERN_INLINE StgInfoTable *
+tryLockClosure(StgClosure *p)
+{ return (StgInfoTable *)p->header.info; }

 #endif /* THREADED_RTS */

+EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+{
+    // This is a strictly ordered write, so we need a write_barrier():
+    write_barrier();
+    p->header.info = info;
+}
+
 // Handy specialised versions of lockClosure()/unlockClosure()
 EXTERN_INLINE void lockTSO(StgTSO *tso);
 EXTERN_INLINE void lockTSO(StgTSO *tso)
......
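
The closure lock used throughout this patch is the info pointer itself: lockClosure() spins, xchg'ing stg_WHITEHOLE_info into the header until it wins, and returns the previous info table; tryLockClosure() is the non-blocking variant that returns NULL on contention; unlockClosure() publishes a valid info table again behind a write barrier. A sketch of the usage discipline (revokeMessage/tryRevokeMessage are illustrative names, not RTS functions):

    /* Blocking form: spins until we own the closure. */
    static void revokeMessage(MessageThrowTo *msg)
    {
        StgInfoTable *i;
        i = lockClosure((StgClosure *)msg);   /* now exclusive */
        /* ... deactivate or unlink the message ... */
        unlockClosure((StgClosure *)msg, i);  /* restore info table */
    }

    /* Non-blocking form: back off if another Capability holds it. */
    static rtsBool tryRevokeMessage(MessageThrowTo *msg)
    {
        StgInfoTable *i = tryLockClosure((StgClosure *)msg);
        if (i == NULL) { return rtsFalse; }   /* contended; retry later */
        /* ... exclusive access here ... */
        unlockClosure((StgClosure *)msg, i);
        return rtsTrue;
    }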
@@ -46,7 +46,8 @@ typedef struct {
 /* Reason for thread being blocked. See comment above struct StgTso_. */
 typedef union {
     StgClosure *closure;
-    struct StgTSO_ *tso;
+    struct MessageThrowTo_ *throwto;
+    struct MessageWakeup_  *wakeup;
     StgInt fd;  /* StgInt instead of int, so that it's the same size as the ptrs */
 #if defined(mingw32_HOST_OS)
     StgAsyncIOResult *async_result;
@@ -87,7 +88,8 @@ typedef struct StgTSO_ {
        will already be dirty.
     */
-    struct StgTSO_*         global_link;    /* Links all threads together */
+    struct StgTSO_*         global_link;    // Links threads on the
+                                            // generation->threads lists

     StgWord dirty;          /* non-zero => dirty */
     /*
@@ -108,9 +110,9 @@ typedef struct StgTSO_ {
     * setTSOLink().
     */

-    StgWord16               what_next;      /* Values defined in Constants.h */
-    StgWord16               why_blocked;    /* Values defined in Constants.h */
-    StgWord32               flags;
+    StgWord16               what_next;      // Values defined in Constants.h
+    StgWord16               why_blocked;    // Values defined in Constants.h
+    StgWord32               flags;          // Values defined in Constants.h
     StgTSOBlockInfo         block_info;
     StgThreadID             id;
     int                     saved_errno;
@@ -123,7 +125,7 @@ typedef struct StgTSO_ {
        exceptions. In order to access this field, the TSO must be
        locked using lockClosure/unlockClosure (see SMP.h).
     */
-    struct StgTSO_ *         blocked_exceptions;
+    struct MessageThrowTo_ * blocked_exceptions;

 #ifdef TICKY_TICKY
     /* TICKY-specific stuff would go here. */
@@ -167,7 +169,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
    tso->why_blocked     tso->block_info      location
    ----------------------------------------------------------------------
-   NotBlocked           NULL                 runnable_queue, or running
+   NotBlocked           END_TSO_QUEUE        runnable_queue, or running
    BlockedOnBlackHole   the BLACKHOLE        blackhole_queue
@@ -175,7 +177,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
    BlockedOnSTM         END_TSO_QUEUE        STM wait queue(s)
-   BlockedOnException   the TSO              TSO->blocked_exception
+   BlockedOnMsgThrowTo  MessageThrowTo *     TSO->blocked_exception
    BlockedOnRead        NULL                 blocked_queue
    BlockedOnWrite       NULL                 blocked_queue
@@ -189,7 +191,6 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
    tso->what_next == ThreadComplete or ThreadKilled
    tso->link == (could be on some queue somewhere)
-   tso->su == tso->stack + tso->stack_size
    tso->sp == tso->stack + tso->stack_size - 1 (i.e. top stack word)
    tso->sp[0] == return value of thread, if what_next == ThreadComplete,
                  exception            , if what_next == ThreadKilled
......
@@ -114,6 +114,8 @@ RTS_INFO(stg_MUT_ARR_PTRS_FROZEN0_info);
 RTS_INFO(stg_MUT_VAR_CLEAN_info);
 RTS_INFO(stg_MUT_VAR_DIRTY_info);
 RTS_INFO(stg_END_TSO_QUEUE_info);
+RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_THROWTO_info);
 RTS_INFO(stg_MUT_CONS_info);
 RTS_INFO(stg_catch_info);
 RTS_INFO(stg_PAP_info);
@@ -163,6 +165,8 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0_entry);
 RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
 RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
 RTS_ENTRY(stg_END_TSO_QUEUE_entry);
+RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_THROWTO_entry);
 RTS_ENTRY(stg_MUT_CONS_entry);
 RTS_ENTRY(stg_catch_entry);
 RTS_ENTRY(stg_PAP_entry);
@@ -205,8 +209,6 @@ RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure);
 RTS_CLOSURE(stg_NO_TREC_closure);

 RTS_ENTRY(stg_NO_FINALIZER_entry);
-RTS_ENTRY(stg_END_EXCEPTION_LIST_entry);
-RTS_ENTRY(stg_EXCEPTION_CONS_entry);

 #if IN_STG_CODE
 extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure;
......
@@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i )
     cap->suspended_ccalls   = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
-    cap->wakeup_queue_hd    = END_TSO_QUEUE;
-    cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->inbox              = (Message*)END_TSO_QUEUE;
     cap->sparks_created     = 0;
     cap->sparks_converted   = 0;
     cap->sparks_pruned      = 0;
@@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap,
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
     if (always_wakeup ||
-        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptyRunQueue(cap) || !emptyInbox(cap) ||
         !emptySparkPoolCap(cap) || globalWorkToDo()) {
         if (cap->spare_workers) {
             giveCapabilityToTask(cap,cap->spare_workers);
@@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task)
  * ------------------------------------------------------------------------- */

 void
-wakeupThreadOnCapability (Capability *my_cap,
+wakeupThreadOnCapability (Capability *cap,
                           Capability *other_cap,
                           StgTSO *tso)
 {
-    ACQUIRE_LOCK(&other_cap->lock);
+    MessageWakeup *msg;

     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
@@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap,
     }
     tso->cap = other_cap;

     ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);

-    if (other_cap->running_task == NULL) {
-        // nobody is running this Capability, we can add our thread
-        // directly onto the run queue and start up a Task to run it.
-
-        other_cap->running_task = myTask();
-        // precond for releaseCapability_() and appendToRunQueue()
-
-        appendToRunQueue(other_cap,tso);
-        releaseCapability_(other_cap,rtsFalse);
-    } else {
-        appendToWakeupQueue(my_cap,other_cap,tso);
-        other_cap->context_switch = 1;
-        // someone is running on this Capability, so it cannot be
-        // freed without first checking the wakeup queue (see
-        // releaseCapability_).
-    }
-
-    RELEASE_LOCK(&other_cap->lock);
+    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
+           tso->block_info.closure->header.info == &stg_IND_info);
+    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
+
+    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+    msg->header.info = &stg_MSG_WAKEUP_info;
+    msg->tso = tso;
+    tso->block_info.closure = (StgClosure *)msg;
+    dirty_TSO(cap, tso);
+    write_barrier();
+    tso->why_blocked = BlockedOnMsgWakeup;
+
+    sendMessage(other_cap, (Message*)msg);
 }

 /* ----------------------------------------------------------------------------
@@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
     evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
     evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 #if defined(THREADED_RTS)
-    evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
-    evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+    evac(user, (StgClosure **)(void *)&cap->inbox);
 #endif

     for (incall = cap->suspended_ccalls; incall != NULL;
          incall=incall->next) {
@@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
 {
     markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 }
+
+/* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+    ACQUIRE_LOCK(&cap->lock);
+
+    msg->link = cap->inbox;
+    cap->inbox = msg;
+
+    if (cap->running_task == NULL) {
+        cap->running_task = myTask();
+        // precond for releaseCapability_()
+        releaseCapability_(cap,rtsFalse);
+    } else {
+        contextSwitchCapability(cap);
+    }
+
+    RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS
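
sendMessage() is only the producer half; the consumer half, where a Capability drains its own inbox at a safe point in the scheduler, is not part of this hunk. A plausible sketch of that side using the same idioms (processInbox and executeMessage are assumed names, not code from this patch):

    static void processInbox(Capability *cap)
    {
        Message *m, *next;

        ACQUIRE_LOCK(&cap->lock);
        m = cap->inbox;
        cap->inbox = (Message*)END_TSO_QUEUE;   /* detach whole chain */
        RELEASE_LOCK(&cap->lock);

        for (; m != (Message*)END_TSO_QUEUE; m = next) {
            next = m->link;
            executeMessage(cap, m);   /* wake the TSO or raise the exception */
        }
    }

Detaching the whole chain under the lock keeps the critical section tiny, so senders are never blocked for long.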
@@ -88,11 +88,8 @@ struct Capability_ {
     Task *returning_tasks_hd; // Singly-linked, with head/tail
     Task *returning_tasks_tl;

-    // A list of threads to append to this Capability's run queue at
-    // the earliest opportunity.  These are threads that have been
-    // woken up by another Capability.
-    StgTSO *wakeup_queue_hd;
-    StgTSO *wakeup_queue_tl;
+    // Messages, or END_TSO_QUEUE.
+    Message *inbox;

     SparkPool *sparks;
@@ -284,6 +281,18 @@ void markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
 void markCapabilities (evac_fn evac, void *user);
 void traverseSparkQueues (evac_fn evac, void *user);

+/* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap);
+
+void sendMessage (Capability *cap, Message *msg);
+
+#endif // THREADED_RTS
+
 /* -----------------------------------------------------------------------------
  * INLINE functions... private below here
  * -------------------------------------------------------------------------- */
@@ -333,6 +342,15 @@ contextSwitchCapability (Capability *cap)
     cap->context_switch = 1;
 }

+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap)
+{
+    return (cap->inbox == (Message*)END_TSO_QUEUE);
+}
+
+#endif
+
 END_RTS_PRIVATE

 #endif /* CAPABILITY_H */
@@ -74,20 +74,16 @@ StgWord16 closure_flags[] = {
 [MUT_VAR_CLEAN]        = (_HNF| _NS| _MUT|_UPT ),
 [MUT_VAR_DIRTY]        = (_HNF| _NS| _MUT|_UPT ),
 [WEAK]                 = (_HNF| _NS| _UPT ),
-[STABLE_NAME]          = (_HNF| _NS| _UPT ),
+[PRIM]                 = (_HNF| _NS| _UPT ),
+[MUT_PRIM]             = (_HNF| _NS| _MUT|_UPT ),
 [TSO]                  = (_HNF| _NS| _MUT|_UPT ),
-[TVAR_WATCH_QUEUE]     = ( _NS| _MUT|_UPT ),
-[INVARIANT_CHECK_QUEUE]= ( _NS| _MUT|_UPT ),
-[ATOMIC_INVARIANT]     = ( _NS| _MUT|_UPT ),
-[TVAR]                 = (_HNF| _NS| _MUT|_UPT ),
 [TREC_CHUNK]           = ( _NS| _MUT|_UPT ),
-[TREC_HEADER]          = ( _NS| _MUT|_UPT ),
 [ATOMICALLY_FRAME]     = ( _BTM ),
 [CATCH_RETRY_FRAME]    = ( _BTM ),
 [CATCH_STM_FRAME]      = ( _BTM ),
 [WHITEHOLE]            = ( 0 )
 };

-#if N_CLOSURE_TYPES != 65
+#if N_CLOSURE_TYPES != 61
 #error Closure types changed: update ClosureFlags.c!
 #endif
@@ -56,7 +56,7 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
     CInt r;

     StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) &
-        ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+        %lobits32(~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));

     /* Eagerly raise a blocked exception, if there is one */
     if (StgTSO_blocked_exceptions(CurrentTSO) != END_TSO_QUEUE) {
@@ -99,8 +99,8 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
 INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, RET_SMALL )
 {
-    StgTSO_flags(CurrentTSO) =
-        StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+    StgTSO_flags(CurrentTSO) = %lobits32(
+        TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);

     Sp_adj(1);
     jump %ENTRY_CODE(Sp(0));
@@ -113,8 +113,8 @@ stg_blockAsyncExceptionszh
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {

-        StgTSO_flags(CurrentTSO) =
-            StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+        StgTSO_flags(CurrentTSO) = %lobits32(
+            TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);

         /* avoid growing the stack unnecessarily */
         if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
@@ -142,8 +142,8 @@ stg_unblockAsyncExceptionszh
     /* If exceptions are already unblocked, there's nothing to do */
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) != 0) {

-        StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) &
-            ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+        StgTSO_flags(CurrentTSO) = %lobits32(
+            TO_W_(StgTSO_flags(CurrentTSO)) & ~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));

         /* avoid growing the stack unnecessarily */
         if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
@@ -252,27 +252,22 @@ stg_killThreadzh
         }
     } else {
-        W_ out;
-        W_ retcode;
-        out = Sp - WDS(1);  /* ok to re-use stack space here */
-
-        (retcode) = foreign "C" throwTo(MyCapability() "ptr",
-                                        CurrentTSO "ptr",
-                                        target "ptr",
-                                        exception "ptr",
-                                        out "ptr") [R1,R2];
-
-        switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) {
-        case THROWTO_SUCCESS: {
-            jump %ENTRY_CODE(Sp(0));
-        }
-        case THROWTO_BLOCKED: {
-            R3 = W_[out];
-            // we must block, and call throwToReleaseTarget() before returning
-            jump stg_block_throwto;
-        }
-        }
+        W_ msg;
+
+        (msg) = foreign "C" throwTo(MyCapability() "ptr",
+                                    CurrentTSO "ptr",
+                                    target "ptr",
+                                    exception "ptr") [R1,R2];
+
+        if (msg == NULL) {
+            jump %ENTRY_CODE(Sp(0));
+        } else {
+            StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
+            StgTSO_block_info(CurrentTSO) = msg;
+            // we must block, and unlock the message before returning
+            jump stg_block_throwto;
+        }
     }
 }
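
The new throwTo contract visible at this call site: a NULL result means the exception was delivered (or discarded) immediately; a non-NULL result is a locked MessageThrowTo on which the caller must block, unlocking it once safely blocked. A C-level sketch of a caller under that contract (the prototype is inferred from the Cmm call above; throwToExample is not an RTS function):

    static void throwToExample(Capability *cap, StgTSO *source,
                               StgTSO *target, StgClosure *exception)
    {
        MessageThrowTo *msg = throwTo(cap, source, target, exception);

        if (msg == NULL) {
            return;   /* delivered immediately; nothing to wait for */
        }

        /* Delivery is pending: block the source on the message.
           throwTo returned it locked; unlock only once blocked,
           as stg_block_throwto_finally does later in this patch. */
        source->why_blocked = BlockedOnMsgThrowTo;
        source->block_info.throwto = msg;
        unlockClosure((StgClosure *)msg, &stg_MSG_THROWTO_info);
    }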
@@ -507,8 +502,8 @@ retry_pop_stack:
     /* Ensure that async exceptions are blocked when running the handler.
     */
-    StgTSO_flags(CurrentTSO) =
-        StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+    StgTSO_flags(CurrentTSO) = %lobits32(
+        TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);

     /* Call the handler, passing the exception value and a realworld
      * token as arguments.
......
@@ -697,7 +697,7 @@ residencyCensus( void )
         break;

     case WEAK:
-    case STABLE_NAME:
+    case PRIM:
     case MVAR:
     case MUT_VAR:
     /* case MUT_CONS: FIXME: case does not exist */
......
@@ -631,9 +631,8 @@ INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
 stg_block_throwto_finally
 {
-#ifdef THREADED_RTS
-    foreign "C" throwToReleaseTarget (R3 "ptr");
-#endif
+    // unlock the throwto message
+    unlockClosure(StgTSO_block_info(CurrentTSO), stg_MSG_THROWTO_info);
     jump StgReturn;
 }
......
@@ -109,7 +109,7 @@ processHeapClosureForDead( StgClosure *c )
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case BCO:
-    case STABLE_NAME:
+    case PRIM:
     case TVAR_WATCH_QUEUE:
     case TVAR:
     case TREC_HEADER:
......
@@ -542,9 +542,9 @@ stg_forkzh
                                   closure "ptr") [];

     /* start blocked if the current thread is blocked */
-    StgTSO_flags(threadid) =
-        StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) &
-                                  (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+    StgTSO_flags(threadid) = %lobits16(
+        TO_W_(StgTSO_flags(threadid)) |
+        TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));

     foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
@@ -572,9 +572,9 @@ stg_forkOnzh
                                   closure "ptr") [];

     /* start blocked if the current thread is blocked */
-    StgTSO_flags(threadid) =
-        StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) &
-                                  (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+    StgTSO_flags(threadid) = %lobits16(
+        TO_W_(StgTSO_flags(threadid)) |
+        TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));

     foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
......
@@ -160,6 +160,12 @@ printClosure( StgClosure *obj )
         printStdObjPayload(obj);
         break;

+    case PRIM:
+        debugBelch("PRIM(");
+        printPtr((StgPtr)obj->header.info);
+        printStdObjPayload(obj);
+        break;
+
     case THUNK:
     case THUNK_1_0: case THUNK_0_1:
     case THUNK_1_1: case THUNK_0_2: case THUNK_2_0:
@@ -356,10 +362,6 @@ printClosure( StgClosure *obj )
         /* ToDo: chase 'link' ? */
         break;

-    case STABLE_NAME:
-        debugBelch("STABLE_NAME(%lu)\n", (lnat)((StgStableName*)obj)->sn);
-        break;
-
     case TSO:
         debugBelch("TSO(");
         debugBelch("%lu (%p)",(unsigned long)(((StgTSO*)obj)->id), (StgTSO*)obj);
@@ -1132,14 +1134,10 @@ char *closure_type_names[] = {
 [MUT_VAR_CLEAN]     = "MUT_VAR_CLEAN",
 [MUT_VAR_DIRTY]     = "MUT_VAR_DIRTY",
 [WEAK]              = "WEAK",
-[STABLE_NAME]       = "STABLE_NAME",
+[PRIM]              = "PRIM",
+[MUT_PRIM]          = "MUT_PRIM",
 [TSO]               = "TSO",
-[TVAR_WATCH_QUEUE]  = "TVAR_WATCH_QUEUE",
-[INVARIANT_CHECK_QUEUE] = "INVARIANT_CHECK_QUEUE",
-[ATOMIC_INVARIANT]  = "ATOMIC_INVARIANT",
-[TVAR]              = "TVAR",
 [TREC_CHUNK]        = "TREC_CHUNK",
-[TREC_HEADER]       = "TREC_HEADER",
 [ATOMICALLY_FRAME]  = "ATOMICALLY_FRAME",
 [CATCH_RETRY_FRAME] = "CATCH_RETRY_FRAME",
 [CATCH_STM_FRAME]   = "CATCH_STM_FRAME",
......
@@ -912,7 +912,8 @@ heapCensusChain( Census *census, bdescr *bd )
         case MVAR_CLEAN:
         case MVAR_DIRTY:
         case WEAK:
-        case STABLE_NAME:
+        case PRIM:
+        case MUT_PRIM:
         case MUT_VAR_CLEAN:
         case MUT_VAR_DIRTY:
             prim = rtsTrue;
@@ -960,31 +961,6 @@ heapCensusChain( Census *census, bdescr *bd )
             break;
 #endif

-        case TREC_HEADER:
-            prim = rtsTrue;
-            size = sizeofW(StgTRecHeader);
-            break;
-
-        case TVAR_WATCH_QUEUE:
-            prim = rtsTrue;
-            size = sizeofW(StgTVarWatchQueue);
-            break;
-
-        case INVARIANT_CHECK_QUEUE:
-            prim = rtsTrue;
-            size = sizeofW(StgInvariantCheckQueue);
-            break;
-
-        case ATOMIC_INVARIANT:
-            prim = rtsTrue;
-            size = sizeofW(StgAtomicInvariant);
-            break;
-
-        case TVAR:
-            prim = rtsTrue;
-            size = sizeofW(StgTVar);
-            break;
-
         case TREC_CHUNK:
             prim = rtsTrue;
             size = sizeofW(StgTRecChunk);
......
@@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap,
 static void removeFromQueues(Capability *cap, StgTSO *tso);

-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap,
+                            StgTSO *target, MessageThrowTo *msg);

-static void performBlockedException (Capability *cap,
-                                     StgTSO *source, StgTSO *target);
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+                            Capability *target_cap USED_IF_THREADS,
+                            MessageThrowTo *msg USED_IF_THREADS);
+
+static void performBlockedException (Capability *cap, MessageThrowTo *msg);

 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
@@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
   may be blocked and could be woken up at any point by another CPU.
   We have some delicate synchronisation to do.

-  There is a completely safe fallback scheme: it is always possible
-  to just block the source TSO on the target TSO's blocked_exceptions
-  queue.  This queue is locked using lockTSO()/unlockTSO().  It is
-  checked at regular intervals: before and after running a thread
-  (schedule() and threadPaused() respectively), and just before GC
-  (scheduleDoGC()).  Activating a thread on this queue should be done
-  using maybePerformBlockedException(): this is done in the context
-  of the target thread, so the exception can be raised eagerly.
-
-  This fallback scheme works even if the target thread is complete or
-  killed: scheduleDoGC() will discover the blocked thread before the
-  target is GC'd.
-
-  Blocking the source thread on the target thread's blocked_exception
-  queue is also employed when the target thread is currently blocking
-  exceptions (ie. inside Control.Exception.block).
-
-  We could use the safe fallback scheme exclusively, but that
-  wouldn't be ideal: most calls to throwTo would block immediately,
-  possibly until the next GC, which might require the deadlock
-  detection mechanism to kick in.  So we try to provide promptness
-  wherever possible.
-
-  We can promptly deliver the exception if the target thread is:
-
-        - runnable, on the same Capability as the source thread (because
-          we own the run queue and therefore the target thread).
-
-        - blocked, and we can obtain exclusive access to it.  Obtaining
-          exclusive access to the thread depends on how it is blocked.
-
-  We must also be careful to not trip over threadStackOverflow(),
-  which might be moving the TSO to enlarge its stack.
-  lockTSO()/unlockTSO() are used here too.
+  The underlying scheme when multiple Capabilities are in use is
+  message passing: when the target of a throwTo is on another
+  Capability, we send a message (a MessageThrowTo closure) to that
+  Capability.
+
+  If the throwTo needs to block because the target TSO is masking