Commit 96757f6a authored by simonmar

[project @ 2005-01-12 12:36:28 by simonmar]

Numerous bug fixes to the STM code, mostly from a debugging session
with Tim Harris.  The test that flushed out all the bugs will shortly
be added to the test suite.
parent 3bebd698
@@ -96,6 +96,15 @@ extern StgTRecHeader *stmStartTransaction(StgTRecHeader *outer);
 extern void stmAbortTransaction(StgTRecHeader *trec);
 
+// Ensure that a subsequent commit / validation will fail. We use this
+// in our current handling of transactions that may have become invalid
+// and started looping. We strip their stack back to the ATOMICALLY_FRAME,
+// and, when the thread is next scheduled, discover it to be invalid and
+// re-execute it. However, we need to force the transaction to stay invalid
+// in case other threads' updates make it valid in the mean time.
+
+extern void stmCondemnTransaction(StgTRecHeader *trec);
+
 // Return the trec within which the specified trec was created (not
 // valid if trec==NO_TREC).
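(For illustration only: a minimal standalone C sketch, with hypothetical names rather than the RTS's, of what condemning guarantees. Once the state is forced to a must-abort value, revalidation fails even if other threads' updates have made the trec's reads consistent again, so the looping transaction is reliably re-executed from its ATOMICALLY_FRAME.)

    #include <stdbool.h>
    #include <stdio.h>

    typedef enum { TREC_ACTIVE, TREC_WAITING, TREC_MUST_ABORT } trec_state;
    typedef struct { trec_state state; } trec;

    /* Analogue of stmCondemnTransaction: force later validation to fail. */
    static void condemn(trec *t) { t->state = TREC_MUST_ABORT; }

    /* A condemned trec never validates, whatever the TVars now contain. */
    static bool validate(const trec *t, bool reads_consistent) {
        return t->state != TREC_MUST_ABORT && reads_consistent;
    }

    int main(void) {
        trec t = { TREC_ACTIVE };
        condemn(&t);
        printf("%d\n", validate(&t, true));   /* prints 0 */
        return 0;
    }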
@@ -164,7 +173,7 @@ extern StgBool stmWait(StgTSO *tso, StgTRecHeader *trec);
 // and leave it as unblocked. It is an error to call stmReWait if the
 // thread is not waiting.
-extern StgBool stmReWait(StgTRecHeader *trec);
+extern StgBool stmReWait(StgTSO *tso);
 
 // Merge the accesses made so far in the second trec into the first trec.
 // Note that the resulting trec is only intended to be used in wait operations.
......
@@ -996,7 +996,7 @@ CATCH_RETRY_FRAME_ENTRY_TEMPLATE(,%ENTRY_CODE(Sp(SP_OFF)))
    trec = StgTSO_trec(CurrentTSO);                                      \
    if (StgAtomicallyFrame_waiting(frame)) {                             \
      /* The TSO is currently waiting: should we stop waiting? */        \
-     valid = foreign "C" stmReWait(trec "ptr");                         \
+     valid = foreign "C" stmReWait(CurrentTSO "ptr");                   \
      if (valid) {                                                       \
        /* Previous attempt is still valid: no point trying again yet */ \
        IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;)                         \
@@ -1239,10 +1239,12 @@ retry_pop_stack:
       W_ other_trec;
       other_trec = StgCatchRetryFrame_first_code_trec(frame);
       r = foreign "C" stmMergeForWaiting(trec "ptr", other_trec "ptr");
       if (r) {
-        r = foreign "C" stmCommitTransaction(trec "ptr");
-      }
-      if (r) {
+        // Merge between siblings succeeded: commit it back to enclosing transaction
+        // and then propagate the retry
+        r = foreign "C" stmCommitTransaction(trec "ptr");
         StgTSO_trec(CurrentTSO) = outer;
         Sp = Sp + SIZEOF_StgCatchRetryFrame;
         goto retry_pop_stack;
......
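(An editorial sketch of the control flow this hunk fixes, in plain C with made-up names: the retry must only propagate outwards once the merge of the two sibling branch trecs has succeeded, and the merged reads are committed back to the enclosing transaction before the frame is popped.)

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct trec trec;
    struct trec { trec *enclosing; };

    /* Stubs for stmMergeForWaiting / stmCommitTransaction; the real merge
       can fail if the TVars changed, forcing both branches to re-run. */
    static bool merge_for_waiting(trec *t, trec *other) { (void)t; (void)other; return true; }
    static bool commit_transaction(trec *t) { (void)t; return true; }

    /* True: pop the catch-retry frame and keep propagating the retry. */
    static bool propagate_retry(trec **current, trec *first_branch) {
        if (!merge_for_waiting(*current, first_branch))
            return false;                  /* merge failed: try both paths again */
        commit_transaction(*current);      /* push merged reads to the outer trec */
        *current = (*current)->enclosing;  /* StgTSO_trec(CurrentTSO) = outer */
        return true;
    }

    int main(void) {
        trec outer = { NULL }, alt = { &outer }, first = { &outer };
        trec *cur = &alt;
        printf("%d\n", propagate_retry(&cur, &first));   /* prints 1 */
        return 0;
    }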
@@ -115,6 +115,7 @@ static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
 static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
 
 static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
+#if 0
   if (shake()) {
     TRACE("Shake: not re-using wait queue %p\n", q);
     return;
@@ -122,9 +123,11 @@ static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
   q -> next_queue_entry = cached_tvar_wait_queues;
   cached_tvar_wait_queues = q;
+#endif
 }
 
 static void recycle_closures_from_trec (StgTRecHeader *t) {
+#if 0
   if (shake()) {
     TRACE("Shake: not re-using closures from %p\n", t);
     return;
@@ -140,6 +143,7 @@ static void recycle_closures_from_trec (StgTRecHeader *t) {
     c -> prev_chunk = cached_trec_chunks;
     cached_trec_chunks = c;
   }
+#endif
 }
 
 /*......................................................................*/
@@ -278,7 +282,8 @@ static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
 static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> enclosing_trec == NO_TREC);
-  ASSERT(trec -> state == TREC_WAITING);
+  ASSERT(trec -> state == TREC_WAITING ||
+         trec -> state == TREC_MUST_ABORT);
   TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
   FOR_EACH_ENTRY(trec, e, {
     StgTVar *s;
@@ -509,6 +514,27 @@ void stmAbortTransaction(StgTRecHeader *trec) {
 
 /*......................................................................*/
 
+void stmCondemnTransaction(StgTRecHeader *trec) {
+  TRACE("stmCondemnTransaction trec=%p\n", trec);
+  ASSERT (trec != NO_TREC);
+  ASSERT ((trec -> state == TREC_ACTIVE) ||
+          (trec -> state == TREC_MUST_ABORT) ||
+          (trec -> state == TREC_WAITING) ||
+          (trec -> state == TREC_CANNOT_COMMIT));
+  if (trec -> state == TREC_WAITING) {
+    ASSERT (trec -> enclosing_trec == NO_TREC);
+    TRACE("stmCondemnTransaction condemning waiting transaction\n");
+    stop_tsos_waiting_on_trec(trec);
+  }
+  trec -> state = TREC_MUST_ABORT;
+  TRACE("stmCondemnTransaction trec=%p done\n", trec);
+}
+
+/*......................................................................*/
+
 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
   StgTRecHeader *outer;
   TRACE("stmGetEnclosingTRec trec=%p\n", trec);
@@ -671,12 +697,15 @@ StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
 
 /*......................................................................*/
 
-StgBool stmReWait(StgTRecHeader *trec) {
+StgBool stmReWait(StgTSO *tso) {
   int result;
+  StgTRecHeader *trec = tso->trec;
+
   TRACE("stmReWait trec=%p\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> enclosing_trec == NO_TREC);
-  ASSERT (trec -> state == TREC_WAITING);
+  ASSERT ((trec -> state == TREC_WAITING) ||
+          (trec -> state == TREC_MUST_ABORT));
 
   lock_stm();
   result = transaction_is_valid(trec);
@@ -685,13 +714,17 @@ StgBool stmReWait(StgTRecHeader *trec) {
     // The transaction remains valid -- do nothing because it is already on
     // the wait queues
     ASSERT (trec -> state == TREC_WAITING);
+    park_tso(tso);
   } else {
     // The transaction has become invalid. We can now remove it from the wait
     // queues.
-    stop_tsos_waiting_on_trec (trec);
+    if (trec -> state != TREC_MUST_ABORT) {
+      stop_tsos_waiting_on_trec (trec);
 
-    // Outcome now reflected by status field; no need for log
-    recycle_closures_from_trec(trec);
+      // Outcome now reflected by status field; no need for log
+      recycle_closures_from_trec(trec);
+    }
   }
   unlock_stm();
......
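(A condensed C sketch, again with stub names, of the reshaped stmReWait above: taking the TSO rather than the trec lets the STM layer park the thread itself, and a trec that was condemned while waiting skips the wait-queue removal, since stmCondemnTransaction already unhooked it.)

    #include <stdbool.h>
    #include <stdio.h>

    typedef enum { TREC_WAITING, TREC_MUST_ABORT } trec_state;
    typedef struct { trec_state state; } trec;
    typedef struct { trec *t; } tso;

    /* Stub: a condemned trec never validates (cf. the sketch above). */
    static bool transaction_is_valid(const trec *t) { return t->state != TREC_MUST_ABORT; }
    static void park_thread(tso *s)         { (void)s; }  /* block again       */
    static void unhook_wait_queues(trec *t) { (void)t; }  /* drop queue entries */

    /* True: still valid, stay blocked. False: caller re-executes. */
    static bool rewait(tso *s) {
        trec *tr = s->t;
        bool valid = transaction_is_valid(tr);
        if (valid) {
            park_thread(s);            /* still on the wait queues */
        } else if (tr->state != TREC_MUST_ABORT) {
            unhook_wait_queues(tr);    /* condemned trecs were already unhooked */
        }
        return valid;
    }

    int main(void) {
        trec tr = { TREC_MUST_ABORT };
        tso  s  = { &tr };
        printf("%d\n", rewait(&s));   /* prints 0: re-execute the transaction */
        return 0;
    }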
@@ -221,6 +221,8 @@ static void schedule ( StgMainThread *mainThread, Capability *initi
 static void detectBlackHoles ( void );
 #endif
 
+static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
+
 #if defined(RTS_SUPPORTS_THREADS)
 /* ToDo: carefully document the invariants that go together
  * with these synchronisation objects.
@@ -1200,6 +1202,7 @@ run_thread:
      * previously, or it's blocked on an MVar or Blackhole, in which
      * case it'll be on the relevant queue already.
      */
+    ASSERT(t->why_blocked != NotBlocked);
     IF_DEBUG(scheduler,
              debugBelch("--<< thread %d (%s) stopped: ",
                         t->id, whatNext_strs[t->what_next]);
@@ -1333,44 +1336,14 @@ run_thread:
       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
         if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
           if (!stmValidateTransaction (t -> trec)) {
-            StgRetInfoTable *info;
-            StgPtr sp = t -> sp;
             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-            if (sp[0] == (W_)&stg_enter_info) {
-              sp++;
-            } else {
-              sp--;
-              sp[0] = (W_)&stg_dummy_ret_closure;
-            }
-            // Look up the stack for its atomically frame
-            StgPtr frame;
-            frame = sp + 1;
-            info = get_ret_itbl((StgClosure *)frame);
-            while (info->i.type != ATOMICALLY_FRAME &&
-                   info->i.type != STOP_FRAME &&
-                   info->i.type != UPDATE_FRAME) {
-              if (info -> i.type == CATCH_RETRY_FRAME) {
-                IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame"));
-                stmAbortTransaction(t -> trec);
-                t -> trec = stmGetEnclosingTRec(t -> trec);
-              }
-              frame += stack_frame_sizeW((StgClosure *)frame);
-              info = get_ret_itbl((StgClosure *)frame);
-            }
-            if (!info -> i.type == ATOMICALLY_FRAME) {
-              barf("Could not find ATOMICALLY_FRAME for unvalidatable thread");
-            }
-            // Cause the thread to enter its atomically frame again when
-            // scheduled -- this will attempt stmCommitTransaction or stmReWait
-            // which will fail triggering re-execution.
-            t->sp = frame;
-            t->what_next = ThreadRunGHC;
+
+            // strip the stack back to the ATOMICALLY_FRAME, aborting
+            // the (nested) transaction, and saving the stack of any
+            // partially-evaluated thunks on the heap.
+            raiseAsync_(t, NULL, rtsTrue);
+            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
           }
         }
       }
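(The thirty-odd removed lines collapse into one raiseAsync_ call; here is a hedged C outline, with stand-in types, of the sweep that remains: every unblocked thread that is inside a transaction gets revalidated, and a thread whose trec no longer validates is stripped back to its ATOMICALLY_FRAME instead of looping on stale TVar values.)

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct thread thread;
    struct thread {
        thread *link;      /* all_threads chain               */
        void   *trec;      /* NULL plays the role of NO_TREC  */
        bool    blocked;   /* why_blocked != NotBlocked       */
        bool    stripped;  /* set when we rewind to the frame */
    };

    static bool stm_validate(void *trec) { (void)trec; return false; }  /* stub */

    /* Stand-in for raiseAsync_(t, NULL, rtsTrue): condemn the trec and
       leave the thread poised on its ATOMICALLY_FRAME. */
    static void strip_to_atomically_frame(thread *t) { t->stripped = true; }

    static void validate_running_transactions(thread *all_threads) {
        for (thread *t = all_threads; t != NULL; t = t->link) {
            if (t->trec != NULL && !t->blocked && !stm_validate(t->trec))
                strip_to_atomically_frame(t);
        }
    }

    int main(void) {
        int dummy_trec;
        thread t2 = { NULL, NULL, false, false };        /* no transaction */
        thread t1 = { &t2, &dummy_trec, false, false };  /* invalid trec   */
        validate_running_transactions(&t1);
        printf("%d %d\n", t1.stripped, t2.stripped);     /* prints 1 0 */
        return 0;
    }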
@@ -3050,6 +3023,12 @@ raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
 void
 raiseAsync(StgTSO *tso, StgClosure *exception)
 {
+    raiseAsync_(tso, exception, rtsFalse);
+}
+
+static void
+raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+{
     StgRetInfoTable *info;
     StgPtr sp;
@@ -3106,8 +3085,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
     while (info->i.type != UPDATE_FRAME
            && (info->i.type != CATCH_FRAME || exception == NULL)
-           && info->i.type != STOP_FRAME) {
-        if (info->i.type == ATOMICALLY_FRAME) {
+           && info->i.type != STOP_FRAME
+           && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
+    {
+        if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
             // If we find an ATOMICALLY_FRAME then we abort the
             // current transaction and propagate the exception. In
             // this case (unlike ordinary exceptions) we do not care
@@ -3125,6 +3106,14 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
     switch (info->i.type) {
 
+    case ATOMICALLY_FRAME:
+        ASSERT(stop_at_atomically);
+        ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+        stmCondemnTransaction(tso -> trec);
+        tso->sp = frame;
+        tso->what_next = ThreadRunGHC;
+        return;
+
     case CATCH_FRAME:
         // If we find a CATCH_FRAME, and we've got an exception to raise,
         // then build the THUNK raise(exception), and leave it on
......
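(Finally, a self-contained C toy, frame types reduced to an enum, of the new unwinding condition: with stop_at_atomically set and no exception in hand, the walk halts at the first ATOMICALLY_FRAME, which is then condemned and re-entered, instead of running on to a stop or update frame.)

    #include <stdbool.h>
    #include <stdio.h>

    typedef enum { UPDATE_FRAME, CATCH_FRAME, CATCH_RETRY_FRAME,
                   ATOMICALLY_FRAME, STOP_FRAME, OTHER_FRAME } frame_type;

    /* Walk outwards until a frame matches the (new) stopping condition. */
    static int unwind_to(const frame_type *stack, int len,
                         bool have_exception, bool stop_at_atomically) {
        int i = 0;
        while (i < len - 1
               && stack[i] != UPDATE_FRAME
               && (stack[i] != CATCH_FRAME || !have_exception)
               && stack[i] != STOP_FRAME
               && (stack[i] != ATOMICALLY_FRAME || !stop_at_atomically)) {
            /* nested CATCH_RETRY/ATOMICALLY frames abort their trec here */
            i++;
        }
        return i;
    }

    int main(void) {
        frame_type stk[] = { OTHER_FRAME, CATCH_RETRY_FRAME,
                             ATOMICALLY_FRAME, STOP_FRAME };
        /* With stop_at_atomically: halts at index 2 (the atomically frame);
           without it: runs on to index 3 (the stop frame). */
        printf("%d %d\n", unwind_to(stk, 4, false, true),
                          unwind_to(stk, 4, false, false));
        return 0;
    }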