Commit b1953bbb authored by Simon Marlow's avatar Simon Marlow

Asynchronous exception support for SMP

This patch makes throwTo work with -threaded, and also refactors large
parts of the concurrency support in the RTS to clean things up.  We
have some new files:

  RaiseAsync.{c,h}	asynchronous exception support
  Threads.{c,h}         general threading-related utils

Some of the contents of these new files used to be in Schedule.c,
which is smaller and cleaner as a result of the split.

Asynchronous exception support in the presence of multiple running
Haskell threads is rather tricky.  In fact, to my annoyance there are
still one or two bugs to track down, but the majority of the tests run
now.
parent 1e3d53b4
......@@ -230,6 +230,36 @@
#define ThreadBlocked 4
#define ThreadFinished 5
/*
* Flags for the tso->flags field.
*
* The TSO_DIRTY flag indicates that this TSO's stack should be
* scanned during garbage collection. The link field of a TSO is
* always scanned, so we don't have to dirty a TSO just for linking
* it on a different list.
*
* TSO_DIRTY is set by
* - schedule(), just before running a thread,
* - raiseAsync(), because it modifies a thread's stack
* - resumeThread(), just before running the thread again
* and unset by the garbage collector (only).
*/
#define TSO_DIRTY 1
/*
* TSO_LOCKED is set when a TSO is locked to a particular Capability.
*/
#define TSO_LOCKED 2
/*
* TSO_BLOCKEX: the TSO is blocking exceptions
*
* TSO_INTERRUPTIBLE: the TSO can be interrupted if it blocks
* interruptibly (eg. with BlockedOnMVar).
*/
#define TSO_BLOCKEX 4
#define TSO_INTERRUPTIBLE 8
/* -----------------------------------------------------------------------------
RET_DYN stack frames
-------------------------------------------------------------------------- */
......
......@@ -155,6 +155,21 @@ xchg(StgPtr p, StgWord w)
return old;
}
INLINE_HEADER StgInfoTable *
lockClosure(StgClosure *p)
{ return (StgInfoTable *)p->header.info; }
INLINE_HEADER void
unlockClosure(StgClosure *p STG_UNUSED, StgInfoTable *info STG_UNUSED)
{ /* nothing */ }
#endif /* !THREADED_RTS */
// Handy specialised versions of lockClosure()/unlockClosure()
INLINE_HEADER void lockTSO(StgTSO *tso)
{ lockClosure((StgClosure *)tso); }
INLINE_HEADER void unlockTSO(StgTSO *tso)
{ unlockClosure((StgClosure*)tso, (StgInfoTable*)&stg_TSO_info); }
#endif /* SMP_H */
......@@ -490,6 +490,8 @@ RTS_FUN(stg_block_async_void);
RTS_ENTRY(stg_block_async_void_ret);
#endif
RTS_FUN(stg_block_stmwait);
RTS_FUN(stg_block_throwto);
RTS_RET_INFO(stg_block_throwto_info);
/* Entry/exit points from StgStartup.cmm */
......
......@@ -77,27 +77,6 @@ typedef StgTSOStatBuf StgTSOGranInfo;
*/
typedef StgWord32 StgThreadID;
/*
* Flags for the tso->flags field.
*
* The TSO_DIRTY flag indicates that this TSO's stack should be
* scanned during garbage collection. The link field of a TSO is
* always scanned, so we don't have to dirty a TSO just for linking
* it on a different list.
*
* TSO_DIRTY is set by
* - schedule(), just before running a thread,
* - raiseAsync(), because it modifies a thread's stack
* - resumeThread(), just before running the thread again
* and unset by the garbage collector (only).
*/
#define TSO_DIRTY 1
/*
* TSO_LOCKED is set when a TSO is locked to a particular Capability.
*/
#define TSO_LOCKED 2
#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
#define tsoLocked(tso) ((tso)->flags & TSO_LOCKED)
......@@ -127,6 +106,7 @@ typedef union {
StgWord target;
} StgTSOBlockInfo;
/*
* TSOs live on the heap, and therefore look just like heap objects.
* Large TSOs will live in their own "block group" allocated by the
......@@ -151,13 +131,19 @@ typedef struct StgTSO_ {
StgWord16 why_blocked; /* Values defined in Constants.h */
StgWord32 flags;
StgTSOBlockInfo block_info;
struct StgTSO_* blocked_exceptions;
StgThreadID id;
int saved_errno;
struct Task_* bound;
struct Capability_* cap;
struct StgTRecHeader_ * trec; /* STM transaction record */
/*
A list of threads blocked on this TSO waiting to throw
exceptions. In order to access this field, the TSO must be
locked using lockClosure/unlockClosure (see SMP.h).
*/
struct StgTSO_ * blocked_exceptions;
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
#endif
......
......@@ -18,6 +18,7 @@
* doesn't affect the offsets of anything else.
*/
#define PROFILING
#define THREADED_RTS
#include "Rts.h"
#include "RtsFlags.h"
......@@ -227,6 +228,7 @@ main(int argc, char *argv[])
def_offset("stgGCFun", FUN_OFFSET(stgGCFun));
field_offset(Capability, r);
field_offset(Capability, lock);
struct_field(bdescr, start);
struct_field(bdescr, free);
......@@ -276,8 +278,10 @@ main(int argc, char *argv[])
closure_field(StgTSO, block_info);
closure_field(StgTSO, blocked_exceptions);
closure_field(StgTSO, id);
closure_field(StgTSO, cap);
closure_field(StgTSO, saved_errno);
closure_field(StgTSO, trec);
closure_field(StgTSO, flags);
closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
tso_field(StgTSO, sp);
tso_field_offset(StgTSO, stack);
......
......@@ -518,8 +518,10 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
{
ASSERT(tso->cap == cap);
ASSERT(tso->bound ? tso->bound->cap == cap : 1);
ASSERT_LOCK_HELD(&cap->lock);
tso->cap = cap;
ACQUIRE_LOCK(&cap->lock);
if (cap->running_task == NULL) {
// nobody is running this Capability, we can add our thread
// directly onto the run queue and start up a Task to run it.
......@@ -535,6 +537,33 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
// freed without first checking the wakeup queue (see
// releaseCapability_).
}
}
void
wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
{
ACQUIRE_LOCK(&cap->lock);
migrateThreadToCapability (cap, tso);
RELEASE_LOCK(&cap->lock);
}
void
migrateThreadToCapability (Capability *cap, StgTSO *tso)
{
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
ASSERT(tso->bound->cap == tso->cap);
tso->bound->cap = cap;
}
tso->cap = cap;
wakeupThreadOnCapability(cap,tso);
}
void
migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
{
ACQUIRE_LOCK(&cap->lock);
migrateThreadToCapability (cap, tso);
RELEASE_LOCK(&cap->lock);
}
......
......@@ -199,6 +199,10 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
// from the one held by the current Task).
//
void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso);
void migrateThreadToCapability (Capability *cap, StgTSO *tso);
void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso);
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
......
......@@ -11,6 +11,7 @@
* ---------------------------------------------------------------------------*/
#include "Cmm.h"
#include "RaiseAsync.h"
/* -----------------------------------------------------------------------------
Exception Primitives
......@@ -54,13 +55,13 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret,
{
// Not true: see comments above
// ASSERT(StgTSO_blocked_exceptions(CurrentTSO) != NULL);
#if defined(GRAN) || defined(PAR)
foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr",
NULL "ptr");
#else
foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
#endif
StgTSO_blocked_exceptions(CurrentTSO) = NULL;
foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr",
CurrentTSO "ptr") [R1];
StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) &
~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
#ifdef REG_R1
Sp_adj(1);
jump %ENTRY_CODE(Sp(0));
......@@ -76,7 +77,10 @@ INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret,
{
// Not true: see comments above
// ASSERT(StgTSO_blocked_exceptions(CurrentTSO) == NULL);
StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
StgTSO_flags(CurrentTSO) =
StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
#ifdef REG_R1
Sp_adj(1);
jump %ENTRY_CODE(Sp(0));
......@@ -92,15 +96,18 @@ blockAsyncExceptionszh_fast
/* Args: R1 :: IO a */
STK_CHK_GEN( WDS(2)/* worst case */, R1_PTR, blockAsyncExceptionszh_fast);
if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) {
StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
/* avoid growing the stack unnecessarily */
if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
Sp_adj(1);
} else {
Sp_adj(-1);
Sp(0) = stg_unblockAsyncExceptionszh_ret_info;
}
if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {
StgTSO_flags(CurrentTSO) =
StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
/* avoid growing the stack unnecessarily */
if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
Sp_adj(1);
} else {
Sp_adj(-1);
Sp(0) = stg_unblockAsyncExceptionszh_ret_info;
}
}
TICK_UNKNOWN_CALL();
TICK_SLOW_CALL_v();
......@@ -112,22 +119,17 @@ unblockAsyncExceptionszh_fast
/* Args: R1 :: IO a */
STK_CHK_GEN( WDS(2), R1_PTR, unblockAsyncExceptionszh_fast);
if (StgTSO_blocked_exceptions(CurrentTSO) != NULL) {
#if defined(GRAN) || defined(PAR)
foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr",
StgTSO_block_info(CurrentTSO) "ptr");
#else
foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
#endif
StgTSO_blocked_exceptions(CurrentTSO) = NULL;
if (TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) {
foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr",
CurrentTSO "ptr") [R1];
/* avoid growing the stack unnecessarily */
if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
Sp_adj(1);
} else {
Sp_adj(-1);
Sp(0) = stg_blockAsyncExceptionszh_ret_info;
}
/* avoid growing the stack unnecessarily */
if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
Sp_adj(1);
} else {
Sp_adj(-1);
Sp(0) = stg_blockAsyncExceptionszh_ret_info;
}
}
TICK_UNKNOWN_CALL();
TICK_SLOW_CALL_v();
......@@ -135,74 +137,62 @@ unblockAsyncExceptionszh_fast
}
#define interruptible(what_next) \
( what_next == BlockedOnMVar \
|| what_next == BlockedOnException \
|| what_next == BlockedOnRead \
|| what_next == BlockedOnWrite \
|| what_next == BlockedOnDelay \
|| what_next == BlockedOnDoProc)
killThreadzh_fast
{
/* args: R1 = TSO to kill, R2 = Exception */
W_ why_blocked;
/* This thread may have been relocated.
* (see Schedule.c:threadStackOverflow)
*/
while:
if (StgTSO_what_next(R1) == ThreadRelocated::I16) {
R1 = StgTSO_link(R1);
goto while;
}
/* Determine whether this thread is interruptible or not */
/* If the target thread is currently blocking async exceptions,
* we'll have to block until it's ready to accept them. The
* exception is interruptible threads - ie. those that are blocked
* on some resource.
*/
why_blocked = TO_W_(StgTSO_why_blocked(R1));
if (StgTSO_blocked_exceptions(R1) != NULL && !interruptible(why_blocked))
{
StgTSO_link(CurrentTSO) = StgTSO_blocked_exceptions(R1);
StgTSO_blocked_exceptions(R1) = CurrentTSO;
StgTSO_why_blocked(CurrentTSO) = BlockedOnException::I16;
StgTSO_block_info(CurrentTSO) = R1;
BLOCK( R1_PTR & R2_PTR, killThreadzh_fast );
}
/* Killed threads turn into zombies, which might be garbage
* collected at a later date. That's why we don't have to
* explicitly remove them from any queues they might be on.
*/
/* We might have killed ourselves. In which case, better be *very*
* careful. If the exception killed us, then return to the scheduler.
* If the exception went to a catch frame, we'll just continue from
* the handler.
*/
if (R1 == CurrentTSO) {
/* args: R1 = TSO to kill, R2 = Exception */
W_ why_blocked;
W_ target;
W_ exception;
target = R1;
exception = R2;
STK_CHK_GEN( WDS(3), R1_PTR | R2_PTR, killThreadzh_fast);
/*
* We might have killed ourselves. In which case, better be *very*
* careful. If the exception killed us, then return to the scheduler.
* If the exception went to a catch frame, we'll just continue from
* the handler.
*/
if (target == CurrentTSO) {
SAVE_THREAD_STATE();
foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
/* ToDo: what if the current thread is blocking exceptions? */
foreign "C" throwToSingleThreaded(MyCapability() "ptr",
target "ptr", exception "ptr")[R1,R2];
if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) {
R1 = ThreadFinished;
jump StgReturn;
R1 = ThreadFinished;
jump StgReturn;
} else {
LOAD_THREAD_STATE();
ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
jump %ENTRY_CODE(Sp(0));
LOAD_THREAD_STATE();
ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
jump %ENTRY_CODE(Sp(0));
}
} else {
W_ out;
W_ retcode;
out = BaseReg + OFFSET_StgRegTable_rmp_tmp_w;
retcode = foreign "C" throwTo(MyCapability() "ptr",
CurrentTSO "ptr",
target "ptr",
exception "ptr",
out "ptr") [R1,R2];
switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) {
case THROWTO_SUCCESS: {
jump %ENTRY_CODE(Sp(0));
}
} else {
foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
}
jump %ENTRY_CODE(Sp(0));
case THROWTO_BLOCKED: {
R3 = W_[out];
// we must block, and call throwToReleaseTarget() before returning
jump stg_block_throwto;
}
}
}
}
/* -----------------------------------------------------------------------------
......@@ -300,15 +290,14 @@ catchzh_fast
SET_HDR(Sp,stg_catch_frame_info,W_[CCCS]);
StgCatchFrame_handler(Sp) = R2;
StgCatchFrame_exceptions_blocked(Sp) =
(StgTSO_blocked_exceptions(CurrentTSO) != NULL);
StgCatchFrame_exceptions_blocked(Sp) = TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX;
TICK_CATCHF_PUSHED();
/* Apply R1 to the realworld token */
TICK_UNKNOWN_CALL();
TICK_SLOW_CALL_v();
jump stg_ap_v_fast;
}
}
/* -----------------------------------------------------------------------------
* The raise infotable
......@@ -423,9 +412,8 @@ retry_pop_stack:
/* Ensure that async excpetions are blocked when running the handler.
*/
if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) {
StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
}
StgTSO_flags(CurrentTSO) =
StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
/* Call the handler, passing the exception value and a realworld
* token as arguments.
......
......@@ -44,6 +44,7 @@
#endif
#include "Trace.h"
#include "RetainerProfile.h"
#include "RaiseAsync.h"
#include <string.h>
......@@ -2631,10 +2632,8 @@ scavengeTSO (StgTSO *tso)
) {
tso->block_info.closure = evacuate(tso->block_info.closure);
}
if ( tso->blocked_exceptions != NULL ) {
tso->blocked_exceptions =
(StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions);
}
tso->blocked_exceptions =
(StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions);
// We don't always chase the link field: TSOs on the blackhole
// queue are not automatically alive, so the link field is a
......@@ -4620,6 +4619,14 @@ threadPaused(Capability *cap, StgTSO *tso)
nat weight_pending = 0;
rtsBool prev_was_update_frame;
// Check to see whether we have threads waiting to raise
// exceptions, and we're not blocking exceptions, or are blocked
// interruptibly. This is important; if a thread is running with
// TSO_BLOCKEX and becomes blocked interruptibly, this is the only
// place we ensure that the blocked_exceptions get a chance.
maybePerformBlockedException (cap, tso);
if (tso->what_next == ThreadKilled) { return; }
stack_end = &tso->stack[tso->stack_size];
frame = (StgClosure *)tso->sp;
......
......@@ -403,9 +403,7 @@ thread_TSO (StgTSO *tso)
) {
thread_(&tso->block_info.closure);
}
if ( tso->blocked_exceptions != NULL ) {
thread_(&tso->blocked_exceptions);
}
thread_(&tso->blocked_exceptions);
thread_(&tso->trec);
......
/* includes for compiling .cmm files via-C */
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "StgRun.h"
#include "Schedule.h"
#include "Printer.h"
#include "Sanity.h"
#include "STM.h"
#include "Storage.h"
#include "SchedAPI.h"
#include "Timer.h"
#include "ProfHeap.h"
#include "LdvProfile.h"
#include "Profiling.h"
#include "OSThreads.h"
#include "Apply.h"
#include "SMP.h"
#include "RaiseAsync.h"
#include "ThreadLabels.h"
#include "Threads.h"
#include "Prelude.h"
......@@ -902,6 +902,31 @@ stg_block_blackhole
BLOCK_BUT_FIRST(stg_block_blackhole_finally);
}
INFO_TABLE_RET( stg_block_throwto, 2/*framesize*/, 0/*bitmap*/, RET_SMALL )
{
R2 = Sp(2);
R1 = Sp(1);
Sp_adj(3);
jump killThreadzh_fast;
}
stg_block_throwto_finally
{
#ifdef THREADED_RTS
foreign "C" throwToReleaseTarget (R3 "ptr");
#endif
jump StgReturn;
}
stg_block_throwto
{
Sp_adj(-3);
Sp(2) = R2;
Sp(1) = R1;
Sp(0) = stg_block_throwto_info;
BLOCK_BUT_FIRST(stg_block_throwto_finally);
}
#ifdef mingw32_HOST_OS
INFO_TABLE_RET( stg_block_async, 0/*framesize*/, 0/*bitmap*/, RET_SMALL )
{
......
......@@ -301,26 +301,7 @@ endif
# Compiling the cmm files
# ToDo: should we really include Rts.h here? Required for GNU_ATTRIBUTE().
SRC_HC_OPTS += \
-I. \
-\#include Prelude.h \
-\#include Rts.h \
-\#include RtsFlags.h \
-\#include RtsUtils.h \
-\#include StgRun.h \
-\#include Schedule.h \
-\#include Printer.h \
-\#include Sanity.h \
-\#include STM.h \
-\#include Storage.h \
-\#include SchedAPI.h \
-\#include Timer.h \
-\#include ProfHeap.h \
-\#include LdvProfile.h \
-\#include Profiling.h \
-\#include OSThreads.h \
-\#include Apply.h \
-\#include SMP.h
SRC_HC_OPTS += -I. -\#include HCIncludes.h
ifeq "$(Windows)" "YES"
PrimOps_HC_OPTS += -\#include '<windows.h>' -\#include win32/AsyncIO.h
......
This diff is collapsed.
/* -----------------------------------------------------------------------------
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 1998-2005
* (c) The GHC Team, 1998-2006
*
* Exception support
* Asynchronous exceptions
*
* ---------------------------------------------------------------------------*/
* --------------------------------------------------------------------------*/
#ifndef EXCEPTION_H
#define EXCEPTION_H
#ifndef RAISEASYNC_H
#define RAISEASYNC_H
extern const StgRetInfoTable stg_blockAsyncExceptionszh_ret_info;
extern const StgRetInfoTable stg_unblockAsyncExceptionszh_ret_info;
#define THROWTO_SUCCESS 0
#define THROWTO_BLOCKED 1
#ifndef CMINUSMINUS
void throwToSingleThreaded (Capability *cap,
StgTSO *tso,
StgClosure *exception);
void throwToSingleThreaded_ (Capability *cap,
StgTSO *tso,
StgClosure *exception,
rtsBool stop_at_atomically,
StgPtr stop_here);
void suspendComputation (Capability *cap,
StgTSO *tso,
StgPtr stop_here);
nat throwTo (Capability *cap, // the Capability we hold
StgTSO *source, // the TSO sending the exception
StgTSO *target, // the TSO receiving the exception
StgClosure *exception, // the exception closure
/*[out]*/ void **out // pass to throwToReleaseTarget()
);
#ifdef THREADED_RTS
void throwToReleaseTarget (void *tso);
#endif
void maybePerformBlockedException (Capability *cap, StgTSO *tso);
void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso);
/* Determine whether a thread is interruptible (ie. blocked
* indefinitely). Interruptible threads can be sent an exception with
......@@ -36,5 +65,7 @@ interruptible(StgTSO *t)
}
}
#endif /* EXCEPTION_H */