Commit 5d52d9b6 authored by Simon Marlow's avatar Simon Marlow

New implementation of BLACKHOLEs

This replaces the global blackhole_queue with a clever scheme that
enables us to queue up blocked threads on the closure that they are
blocked on, while still avoiding atomic instructions in the common
case.

Advantages:

 - gets rid of a locked global data structure and some tricky GC code
   (replacing it with some per-thread data structures and different
   tricky GC code :)

 - wakeups are more prompt: parallel/concurrent performance should
   benefit.  I haven't seen anything dramatic in the parallel
   benchmarks so far, but a couple of threading benchmarks do improve
   a bit.

 - waking up a thread blocked on a blackhole is now O(1) (e.g. if
   it is the target of throwTo).

 - less sharing and better separation of Capabilities: communication
   is done with messages, the data structures are strictly owned by a
   Capability and cannot be modified except by sending messages.

 - this change will utlimately enable us to do more intelligent
   scheduling when threads block on each other.  This is what started
   off the whole thing, but it isn't done yet (#3838).

I'll be documenting all this on the wiki in due course.
parent 79957d77
......@@ -58,6 +58,7 @@ module CLabel (
mkSplitMarkerLabel,
mkDirty_MUT_VAR_Label,
mkUpdInfoLabel,
mkBHUpdInfoLabel,
mkIndStaticInfoLabel,
mkMainCapabilityLabel,
mkMAP_FROZEN_infoLabel,
......@@ -400,6 +401,7 @@ mkStaticConEntryLabel name c = IdLabel name c StaticConEntry
mkSplitMarkerLabel = CmmLabel rtsPackageId (fsLit "__stg_split_marker") CmmCode
mkDirty_MUT_VAR_Label = CmmLabel rtsPackageId (fsLit "dirty_MUT_VAR") CmmCode
mkUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_upd_frame") CmmInfo
mkBHUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_bh_upd_frame" ) CmmInfo
mkIndStaticInfoLabel = CmmLabel rtsPackageId (fsLit "stg_IND_STATIC") CmmInfo
mkMainCapabilityLabel = CmmLabel rtsPackageId (fsLit "MainCapability") CmmData
mkMAP_FROZEN_infoLabel = CmmLabel rtsPackageId (fsLit "stg_MUT_ARR_PTRS_FROZEN0") CmmInfo
......
......@@ -284,7 +284,6 @@ getSequelAmode
OnStack -> do { sp_rel <- getSpRelOffset virt_sp
; returnFC (CmmLoad sp_rel bWord) }
UpdateCode -> returnFC (CmmLit (CmmLabel mkUpdInfoLabel))
CaseAlts lbl _ _ -> returnFC (CmmLit (CmmLabel lbl))
}
......
......@@ -474,7 +474,12 @@ emitBlackHoleCode is_single_entry = do
then do
tickyBlackHole (not is_single_entry)
let bh_info = CmmReg (CmmGlobal EagerBlackholeInfo)
stmtC (CmmStore (CmmReg nodeReg) bh_info)
stmtsC [
CmmStore (cmmOffsetW (CmmReg nodeReg) fixedHdrSize)
(CmmReg (CmmGlobal CurrentTSO)),
CmmCall (CmmPrim MO_WriteBarrier) [] [] CmmUnsafe CmmMayReturn,
CmmStore (CmmReg nodeReg) bh_info
]
else
nopC
\end{code}
......@@ -489,17 +494,23 @@ setupUpdate closure_info code
= code
| not (isStaticClosure closure_info)
= if closureUpdReqd closure_info
then do { tickyPushUpdateFrame; pushUpdateFrame (CmmReg nodeReg) code }
else do { tickyUpdateFrameOmitted; code }
= do
if not (closureUpdReqd closure_info)
then do tickyUpdateFrameOmitted; code
else do
tickyPushUpdateFrame
dflags <- getDynFlags
if not opt_SccProfilingOn && dopt Opt_EagerBlackHoling dflags
then pushBHUpdateFrame (CmmReg nodeReg) code
else pushUpdateFrame (CmmReg nodeReg) code
| otherwise -- A static closure
= do { tickyUpdateBhCaf closure_info
; if closureUpdReqd closure_info
then do -- Blackhole the (updatable) CAF:
{ upd_closure <- link_caf closure_info True
; pushUpdateFrame upd_closure code }
; pushBHUpdateFrame upd_closure code }
else do
{ -- krc: removed some ticky-related code here.
; tickyUpdateFrameOmitted
......@@ -553,7 +564,8 @@ link_caf cl_info _is_upd = do
{ -- Alloc black hole specifying CC_HDR(Node) as the cost centre
; let use_cc = costCentreFrom (CmmReg nodeReg)
blame_cc = use_cc
; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc []
tso = CmmReg (CmmGlobal CurrentTSO)
; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc [(tso,fixedHdrSize)]
; hp_rel <- getHpRelOffset hp_offset
-- Call the RTS function newCAF to add the CAF to the CafList
......
......@@ -169,7 +169,6 @@ block.
\begin{code}
data Sequel
= OnStack -- Continuation is on the stack
| UpdateCode -- Continuation is update
| CaseAlts
CLabel -- Jump to this; if the continuation is for a vectored
......
......@@ -17,7 +17,7 @@ module CgStackery (
setStackFrame, getStackFrame,
mkVirtStkOffsets, mkStkAmodes,
freeStackSlots,
pushUpdateFrame, emitPushUpdateFrame,
pushUpdateFrame, pushBHUpdateFrame, emitPushUpdateFrame,
) where
#include "HsVersions.h"
......@@ -265,6 +265,14 @@ to reflect the frame pushed.
\begin{code}
pushUpdateFrame :: CmmExpr -> Code -> Code
pushUpdateFrame updatee code
= pushSpecUpdateFrame mkUpdInfoLabel updatee code
pushBHUpdateFrame :: CmmExpr -> Code -> Code
pushBHUpdateFrame updatee code
= pushSpecUpdateFrame mkBHUpdInfoLabel updatee code
pushSpecUpdateFrame :: CLabel -> CmmExpr -> Code -> Code
pushSpecUpdateFrame lbl updatee code
= do {
when debugIsOn $ do
{ EndOfBlockInfo _ sequel <- getEndOfBlockInfo ;
......@@ -277,15 +285,25 @@ pushUpdateFrame updatee code
-- The location of the lowest-address
-- word of the update frame itself
; setEndOfBlockInfo (EndOfBlockInfo vsp UpdateCode) $
do { emitPushUpdateFrame frame_addr updatee
-- NB. we used to set the Sequel to 'UpdateCode' so
-- that we could jump directly to the update code if
-- we know that the next frame on the stack is an
-- update frame. However, the RTS can sometimes
-- change an update frame into something else (see
-- e.g. Note [upd-black-hole] in rts/sm/Scav.c), so we
-- no longer make this assumption.
; setEndOfBlockInfo (EndOfBlockInfo vsp OnStack) $
do { emitSpecPushUpdateFrame lbl frame_addr updatee
; code }
}
emitPushUpdateFrame :: CmmExpr -> CmmExpr -> Code
emitPushUpdateFrame frame_addr updatee = do
emitPushUpdateFrame = emitSpecPushUpdateFrame mkUpdInfoLabel
emitSpecPushUpdateFrame :: CLabel -> CmmExpr -> CmmExpr -> Code
emitSpecPushUpdateFrame lbl frame_addr updatee = do
stmtsC [ -- Set the info word
CmmStore frame_addr (mkLblExpr mkUpdInfoLabel)
CmmStore frame_addr (mkLblExpr lbl)
, -- And the updatee
CmmStore (cmmOffsetB frame_addr off_updatee) updatee ]
initUpdFrameProf frame_addr
......
......@@ -106,10 +106,18 @@ void _assertFail(const char *filename, unsigned int linenum)
else \
_assertFail(__FILE__, __LINE__)
#define CHECKM(predicate, msg, ...) \
if (predicate) \
/*null*/; \
else \
barf(msg, ##__VA_ARGS__)
#ifndef DEBUG
#define ASSERT(predicate) /* nothing */
#define ASSERTM(predicate,msg,...) /* nothing */
#else
#define ASSERT(predicate) CHECK(predicate)
#define ASSERTM(predicate,msg,...) CHECKM(predicate,msg,##__VA_ARGS__)
#endif /* DEBUG */
/*
......
......@@ -291,6 +291,7 @@ main(int argc, char *argv[])
closure_field(StgTSO, trec);
closure_field(StgTSO, flags);
closure_field(StgTSO, dirty);
closure_field(StgTSO, bq);
closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
tso_field(StgTSO, sp);
tso_field_offset(StgTSO, stack);
......@@ -382,6 +383,17 @@ main(int argc, char *argv[])
closure_size(StgStableName);
closure_field(StgStableName,sn);
closure_size(StgBlockingQueue);
closure_field(StgBlockingQueue, bh);
closure_field(StgBlockingQueue, owner);
closure_field(StgBlockingQueue, queue);
closure_field(StgBlockingQueue, link);
closure_size(MessageBlackHole);
closure_field(MessageBlackHole, link);
closure_field(MessageBlackHole, tso);
closure_field(MessageBlackHole, bh);
struct_field_("RtsFlags_ProfFlags_showCCSOnException",
RTS_FLAGS, ProfFlags.showCCSOnException);
struct_field_("RtsFlags_DebugFlags_apply",
......
......@@ -129,6 +129,12 @@
SET_HDR(c,info,costCentreStack); \
(c)->words = n_words;
// Use when changing a closure from one kind to another
#define OVERWRITE_INFO(c, new_info) \
LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)(c)); \
SET_INFO((c), (new_info)); \
LDV_RECORD_CREATE(c);
/* -----------------------------------------------------------------------------
How to get hold of the static link field for a static closure.
-------------------------------------------------------------------------- */
......@@ -249,7 +255,7 @@ INLINE_HEADER StgOffset THUNK_SELECTOR_sizeW ( void )
{ return sizeofW(StgSelector); }
INLINE_HEADER StgOffset BLACKHOLE_sizeW ( void )
{ return sizeofW(StgHeader)+MIN_PAYLOAD_SIZE; }
{ return sizeofW(StgInd); } // a BLACKHOLE is a kind of indirection
/* --------------------------------------------------------------------------
Sizes of closures
......
......@@ -62,7 +62,7 @@
#define UPDATE_FRAME 38
#define CATCH_FRAME 39
#define STOP_FRAME 40
#define CAF_BLACKHOLE 41
#define BLOCKING_QUEUE 41
#define BLACKHOLE 42
#define MVAR_CLEAN 43
#define MVAR_DIRTY 44
......
......@@ -127,6 +127,14 @@ typedef struct {
StgInfoTable *saved_info;
} StgIndStatic;
typedef struct StgBlockingQueue_ {
StgHeader header;
struct StgBlockingQueue_ *link; // here so it looks like an IND
StgClosure *bh; // the BLACKHOLE
StgTSO *owner;
struct MessageBlackHole_ *queue;
} StgBlockingQueue;
typedef struct {
StgHeader header;
StgWord words;
......@@ -433,10 +441,17 @@ typedef struct MessageWakeup_ {
typedef struct MessageThrowTo_ {
StgHeader header;
Message *link;
struct MessageThrowTo_ *link;
StgTSO *source;
StgTSO *target;
StgClosure *exception;
} MessageThrowTo;
typedef struct MessageBlackHole_ {
StgHeader header;
struct MessageBlackHole_ *link;
StgTSO *tso;
StgClosure *bh;
} MessageBlackHole;
#endif /* RTS_STORAGE_CLOSURES_H */
......@@ -46,6 +46,7 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
struct MessageBlackHole_ *bh;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */
......@@ -78,12 +79,17 @@ typedef struct StgTSO_ {
*/
struct StgTSO_* _link;
/*
Currently used for linking TSOs on:
* cap->run_queue_{hd,tl}
* MVAR queue
* (non-THREADED_RTS); the blocked_queue
* and pointing to the relocated version of a ThreadRelocated
NOTE!!! do not modify _link directly, it is subject to
a write barrier for generational GC. Instead use the
setTSOLink() function. Exceptions to this rule are:
* setting the link field to END_TSO_QUEUE
* putting a TSO on the blackhole_queue
* setting the link field of the currently running TSO, as it
will already be dirty.
*/
......@@ -127,6 +133,12 @@ typedef struct StgTSO_ {
*/
struct MessageThrowTo_ * blocked_exceptions;
/*
A list of StgBlockingQueue objects, representing threads blocked
on thunks that are under evaluation by this thread.
*/
struct StgBlockingQueue_ *bq;
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
#endif
......@@ -152,6 +164,18 @@ typedef struct StgTSO_ {
void dirty_TSO (Capability *cap, StgTSO *tso);
void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
// Apply to a TSO before looking at it if you are not sure whether it
// might be ThreadRelocated or not (basically, that's most of the time
// unless the TSO is the current TSO).
//
INLINE_HEADER StgTSO * deRefTSO(StgTSO *tso)
{
while (tso->what_next == ThreadRelocated) {
tso = tso->_link;
}
return tso;
}
/* -----------------------------------------------------------------------------
Invariants:
......
......@@ -44,6 +44,7 @@
/* Stack frames */
RTS_RET_INFO(stg_upd_frame_info);
RTS_RET_INFO(stg_bh_upd_frame_info);
RTS_RET_INFO(stg_marked_upd_frame_info);
RTS_RET_INFO(stg_noupd_frame_info);
RTS_RET_INFO(stg_catch_frame_info);
......@@ -54,6 +55,7 @@ RTS_RET_INFO(stg_catch_stm_frame_info);
RTS_RET_INFO(stg_unblockAsyncExceptionszh_ret_info);
RTS_ENTRY(stg_upd_frame_ret);
RTS_ENTRY(stg_bh_upd_frame_ret);
RTS_ENTRY(stg_marked_upd_frame_ret);
// RTS_FUN(stg_interp_constr_entry);
......@@ -90,12 +92,12 @@ RTS_INFO(stg_IND_STATIC_info);
RTS_INFO(stg_IND_PERM_info);
RTS_INFO(stg_IND_OLDGEN_info);
RTS_INFO(stg_IND_OLDGEN_PERM_info);
RTS_INFO(stg_CAF_UNENTERED_info);
RTS_INFO(stg_CAF_ENTERED_info);
RTS_INFO(stg_WHITEHOLE_info);
RTS_INFO(stg_BLACKHOLE_info);
RTS_INFO(__stg_EAGER_BLACKHOLE_info);
RTS_INFO(stg_CAF_BLACKHOLE_info);
RTS_INFO(__stg_EAGER_BLACKHOLE_info);
RTS_INFO(stg_WHITEHOLE_info);
RTS_INFO(stg_BLOCKING_QUEUE_CLEAN_info);
RTS_INFO(stg_BLOCKING_QUEUE_DIRTY_info);
RTS_FUN_INFO(stg_BCO_info);
RTS_INFO(stg_EVACUATED_info);
......@@ -115,7 +117,9 @@ RTS_INFO(stg_MUT_VAR_CLEAN_info);
RTS_INFO(stg_MUT_VAR_DIRTY_info);
RTS_INFO(stg_END_TSO_QUEUE_info);
RTS_INFO(stg_MSG_WAKEUP_info);
RTS_INFO(stg_MSG_TRY_WAKEUP_info);
RTS_INFO(stg_MSG_THROWTO_info);
RTS_INFO(stg_MSG_BLACKHOLE_info);
RTS_INFO(stg_MUT_CONS_info);
RTS_INFO(stg_catch_info);
RTS_INFO(stg_PAP_info);
......@@ -142,12 +146,10 @@ RTS_ENTRY(stg_IND_STATIC_entry);
RTS_ENTRY(stg_IND_PERM_entry);
RTS_ENTRY(stg_IND_OLDGEN_entry);
RTS_ENTRY(stg_IND_OLDGEN_PERM_entry);
RTS_ENTRY(stg_CAF_UNENTERED_entry);
RTS_ENTRY(stg_CAF_ENTERED_entry);
RTS_ENTRY(stg_WHITEHOLE_entry);
RTS_ENTRY(stg_BLACKHOLE_entry);
RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_CAF_BLACKHOLE_entry);
RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_BCO_entry);
RTS_ENTRY(stg_EVACUATED_entry);
RTS_ENTRY(stg_WEAK_entry);
......@@ -166,7 +168,9 @@ RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
RTS_ENTRY(stg_END_TSO_QUEUE_entry);
RTS_ENTRY(stg_MSG_WAKEUP_entry);
RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry);
RTS_ENTRY(stg_MSG_THROWTO_entry);
RTS_ENTRY(stg_MSG_BLACKHOLE_entry);
RTS_ENTRY(stg_MUT_CONS_entry);
RTS_ENTRY(stg_catch_entry);
RTS_ENTRY(stg_PAP_entry);
......@@ -404,6 +408,8 @@ RTS_FUN(stg_PAP_apply);
RTS_RET_INFO(stg_enter_info);
RTS_ENTRY(stg_enter_ret);
RTS_RET_INFO(stg_enter_checkbh_info);
RTS_ENTRY(stg_enter_checkbh_ret);
RTS_RET_INFO(stg_gc_void_info);
RTS_ENTRY(stg_gc_void_ret);
......
......@@ -62,9 +62,8 @@ Capability * rts_unsafeGetMyCapability (void)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
return blackholes_need_checking
|| sched_state >= SCHED_INTERRUPTING
;
return sched_state >= SCHED_INTERRUPTING
|| recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
......@@ -636,43 +635,6 @@ yieldCapability (Capability** pCap, Task *task)
return;
}
/* ----------------------------------------------------------------------------
* Wake up a thread on a Capability.
*
* This is used when the current Task is running on a Capability and
* wishes to wake up a thread on a different Capability.
* ------------------------------------------------------------------------- */
void
wakeupThreadOnCapability (Capability *cap,
Capability *other_cap,
StgTSO *tso)
{
MessageWakeup *msg;
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
ASSERT(tso->bound->task->cap == tso->cap);
tso->bound->task->cap = other_cap;
}
tso->cap = other_cap;
ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
tso->block_info.closure->header.info == &stg_IND_info);
ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
msg->header.info = &stg_MSG_WAKEUP_info;
msg->tso = tso;
tso->block_info.closure = (StgClosure *)msg;
dirty_TSO(cap, tso);
write_barrier();
tso->why_blocked = BlockedOnMsgWakeup;
sendMessage(other_cap, (Message*)msg);
}
/* ----------------------------------------------------------------------------
* prodCapability
*
......@@ -906,24 +868,3 @@ markCapabilities (evac_fn evac, void *user)
Messages
-------------------------------------------------------------------------- */
#ifdef THREADED_RTS
void sendMessage(Capability *cap, Message *msg)
{
ACQUIRE_LOCK(&cap->lock);
msg->link = cap->inbox;
cap->inbox = msg;
if (cap->running_task == NULL) {
cap->running_task = myTask();
// precond for releaseCapability_()
releaseCapability_(cap,rtsFalse);
} else {
contextSwitchCapability(cap);
}
RELEASE_LOCK(&cap->lock);
}
#endif // THREADED_RTS
......@@ -201,6 +201,8 @@ void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
#if defined(THREADED_RTS)
// Gives up the current capability IFF there is a higher-priority
......@@ -222,12 +224,6 @@ void yieldCapability (Capability** pCap, Task *task);
//
void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
// Wakes up a thread on a Capability (probably a different Capability
// from the one held by the current Task).
//
void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
StgTSO *tso);
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
......@@ -289,8 +285,6 @@ void traverseSparkQueues (evac_fn evac, void *user);
INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
void sendMessage (Capability *cap, Message *msg);
#endif // THREADED_RTS
/* -----------------------------------------------------------------------------
......@@ -316,6 +310,15 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
INLINE_HEADER void
recordClosureMutated (Capability *cap, StgClosure *p)
{
bdescr *bd;
bd = Bdescr((StgPtr)p);
if (bd->gen_no != 0) recordMutableCap(p,cap,bd->gen_no);
}
#if defined(THREADED_RTS)
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap)
......
......@@ -62,8 +62,8 @@ StgWord16 closure_flags[] = {
[UPDATE_FRAME] = ( _BTM ),
[CATCH_FRAME] = ( _BTM ),
[STOP_FRAME] = ( _BTM ),
[CAF_BLACKHOLE] = ( _BTM|_NS| _UPT ),
[BLACKHOLE] = ( _NS| _UPT ),
[BLOCKING_QUEUE] = ( _NS| _MUT|_UPT ),
[MVAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ),
[MVAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ),
[ARR_WORDS] = (_HNF| _NS| _UPT ),
......
......@@ -662,8 +662,6 @@ residencyCensus( void )
type = Thunk;
break;
case CAF_BLACKHOLE:
case EAGER_BLACKHOLE:
case BLACKHOLE:
/* case BLACKHOLE_BQ: FIXME: case does not exist */
size = sizeW_fromITBL(info);
......
......@@ -158,6 +158,24 @@ __stg_gc_enter_1
GC_GENERIC
}
/* -----------------------------------------------------------------------------
stg_enter_checkbh is just like stg_enter, except that we also call
checkBlockingQueues(). The point of this is that the GC can
replace an stg_marked_upd_frame with an stg_enter_checkbh if it
finds that the BLACKHOLE has already been updated by another
thread. It would be unsafe to use stg_enter, because there might
be an orphaned BLOCKING_QUEUE now.
-------------------------------------------------------------------------- */
INFO_TABLE_RET( stg_enter_checkbh, RET_SMALL, P_ unused)
{
R1 = Sp(1);
Sp_adj(2);
foreign "C" checkBlockingQueues(MyCapability() "ptr",
CurrentTSO) [R1];
ENTER();
}
/* -----------------------------------------------------------------------------
Heap checks in Primitive case alternatives
......@@ -593,11 +611,7 @@ INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, P_ unused1, P_ unused2 )
// code fragment executed just before we return to the scheduler
stg_block_putmvar_finally
{
#ifdef THREADED_RTS
unlockClosure(R3, stg_MVAR_DIRTY_info);
#else
SET_INFO(R3, stg_MVAR_DIRTY_info);
#endif
jump StgReturn;
}
......@@ -611,24 +625,12 @@ stg_block_putmvar
BLOCK_BUT_FIRST(stg_block_putmvar_finally);
}
// code fragment executed just before we return to the scheduler
stg_block_blackhole_finally
{
#if defined(THREADED_RTS)
// The last thing we do is release sched_lock, which is
// preventing other threads from accessing blackhole_queue and
// picking up this thread before we are finished with it.
RELEASE_LOCK(sched_mutex "ptr");
#endif
jump StgReturn;
}
stg_block_blackhole
{
Sp_adj(-2);
Sp(1) = R1;
Sp(0) = stg_enter_info;
BLOCK_BUT_FIRST(stg_block_blackhole_finally);
BLOCK_GENERIC;
}
INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
......
......@@ -21,6 +21,7 @@
#include "Disassembler.h"
#include "Interpreter.h"
#include "ThreadPaused.h"
#include "Threads.h"
#include <string.h> /* for memcpy */
#ifdef HAVE_ERRNO_H
......@@ -443,7 +444,8 @@ do_return:
// to a PAP by the GC, violating the invariant that PAPs
// always contain a tagged pointer to the function.
INTERP_TICK(it_retto_UPDATE);
UPD_IND(cap, ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
updateThunk(cap, cap->r.rCurrentTSO,
((StgUpdateFrame *)Sp)->updatee, tagged_obj);
Sp += sizeofW(StgUpdateFrame);
goto do_return;
......
......@@ -140,7 +140,7 @@ processHeapClosureForDead( StgClosure *c )
case FUN_1_1:
case FUN_0_2:
case BLACKHOLE:
case CAF_BLACKHOLE:
case BLOCKING_QUEUE:
case IND_PERM:
case IND_OLDGEN_PERM:
/*
......
......@@ -877,7 +877,10 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stable_ptr_table) \
SymI_HasProto(stackOverflow) \
SymI_HasProto(stg_CAF_BLACKHOLE_info) \
SymI_HasProto(stg_BLACKHOLE_info) \
SymI_HasProto(__stg_EAGER_BLACKHOLE_info) \
SymI_HasProto(stg_BLOCKING_QUEUE_CLEAN_info) \
SymI_HasProto(stg_BLOCKING_QUEUE_DIRTY_info) \
SymI_HasProto(startTimer) \
SymI_HasProto(stg_MVAR_CLEAN_info) \
SymI_HasProto(stg_MVAR_DIRTY_info) \
......@@ -941,6 +944,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(stg_sel_8_upd_info) \
SymI_HasProto(stg_sel_9_upd_info) \
SymI_HasProto(stg_upd_frame_info) \
SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
......
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 2010
*
* Inter-Capability message passing
*
* --------------------------------------------------------------------------*/
#include "Rts.h"
#include "Messages.h"
#include "Trace.h"
#include "Capability.h"
#include "Schedule.h"
#include "Threads.h"
#include "RaiseAsync.h"
#include "sm/Storage.h"
/* ----------------------------------------------------------------------------
Send a message to another Capability
------------------------------------------------------------------------- */
#ifdef THREADED_RTS
void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
{
ACQUIRE_LOCK(&to_cap->lock);
#ifdef DEBUG
{
const StgInfoTable *i = msg->header.info;
if (i != &stg_MSG_WAKEUP_info &&
i != &stg_MSG_THROWTO_info &&
i != &stg_MSG_BLACKHOLE_info &&
i != &stg_MSG_TRY_WAKEUP_info &&
i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
i != &stg_WHITEHOLE_info) {
barf("sendMessage: %p", i);
}
}
#endif
msg->link = to_cap->inbox;
to_cap->inbox = msg;
recordClosureMutated(from_cap,(StgClosure*)msg);
if (to_cap->running_task == NULL) {
to_cap->running_task = myTask();
// precond for releaseCapability_()
releaseCapability_(to_cap,rtsFalse);
} else {
contextSwitchCapability(to_cap);
}
RELEASE_LOCK(&to_cap->lock);
}
#endif /* THREADED_RTS */
/* ----------------------------------------------------------------------------
Handle a message
------------------------------------------------------------------------- */
#ifdef THREADED_RTS
void
executeMessage (Capability *cap, Message *m)
{
const StgInfoTable *i;
loop:
write_barrier(); // allow m->header to be modified by another thread
i = m->header.info;
if (i == &stg_MSG_WAKEUP_info)
{
// the plan is to eventually get rid of these and use
// TRY_WAKEUP instead.
MessageWakeup *w = (MessageWakeup *)m;
StgTSO *tso = w->tso;
debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
(lnat)tso->id);
ASSERT(tso->cap == cap);
ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
ASSERT(tso->block_info.closure == (StgClosure *)m);
tso->why_blocked = NotBlocked;
appendToRunQueue(cap, tso);
}
else if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
(lnat)tso->id);
tryWakeupThread(cap, tso);
}
else if (i == &stg_MSG_THROWTO_info)
{
MessageThrowTo *t = (MessageThrowTo *)m;
nat r;
const StgInfoTable *i;
i = lockClosure((StgClosure*)m);
if (i != &stg_MSG_THROWTO_info) {
unlockClosure((StgClosure*)m, i);
goto loop;