Commit afd08a9c authored by tharris's avatar tharris
Browse files

[project @ 2005-11-21 15:58:47 by tharris]

Re-use temporary storage in the STM implementation
parent 7fe1172a
......@@ -20,6 +20,7 @@
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
......@@ -155,6 +156,11 @@ initCapability( Capability *cap, nat i )
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
cap->mut_lists[g] = NULL;
}
cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
cap->free_trec_chunks = END_STM_CHUNK_LIST;
cap->free_trec_headers = NO_TREC;
cap->transaction_tokens = 0;
}
/* ---------------------------------------------------------------------------
......
......@@ -81,6 +81,12 @@ struct Capability_ {
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;
#endif
// Per-capability STM-related data
StgTVarWaitQueue *free_tvar_wait_queues;
StgTRecChunk *free_trec_chunks;
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
}; // typedef Capability, defined in RtsAPI.h
......
......@@ -360,7 +360,7 @@ retry_pop_stack:
W_ r;
trec = StgTSO_trec(CurrentTSO);
r = foreign "C" stmValidateNestOfTransactions(trec "ptr");
foreign "C" stmAbortTransaction(trec "ptr");
foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr");
StgTSO_trec(CurrentTSO) = NO_TREC;
if (r) {
// Transaction was valid: continue searching for a catch frame
......@@ -369,7 +369,7 @@ retry_pop_stack:
} else {
// Transaction was not valid: we retry the exception (otherwise continue
// with a further call to raiseExceptionHelper)
"ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr");
"ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr");
StgTSO_trec(CurrentTSO) = trec;
R1 = StgAtomicallyFrame_code(Sp);
Sp_adj(-1);
......
......@@ -3051,9 +3051,6 @@ scavenge(step *stp)
evac_gen = 0;
tvar->current_value = evacuate((StgClosure*)tvar->current_value);
tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
#if defined(SMP)
tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by);
#endif
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
p += sizeofW(StgTVar);
......@@ -3408,9 +3405,6 @@ linear_scan:
evac_gen = 0;
tvar->current_value = evacuate((StgClosure*)tvar->current_value);
tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
#if defined(SMP)
tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by);
#endif
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
......@@ -3732,9 +3726,6 @@ scavenge_one(StgPtr p)
evac_gen = 0;
tvar->current_value = evacuate((StgClosure*)tvar->current_value);
tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
#if defined(SMP)
tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by);
#endif
evac_gen = saved_evac_gen;
failed_to_evac = rtsTrue; // mutable
break;
......
......@@ -688,9 +688,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
StgTVar *tvar = (StgTVar *)p;
thread((StgPtr)&tvar->current_value);
thread((StgPtr)&tvar->first_wait_queue_entry);
#if defined(SMP)
thread((StgPtr)&tvar->last_update_by);
#endif
return p + sizeofW(StgTVar);
}
......
......@@ -1048,7 +1048,7 @@ INFO_TABLE_RET(stg_atomically_frame,
trec = StgTSO_trec(CurrentTSO);
if (StgAtomicallyFrame_waiting(frame)) {
/* The TSO is currently waiting: should we stop waiting? */
valid = foreign "C" stmReWait(CurrentTSO "ptr");
valid = foreign "C" stmReWait(MyCapability() "ptr", CurrentTSO "ptr");
if (valid) {
/* Previous attempt is still valid: no point trying again yet */
IF_NOT_REG_R1(Sp_adj(-2);
......@@ -1268,6 +1268,8 @@ retry_pop_stack:
r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", other_trec "ptr");
if (r) {
r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
} else {
foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr");
}
if (r) {
// Merge between siblings succeeded: commit it back to enclosing transaction
......
......@@ -217,8 +217,6 @@ hs_init(int *argc, char **argv[])
startupAsyncIO();
#endif
initSTM();
#ifdef RTS_GTK_FRONTPANEL
if (RtsFlags.GcFlags.frontpanel) {
initFrontPanel();
......
/* -----------------------------------------------------------------------------
*
* (c) The GHC Team 1998-2005
*
* STM implementation.
......@@ -173,6 +172,13 @@ static int shake(void) {
/*......................................................................*/
// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
// and wait queue entries without GC
#define REUSE_MEMORY
/*......................................................................*/
#define IF_STM_UNIPROC(__X) do { } while (0)
#define IF_STM_CG_LOCK(__X) do { } while (0)
#define IF_STM_FG_LOCKS(__X) do { } while (0)
......@@ -350,7 +356,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
/*......................................................................*/
// Helper functions for allocation and initialization
// Helper functions for downstream allocation and initialization
static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
StgTSO *waiting_tso) {
......@@ -392,6 +398,89 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap,
/*......................................................................*/
// Allocation / deallocation functions that retain per-capability lists
// of closures that can be re-used
static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap,
StgTSO *waiting_tso) {
StgTVarWaitQueue *result = NULL;
if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) {
result = new_stg_tvar_wait_queue(cap, waiting_tso);
} else {
result = cap -> free_tvar_wait_queues;
result -> waiting_tso = waiting_tso;
cap -> free_tvar_wait_queues = result -> next_queue_entry;
}
return result;
}
static void free_stg_tvar_wait_queue(Capability *cap,
StgTVarWaitQueue *wq) {
#if defined(REUSE_MEMORY)
wq -> next_queue_entry = cap -> free_tvar_wait_queues;
cap -> free_tvar_wait_queues = wq;
#endif
}
static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
StgTRecChunk *result = NULL;
if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
result = new_stg_trec_chunk(cap);
} else {
result = cap -> free_trec_chunks;
cap -> free_trec_chunks = result -> prev_chunk;
result -> prev_chunk = END_STM_CHUNK_LIST;
result -> next_entry_idx = 0;
}
return result;
}
static void free_stg_trec_chunk(Capability *cap,
StgTRecChunk *c) {
#if defined(REUSE_MEMORY)
c -> prev_chunk = cap -> free_trec_chunks;
cap -> free_trec_chunks = c;
#endif
}
static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
StgTRecHeader *enclosing_trec) {
StgTRecHeader *result = NULL;
if (cap -> free_trec_headers == NO_TREC) {
result = new_stg_trec_header(cap, enclosing_trec);
} else {
result = cap -> free_trec_headers;
cap -> free_trec_headers = result -> enclosing_trec;
result -> enclosing_trec = enclosing_trec;
result -> current_chunk -> next_entry_idx = 0;
if (enclosing_trec == NO_TREC) {
result -> state = TREC_ACTIVE;
} else {
ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
enclosing_trec -> state == TREC_CONDEMNED);
result -> state = enclosing_trec -> state;
}
}
return result;
}
static void free_stg_trec_header(Capability *cap,
StgTRecHeader *trec) {
#if defined(REUSE_MEMORY)
StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
while (chunk != END_STM_CHUNK_LIST) {
StgTRecChunk *prev_chunk = chunk -> prev_chunk;
free_stg_trec_chunk(cap, chunk);
chunk = prev_chunk;
}
trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
trec -> enclosing_trec = cap -> free_trec_headers;
cap -> free_trec_headers = trec;
#endif
}
/*......................................................................*/
// Helper functions for managing waiting lists
static void build_wait_queue_entries_for_trec(Capability *cap,
......@@ -412,7 +501,7 @@ static void build_wait_queue_entries_for_trec(Capability *cap,
ACQ_ASSERT(s -> current_value == trec);
NACQ_ASSERT(s -> current_value == e -> expected_value);
fq = s -> first_wait_queue_entry;
q = new_stg_tvar_wait_queue(cap, tso);
q = alloc_stg_tvar_wait_queue(cap, tso);
q -> next_queue_entry = fq;
q -> prev_queue_entry = END_STM_WAIT_QUEUE;
if (fq != END_STM_WAIT_QUEUE) {
......@@ -423,7 +512,8 @@ static void build_wait_queue_entries_for_trec(Capability *cap,
});
}
static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
static void remove_wait_queue_entries_for_trec(Capability *cap,
StgTRecHeader *trec) {
ASSERT(trec != NO_TREC);
ASSERT(trec -> enclosing_trec == NO_TREC);
ASSERT(trec -> state == TREC_WAITING ||
......@@ -452,6 +542,7 @@ static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
ASSERT (s -> first_wait_queue_entry == q);
s -> first_wait_queue_entry = nq;
}
free_stg_tvar_wait_queue(cap, q);
unlock_tvar(trec, s, saw, FALSE);
});
}
......@@ -475,7 +566,7 @@ static TRecEntry *get_new_entry(Capability *cap,
} else {
// Current chunk is full: allocate a fresh one
StgTRecChunk *nc;
nc = new_stg_trec_chunk(cap);
nc = alloc_stg_trec_chunk(cap);
nc -> prev_chunk = c;
nc -> next_entry_idx = 1;
t -> current_chunk = nc;
......@@ -614,13 +705,13 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
result = FALSE;
BREAK_FOR_EACH;
}
e -> saw_update_by = s -> last_update_by;
e -> num_updates = s -> num_updates;
if (s -> current_value != e -> expected_value) {
TRACE("%p : doesn't match (race)\n", trec);
result = FALSE;
BREAK_FOR_EACH;
} else {
TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
TRACE("%p : need to check version %d\n", trec, e -> num_updates);
}
});
}
......@@ -654,8 +745,8 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
StgTVar *s;
s = e -> tvar;
if (entry_is_read_only(e)) {
TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
if (s -> last_update_by != e -> saw_update_by) {
TRACE("%p : check_read_only for TVar %p, saw %d\n", trec, s, e -> num_updates);
if (s -> num_updates != e -> num_updates) {
// ||s -> current_value != e -> expected_value) {
TRACE("%p : mismatch\n", trec);
result = FALSE;
......@@ -672,31 +763,75 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
/************************************************************************/
void stmPreGCHook() {
nat i;
lock_stm(NO_TREC);
TRACE("stmPreGCHook\n");
for (i = 0; i < n_capabilities; i ++) {
Capability *cap = &capabilities[i];
cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE;
cap -> free_trec_chunks = END_STM_CHUNK_LIST;
cap -> free_trec_headers = NO_TREC;
}
unlock_stm(NO_TREC);
}
/************************************************************************/
void initSTM() {
TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
// check_read_only relies on version numbers held in TVars' "num_updates"
// fields not wrapping around while a transaction is committed. The version
// number is incremented each time an update is committed to the TVar
// This is unlikely to wrap around when 32-bit integers are used for the counts,
// but to ensure correctness we maintain a shared count on the maximum
// number of commit operations that may occur and check that this has
// not increased by more than 2^32 during a commit.
#define TOKEN_BATCH_SIZE 1024
static volatile StgInt64 max_commits = 0;
static volatile StgBool token_locked = FALSE;
#if defined(SMP)
static void getTokenBatch(Capability *cap) {
while (cas(&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
max_commits += TOKEN_BATCH_SIZE;
cap -> transaction_tokens = TOKEN_BATCH_SIZE;
token_locked = FALSE;
}
static void getToken(Capability *cap) {
if (cap -> transaction_tokens == 0) {
getTokenBatch(cap);
}
cap -> transaction_tokens --;
}
#else
static void getToken(Capability *cap STG_UNUSED) {
// Nothing
}
#endif
/*......................................................................*/
StgTRecHeader *stmStartTransaction(Capability *cap,
StgTRecHeader *outer) {
StgTRecHeader *t;
TRACE("%p : stmStartTransaction\n", outer);
t = new_stg_trec_header(cap, outer);
TRACE("%p : stmStartTransaction with %d tokens\n",
outer,
cap -> transaction_tokens);
getToken(cap);
t = alloc_stg_trec_header(cap, outer);
TRACE("%p : stmStartTransaction()=%p\n", outer, t);
return t;
}
/*......................................................................*/
void stmAbortTransaction(StgTRecHeader *trec) {
void stmAbortTransaction(Capability *cap,
StgTRecHeader *trec) {
TRACE("%p : stmAbortTransaction\n", trec);
ASSERT (trec != NO_TREC);
ASSERT ((trec -> state == TREC_ACTIVE) ||
......@@ -707,17 +842,20 @@ void stmAbortTransaction(StgTRecHeader *trec) {
if (trec -> state == TREC_WAITING) {
ASSERT (trec -> enclosing_trec == NO_TREC);
TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
remove_wait_queue_entries_for_trec(trec);
remove_wait_queue_entries_for_trec(cap, trec);
}
trec -> state = TREC_ABORTED;
unlock_stm(trec);
free_stg_trec_header(cap, trec);
TRACE("%p : stmAbortTransaction done\n", trec);
}
/*......................................................................*/
void stmCondemnTransaction(StgTRecHeader *trec) {
void stmCondemnTransaction(Capability *cap,
StgTRecHeader *trec) {
TRACE("%p : stmCondemnTransaction\n", trec);
ASSERT (trec != NO_TREC);
ASSERT ((trec -> state == TREC_ACTIVE) ||
......@@ -728,7 +866,7 @@ void stmCondemnTransaction(StgTRecHeader *trec) {
if (trec -> state == TREC_WAITING) {
ASSERT (trec -> enclosing_trec == NO_TREC);
TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
remove_wait_queue_entries_for_trec(trec);
remove_wait_queue_entries_for_trec(cap, trec);
}
trec -> state = TREC_CONDEMNED;
unlock_stm(trec);
......@@ -781,6 +919,7 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
int result;
StgInt64 max_commits_at_start = max_commits;
TRACE("%p : stmCommitTransaction()\n", trec);
ASSERT (trec != NO_TREC);
......@@ -800,6 +939,14 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
TRACE("%p : doing read check\n", trec);
result = check_read_only(trec);
TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
StgInt64 max_commits_at_end = max_commits;
StgInt64 max_concurrent_commits;
max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
(n_capabilities * TOKEN_BATCH_SIZE));
if (((max_concurrent_commits >> 32) > 0) || shake()) {
result = FALSE;
}
}
if (result) {
......@@ -818,7 +965,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
unpark_waiters_on(cap,s);
IF_STM_FG_LOCKS({
s -> last_update_by = trec;
s -> num_updates ++;
});
unlock_tvar(trec, s, e -> new_value, TRUE);
}
......@@ -831,6 +978,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
unlock_stm(trec);
free_stg_trec_header(cap, trec);
TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
return result;
......@@ -883,6 +1032,8 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
unlock_stm(trec);
free_stg_trec_header(cap, trec);
TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
return result;
......@@ -922,6 +1073,7 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
} else {
unlock_stm(trec);
free_stg_trec_header(cap, trec);
}
TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
......@@ -930,14 +1082,14 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
void
stmWaitUnlock(Capability *cap, StgTRecHeader *trec) {
stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
revert_ownership(trec, TRUE);
unlock_stm(trec);
}
/*......................................................................*/
StgBool stmReWait(StgTSO *tso) {
StgBool stmReWait(Capability *cap, StgTSO *tso) {
int result;
StgTRecHeader *trec = tso->trec;
......@@ -960,9 +1112,9 @@ StgBool stmReWait(StgTSO *tso) {
// The transcation has become invalid. We can now remove it from the wait
// queues.
if (trec -> state != TREC_CONDEMNED) {
remove_wait_queue_entries_for_trec (trec);
remove_wait_queue_entries_for_trec (cap, trec);
}
free_stg_trec_header(cap, trec);
}
unlock_stm(trec);
......@@ -1099,7 +1251,7 @@ StgTVar *stmNewTVar(Capability *cap,
result -> current_value = new_value;
result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
#if defined(SMP)
result -> last_update_by = NO_TREC;
result -> num_updates = 0;
#endif
return result;
}
......
......@@ -3802,7 +3802,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
case ATOMICALLY_FRAME:
if (stop_at_atomically) {
ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
stmCondemnTransaction(tso -> trec);
stmCondemnTransaction(cap, tso -> trec);
#ifdef REG_R1
tso->sp = frame;
#else
......@@ -3829,8 +3829,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
// and will not be visible after the abort.
IF_DEBUG(stm,
debugBelch("Found atomically block delivering async exception\n"));
stmAbortTransaction(tso -> trec);
tso -> trec = stmGetEnclosingTRec(tso -> trec);
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = stmGetEnclosingTRec(trec);
stmAbortTransaction(cap, trec);
tso -> trec = outer;
break;
default:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment