Commit f4692220 authored by Simon Marlow

Change the representation of the MVar blocked queue

The list of threads blocked on an MVar is now represented as a list of
separately allocated objects rather than being linked through the TSOs
themselves.  This lets us remove a TSO from the list in O(1) time
rather than O(n) time, by marking the list object.  Removing this
linear component fixes some pathological performance cases where many
threads were blocked on an MVar and became unreachable simultaneously
(nofib/smp/threads007), or when sending an asynchronous exception to a
TSO in a long list of threads blocked on an MVar.
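
To make the O(1) claim concrete, here is a minimal C sketch of the idea
(editorial illustration only, not RTS code: the real queue cells are heap
objects, and the "mark" is an info-pointer overwrite rather than a flag;
see removeFromMVarBlockedQueue in the diff below):

    /* One separately allocated cell per blocked thread. */
    typedef struct Cell_ {
        struct Cell_ *link;
        void         *tso;    /* the blocked thread */
        int           dead;   /* stands in for the info-pointer overwrite */
    } Cell;

    typedef struct { Cell *head, *tail; } Queue;

    /* O(1): mark the cell dead and forget it; readers skip dead cells.
     * With the old representation (TSOs chained through their _link
     * fields) removing one thread meant an O(n) walk of the queue. */
    static void removeCell (Cell *c) {
        c->dead = 1;
        c->tso  = NULL;
    }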

MVar performance has actually improved by a few percent as a result of
this change, slightly to my surprise.

This is the final cleanup in the sequence, which let me remove the old
way of waking up threads (unblockOne(), MSG_WAKEUP) in favour of the
new way (tryWakeupThread and MSG_TRY_WAKEUP, which is idempotent).  It
is now the case that only the Capability that owns a TSO may modify
its state (well, almost), and this simplifies various things.  More of
the RTS is based on message-passing between Capabilities now.
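
The ownership rule is worth a sketch. The shape below is illustrative (the
types and functions named are real RTS names that appear in this diff, but
this exact helper is hypothetical); it is essentially what tryWakeupThread
itself does internally when tso->cap != cap:

    /* A Capability may mutate only the TSOs it owns; to wake a thread
     * owned by another Capability, it sends a message instead. */
    void wakeup (Capability *cap, StgTSO *tso)
    {
        if (tso->cap == cap) {
            tryWakeupThread(cap, tso);   // we own it: act directly
        } else {
            MessageWakeup *msg;
            msg = (MessageWakeup*)allocate(cap, sizeofW(MessageWakeup));
            SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
            msg->tso = tso;
            sendMessage(cap, tso->cap, (Message*)msg);  // owner acts
        }
    }

Because MSG_TRY_WAKEUP is idempotent, a message that arrives after the
thread has already been woken by some other route is simply ignored.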
parent 7c4cb84e
......@@ -372,6 +372,10 @@ main(int argc, char *argv[])
closure_field(StgMVar,tail);
closure_field(StgMVar,value);
closure_size(StgMVarTSOQueue);
closure_field(StgMVarTSOQueue, link);
closure_field(StgMVarTSOQueue, tso);
closure_size(StgBCO);
closure_field(StgBCO, instrs);
closure_field(StgBCO, literals);
......
......@@ -227,8 +227,12 @@
/* same as above but don't unblock async exceptions in resumeThread() */
/* Involved in a message sent to tso->msg_cap */
-#define BlockedOnMsgWakeup 12
-#define BlockedOnMsgThrowTo 13
+#define BlockedOnMsgThrowTo 12
+
+/* The thread is not on any run queues, but can be woken up
+   by tryWakeupThread() */
+#define ThreadMigrating 13
/*
* These constants are returned to the scheduler by a thread that has
* stopped for one reason or another. See typedef StgThreadReturnCode
......
......@@ -109,7 +109,8 @@ typedef struct bdescr_ {
#else
-INLINE_HEADER bdescr *Bdescr(StgPtr p)
+EXTERN_INLINE bdescr *Bdescr(StgPtr p);
+EXTERN_INLINE bdescr *Bdescr(StgPtr p)
{
return (bdescr *)
((((W_)p & MBLOCK_MASK & ~BLOCK_MASK) >> (BLOCK_SHIFT-BDESCR_SHIFT))
......
......@@ -305,11 +305,17 @@ typedef struct {
/* Concurrent communication objects */
+typedef struct StgMVarTSOQueue_ {
+    StgHeader                header;
+    struct StgMVarTSOQueue_ *link;
+    struct StgTSO_          *tso;
+} StgMVarTSOQueue;
+
typedef struct {
-    StgHeader                header;
-    struct StgTSO_          *head;
-    struct StgTSO_          *tail;
-    StgClosure*              value;
+    StgHeader                header;
+    struct StgMVarTSOQueue_ *head;
+    struct StgMVarTSOQueue_ *tail;
+    StgClosure*              value;
} StgMVar;
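
A hedged sketch of how a consumer walks this queue (the real consumers are
takeMVar/putMVar in PrimOps.cmm, presumably the collapsed diff below;
wakeOrHandOff is a made-up placeholder). Since link is the first payload
word, a cell overwritten with an IND has that same word as its indirectee,
which appears to be what lets the GC short dead cells out of the list:

    StgMVarTSOQueue *q = mvar->head;
    while (q != (StgMVarTSOQueue*)END_TSO_QUEUE) {
        if (q->header.info == &stg_IND_info ||
            q->header.info == &stg_MSG_NULL_info) {
            q = q->link;             /* revoked cell: skip it */
            continue;
        }
        wakeOrHandOff(q->tso);       /* placeholder for the real wakeup */
        break;
    }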
......
......@@ -82,7 +82,6 @@ typedef struct StgTSO_ {
/*
Currently used for linking TSOs on:
* cap->run_queue_{hd,tl}
* MVAR queue
* (non-THREADED_RTS); the blocked_queue
* and pointing to the relocated version of a ThreadRelocated
......
......@@ -42,10 +42,10 @@
# define RTS_FUN(f) RTS_FUN_INFO(f##_info)
# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info)
#else
-# define RTS_RET(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_ret)
-# define RTS_ENTRY(f) RTS_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_FUN(f) RTS_FUN_INFO(f##_info) RTS_FUN_DECL(f##_entry)
-# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info) RTS_FUN_DECL(f##_entry)
+# define RTS_RET(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_ret)
+# define RTS_ENTRY(f) RTS_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_FUN(f) RTS_FUN_INFO(f##_info); RTS_FUN_DECL(f##_entry)
+# define RTS_THUNK(f) RTS_THUNK_INFO(f##_info); RTS_FUN_DECL(f##_entry)
#endif
/* Stack frames */
......@@ -109,7 +109,6 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0);
RTS_ENTRY(stg_MUT_VAR_CLEAN);
RTS_ENTRY(stg_MUT_VAR_DIRTY);
RTS_ENTRY(stg_END_TSO_QUEUE);
-RTS_ENTRY(stg_MSG_WAKEUP);
RTS_ENTRY(stg_MSG_TRY_WAKEUP);
RTS_ENTRY(stg_MSG_THROWTO);
RTS_ENTRY(stg_MSG_BLACKHOLE);
......
......@@ -199,9 +199,9 @@ extern volatile StgWord waiting_for_gc;
//
void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
-INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+EXTERN_INLINE void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
-INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+EXTERN_INLINE void recordClosureMutated (Capability *cap, StgClosure *p);
#if defined(THREADED_RTS)
......@@ -291,7 +291,7 @@ INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
* INLINE functions... private below here
* -------------------------------------------------------------------------- */
-INLINE_HEADER void
+EXTERN_INLINE void
recordMutableCap (StgClosure *p, Capability *cap, nat gen)
{
bdescr *bd;
......@@ -310,7 +310,7 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
-INLINE_HEADER void
+EXTERN_INLINE void
recordClosureMutated (Capability *cap, StgClosure *p)
{
bdescr *bd;
......
......@@ -481,9 +481,13 @@ INFO_TABLE_RET( stg_gc_gen, RET_DYN )
stg_gc_gen
{
+    // Hack; see Note [mvar-heap-check] in PrimOps.cmm
+    if (R10 == stg_putMVarzh || R10 == stg_takeMVarzh) {
+        unlockClosure(R1, stg_MVAR_DIRTY_info)
+    }
    SAVE_EVERYTHING;
    GC_GENERIC
}
// A heap check at an unboxed tuple return point. The return address
// is on the stack, and we can find it by using the offsets given
......@@ -583,11 +587,7 @@ INFO_TABLE_RET( stg_block_takemvar, RET_SMALL, P_ unused )
// code fragment executed just before we return to the scheduler
stg_block_takemvar_finally
{
-#ifdef THREADED_RTS
    unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
-    SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
jump StgReturn;
}
......
......@@ -5,3 +5,4 @@
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
#include "Capability.h"
......@@ -28,8 +28,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
#ifdef DEBUG
{
const StgInfoTable *i = msg->header.info;
-        if (i != &stg_MSG_WAKEUP_info &&
-            i != &stg_MSG_THROWTO_info &&
+        if (i != &stg_MSG_THROWTO_info &&
i != &stg_MSG_BLACKHOLE_info &&
i != &stg_MSG_TRY_WAKEUP_info &&
i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
......@@ -71,21 +70,7 @@ executeMessage (Capability *cap, Message *m)
loop:
write_barrier(); // allow m->header to be modified by another thread
i = m->header.info;
-    if (i == &stg_MSG_WAKEUP_info)
-    {
-        // the plan is to eventually get rid of these and use
-        // TRY_WAKEUP instead.
-        MessageWakeup *w = (MessageWakeup *)m;
-        StgTSO *tso = w->tso;
-        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
-                      (lnat)tso->id);
-        ASSERT(tso->cap == cap);
-        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
-        ASSERT(tso->block_info.closure == (StgClosure *)m);
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap, tso);
-    }
-    else if (i == &stg_MSG_TRY_WAKEUP_info)
+    if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
......
This diff is collapsed.
......@@ -31,6 +31,8 @@ static void raiseAsync (Capability *cap,
static void removeFromQueues(Capability *cap, StgTSO *tso);
static void removeFromMVarBlockedQueue (StgTSO *tso);
static void blockedThrowTo (Capability *cap,
StgTSO *target, MessageThrowTo *msg);
......@@ -124,7 +126,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
Currently we send a message if the target belongs to another
Capability, and it is
-   - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+   - NotBlocked, BlockedOnMsgThrowTo,
BlockedOnCCall
- or it is masking exceptions (TSO_BLOCKEX)
......@@ -221,67 +223,7 @@ check_target:
switch (status) {
case NotBlocked:
-    case BlockedOnMsgWakeup:
-        /* if status==NotBlocked, and target->cap == cap, then
-           we own this TSO and can raise the exception.
-
-           How do we establish this condition?  Very carefully.
-
-           Let
-               P = (status == NotBlocked)
-               Q = (tso->cap == cap)
-
-           Now, if P & Q are true, then the TSO is locked and owned by
-           this capability.  No other OS thread can steal it.
-
-           If P==0 and Q==1: the TSO is blocked, but attached to this
-           capabilty, and it can be stolen by another capability.
-
-           If P==1 and Q==0: the TSO is runnable on another
-           capability.  At any time, the TSO may change from runnable
-           to blocked and vice versa, while it remains owned by
-           another capability.
-
-           Suppose we test like this:
-
-              p = P
-              q = Q
-              if (p && q) ...
-
-           this is defeated by another capability stealing a blocked
-           TSO from us to wake it up (Schedule.c:unblockOne()).  The
-           other thread is doing
-
-              Q = 0
-              P = 1
-
-           assuming arbitrary reordering, we could see this
-           interleaving:
-
-              start: P==0 && Q==1
-              P = 1
-              p = P
-              q = Q
-              Q = 0
-              if (p && q) ...
-
-           so we need a memory barrier:
-
-              p = P
-              mb()
-              q = Q
-              if (p && q) ...
-
-           this avoids the problematic case.  There are other cases
-           to consider, but this is the tricky one.
-
-           Note that we must be sure that unblockOne() does the
-           writes in the correct order: Q before P.  The memory
-           barrier ensures that if we have seen the write to P, we
-           have also seen the write to Q.
-        */
    {
-        write_barrier();
if ((target->flags & TSO_BLOCKEX) == 0) {
// It's on our run queue and not blocking exceptions
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
......@@ -389,18 +331,26 @@ check_target:
goto retry;
}
+        if (target->_link == END_TSO_QUEUE) {
+            // the MVar operation has already completed.  There is a
+            // MSG_TRY_WAKEUP on the way, but we can just wake up the
+            // thread now anyway and ignore the message when it
+            // arrives.
+            unlockClosure((StgClosure *)mvar, info);
+            tryWakeupThread(cap, target);
+            goto retry;
+        }
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
-            removeThreadFromMVarQueue(cap, mvar, target);
+            // revoke the MVar operation
+            removeFromMVarBlockedQueue(target);
            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            if (info == &stg_MVAR_CLEAN_info) {
+                dirty_MVAR(&cap->r,(StgClosure*)mvar);
+            }
-            unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
+            unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
}
}
......@@ -587,12 +537,55 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
of conditions as throwToSingleThreaded() (c.f.).
-------------------------------------------------------------------------- */
static void
removeFromMVarBlockedQueue (StgTSO *tso)
{
StgMVar *mvar = (StgMVar*)tso->block_info.closure;
StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
// already removed from this MVar
return;
}
// Assume the MVar is locked. (not assertable; sometimes it isn't
// actually WHITEHOLE'd).
// We want to remove the MVAR_TSO_QUEUE object from the queue. It
// isn't doubly-linked so we can't actually remove it; instead we
// just overwrite it with an IND if possible and let the GC short
// it out. However, we have to be careful to maintain the deque
// structure:
if (mvar->head == q) {
mvar->head = q->link;
q->header.info = &stg_IND_info;
if (mvar->tail == q) {
mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
}
}
else if (mvar->tail == q) {
// we can't replace it with an IND in this case, because then
// we lose the tail pointer when the GC shorts out the IND.
// So we use MSG_NULL as a kind of non-dupable indirection;
// these are ignored by takeMVar/putMVar.
q->header.info = &stg_MSG_NULL_info;
}
else {
q->header.info = &stg_IND_info;
}
// revoke the MVar operation
tso->_link = END_TSO_QUEUE;
}
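
For contrast, a hypothetical C-level model of the enqueue side (the real
code lives in PrimOps.cmm, whose diff is collapsed in this view, so
everything here beyond the types, allocate, and SET_HDR is an assumption).
It also shows why the tail cell must stay a real MVAR_TSO_QUEUE object
(possibly MSG_NULL, never an IND): the next enqueue writes through
mvar->tail->link:

    static void blockOnMVar (Capability *cap, StgMVar *mvar, StgTSO *tso)
    {
        StgMVarTSOQueue *q;
        q = (StgMVarTSOQueue*)allocate(cap, sizeofW(StgMVarTSOQueue));
        SET_HDR(q, &stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
        q->link = (StgMVarTSOQueue*)END_TSO_QUEUE;
        q->tso  = tso;
        if (mvar->head == (StgMVarTSOQueue*)END_TSO_QUEUE) {
            mvar->head = q;          /* queue was empty */
        } else {
            mvar->tail->link = q;    /* append; tail is always a cell */
        }
        mvar->tail = q;
        tso->_link = (StgTSO*)q;     /* lets removeFromMVarBlockedQueue
                                        find the cell in O(1) */
        /* the real code would also set tso->block_info.closure and
         * tso->why_blocked = BlockedOnMVar before blocking */
    }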
static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
switch (tso->why_blocked) {
case NotBlocked:
case ThreadMigrating:
return;
case BlockedOnSTM:
......@@ -605,22 +598,13 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;
case BlockedOnMVar:
-      removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
-      // we aren't doing a write barrier here: the MVar is supposed to
-      // be already locked, so replacing the info pointer would unlock it.
+      removeFromMVarBlockedQueue(tso);
goto done;
case BlockedOnBlackHole:
// nothing to do
goto done;
-    case BlockedOnMsgWakeup:
-    {
-        // kill the message, atomically:
-        OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
-        break;
-    }
case BlockedOnMsgThrowTo:
{
MessageThrowTo *m = tso->block_info.throwto;
......@@ -659,7 +643,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
}
done:
-    unblockOne(cap, tso);
+    tso->why_blocked = NotBlocked;
+    appendToRunQueue(cap, tso);
}
/* -----------------------------------------------------------------------------
......@@ -733,7 +718,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
ASSERT(tso->cap == cap);
// wake it up
-    if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+    if (tso->why_blocked != NotBlocked) {
tso->why_blocked = NotBlocked;
appendToRunQueue(cap,tso);
}
......
......@@ -125,7 +125,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task, rtsBool);
+static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
......@@ -204,7 +204,6 @@ schedule (Capability *initialCapability, Task *task)
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
-    rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
......@@ -328,9 +327,7 @@ schedule (Capability *initialCapability, Task *task)
// ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
}
yield:
-    scheduleYield(&cap,task,force_yield);
-    force_yield = rtsFalse;
+    scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
#endif
......@@ -490,19 +487,6 @@ run_thread:
traceEventStopThread(cap, t, ret);
-#if defined(THREADED_RTS)
-    // If ret is ThreadBlocked, and this Task is bound to the TSO that
-    // blocked, we are in limbo - the TSO is now owned by whatever it
-    // is blocked on, and may in fact already have been woken up,
-    // perhaps even on a different Capability.  It may be the case
-    // that task->cap != cap.  We better yield this Capability
-    // immediately and return to normality.
-    if (ret == ThreadBlocked) {
-        force_yield = rtsTrue;
-        goto yield;
-    }
-#endif
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
......@@ -639,23 +623,13 @@ shouldYieldCapability (Capability *cap, Task *task)
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
+scheduleYield (Capability **pcap, Task *task)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
//
-    // The force_yield flag is used when a bound thread blocks.  This
-    // is a particularly tricky situation: the current Task does not
-    // own the TSO any more, since it is on some queue somewhere, and
-    // might be woken up or manipulated by another thread at any time.
-    // The TSO and Task might be migrated to another Capability.
-    // Certain invariants might be in doubt, such as task->bound->cap
-    // == cap.  We have to yield the current Capability immediately,
-    // no messing around.
-    //
-    if (!force_yield &&
-        !shouldYieldCapability(cap,task) &&
+    if (!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
sched_state >= SCHED_INTERRUPTING))
......@@ -1891,8 +1865,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
-        traceEventMigrateThread (cap, tso, capabilities[cpu].no);
-        wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
+        migrateThread(cap, tso, &capabilities[cpu]);
}
#else
appendToRunQueue(cap,tso);
......@@ -2372,8 +2345,8 @@ deleteThread_(Capability *cap, StgTSO *tso)
if (tso->why_blocked == BlockedOnCCall ||
tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
-        unblockOne(cap,tso);
        tso->what_next = ThreadKilled;
+        appendToRunQueue(tso->cap, tso);
} else {
deleteThread(cap,tso);
}
......
......@@ -491,8 +491,6 @@ CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
------------------------------------------------------------------------- */
// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC.
-INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
-{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP")
{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; }
......@@ -572,6 +570,13 @@ INFO_TABLE( stg_dummy_ret, 0, 0, CONSTR_NOCAF_STATIC, "DUMMY_RET", "DUMMY_RET")
}
CLOSURE(stg_dummy_ret_closure,stg_dummy_ret);
/* ----------------------------------------------------------------------------
MVAR_TSO_QUEUE
------------------------------------------------------------------------- */
INFO_TABLE_CONSTR(stg_MVAR_TSO_QUEUE,2,0,0,PRIM,"MVAR_TSO_QUEUE","MVAR_TSO_QUEUE")
{ foreign "C" barf("MVAR_TSO_QUEUE object entered!") never returns; }
/* ----------------------------------------------------------------------------
CHARLIKE and INTLIKE closures.
......
......@@ -205,83 +205,21 @@ removeThreadFromDeQueue (Capability *cap,
barf("removeThreadFromMVarQueue: not found");
}
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
-    // caller must do the write barrier, because replacing the info
-    // pointer will unlock the MVar.
-    removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
-    tso->_link = END_TSO_QUEUE;
-}
/* ----------------------------------------------------------------------------
-   unblockOne()
-
-   unblock a single thread.
-   ------------------------------------------------------------------------- */
-
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
-    return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
-
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
-             rtsBool allow_migrate USED_IF_THREADS)
-{
-    StgTSO *next;
-
-    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
-    ASSERT(tso->why_blocked != NotBlocked);
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    next = tso->_link;
-    tso->_link = END_TSO_QUEUE;
+   tryWakeupThread()
-
-#if defined(THREADED_RTS)
-    if (tso->cap == cap || (!tsoLocked(tso) &&
-                            allow_migrate &&
-                            RtsFlags.ParFlags.wakeupMigrate)) {
-        // We are waking up this thread on the current Capability, which
-        // might involve migrating it from the Capability it was last on.
-        if (tso->bound) {
-            ASSERT(tso->bound->task->cap == tso->cap);
-            tso->bound->task->cap = cap;
-        }
-
-        tso->cap = cap;
-        write_barrier();
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap,tso);
-
-        // context-switch soonish so we can migrate the new thread if
-        // necessary.  NB. not contextSwitchCapability(cap), which would
-        // force a context switch immediately.
-        cap->context_switch = 1;
-    } else {
-        // we'll try to wake it up on the Capability it was last on.
-        wakeupThreadOnCapability(cap, tso->cap, tso);
-    }
-#else
-    tso->why_blocked = NotBlocked;
-    appendToRunQueue(cap,tso);
-
-    // context-switch soonish so we can migrate the new thread if
-    // necessary.  NB. not contextSwitchCapability(cap), which would
-    // force a context switch immediately.
-    cap->context_switch = 1;
-#endif
-
-    traceEventThreadWakeup (cap, tso, tso->cap->no);
+
+   Attempt to wake up a thread.  tryWakeupThread is idempotent: it is
+   always safe to call it too many times, but it is not safe in
+   general to omit a call.
-
-    return next;
-}
+
+   ------------------------------------------------------------------------- */
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
+    traceEventThreadWakeup (cap, tso, tso->cap->no);
#ifdef THREADED_RTS
if (tso->cap != cap)
{
......@@ -298,6 +236,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
switch (tso->why_blocked)
{
case BlockedOnMVar:
{
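        // tso->_link is reset to END_TSO_QUEUE once this thread's queue
        // cell has been removed from the MVar's blocked queue (cf.
        // removeFromMVarBlockedQueue above), i.e. the MVar operation has
        // completed while a MSG_TRY_WAKEUP was still in flight.  If the
        // thread is still on the queue, waking it now would be premature,
        // so the wakeup is ignored; this is what makes it idempotent.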
if (tso->_link == END_TSO_QUEUE) {
tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
goto unblock;
} else {
return;
}
}
case BlockedOnMsgThrowTo:
{
const StgInfoTable *i;
......@@ -307,27 +255,45 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
if (i != &stg_MSG_NULL_info) {
debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
(lnat)tso->id, tso->block_info.throwto->header.info);
-            break; // still blocked
+            return;
}
// remove the block frame from the stack
ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
tso->sp += 3;
-        // fall through...
+        goto unblock;
}
case BlockedOnBlackHole:
case BlockedOnSTM:
-    {
-        // just run the thread now, if the BH is not really available,
-        // we'll block again.
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap,tso);
-        break;
-    }
+    case ThreadMigrating:
+        goto unblock;
+
    default:
-        // otherwise, do nothing
-        break;
+        return;
    }

+unblock:
+    // just run the thread now, if the BH is not really available,
+    // we'll block again.
+    tso->why_blocked = NotBlocked;
+    appendToRunQueue(cap,tso);
}
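
To read the idempotency claim directly off the code above (editorial
illustration, not code from this commit):

    /* Waking the same thread twice is harmless.  Assume tso is
     * currently blocked and owned by this Capability. */
    tryWakeupThread(cap, tso);   // unblocks: why_blocked = NotBlocked,
                                 // and tso goes on the run queue
    tryWakeupThread(cap, tso);   // NotBlocked matches no case in the
                                 // switch, so we take "default: return;"
                                 // and do nothing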
/* ----------------------------------------------------------------------------
migrateThread
------------------------------------------------------------------------- */
void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
traceEventMigrateThread (from, tso, to->no);
// ThreadMigrating tells the target cap that it needs to be added to
// the run queue when it receives the MSG_TRY_WAKEUP.
    tso->why_blocked = ThreadMigrating;
tso->cap = to;
tryWakeupThread(from, tso);
}
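
Putting the pieces together, migration now appears to work end to end like
this (an assumed trace; the MSG_TRY_WAKEUP send happens inside
tryWakeupThread when tso->cap != cap, in the part of that hunk cut off
above):

    /* Sending Capability: */
    migrateThread(from, tso, to);
        // why_blocked = ThreadMigrating, tso->cap = to; tryWakeupThread
        // sees tso->cap != from and posts a MSG_TRY_WAKEUP to 'to'

    /* Receiving Capability, from executeMessage: */
    tryWakeupThread(to, tso);
        // case ThreadMigrating: goto unblock -> appendToRunQueue(to, tso)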
/* ----------------------------------------------------------------------------
......@@ -450,47 +416,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
}
}
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-void
-wakeupThreadOnCapability (Capability *cap,
-                          Capability *other_cap,
-                          StgTSO *tso)
-{
-    MessageWakeup *msg;
-
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-        ASSERT(tso->bound->task->cap == tso->cap);
-        tso->bound->task->cap = other_cap;
-    }
-
-    tso->cap = other_cap;
-
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
-    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
-    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
-    msg->tso = tso;
-    tso->block_info.closure = (StgClosure *)msg;
-    dirty_TSO(cap, tso);
-    write_barrier();
-    tso->why_blocked = BlockedOnMsgWakeup;
-
-    sendMessage(cap, other_cap, (Message*)msg);
-}
-#endif /* THREADED_RTS */
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
......@@ -549,15 +474,15 @@ printThreadBlockage(StgTSO *tso)
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
break;
-    case BlockedOnMsgWakeup:
-        debugBelch("is blocked on a wakeup message");
-        break;
case BlockedOnMsgThrowTo: