Commit 29421899 authored by simonmar

[project @ 2000-01-14 13:39:59 by simonmar]

cleanup
parent f75c814c
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.43 2000/01/14 13:17:16 hwloidl Exp $
* $Id: Schedule.c,v 1.44 2000/01/14 13:39:59 simonmar Exp $
*
* (c) The GHC Team, 1998-1999
*
@@ -121,28 +121,10 @@ static StgMainThread *main_threads;
* Locks required: sched_mutex.
*/
#if DEBUG
char *whatNext_strs[] = {
"ThreadEnterGHC",
"ThreadRunGHC",
"ThreadEnterHugs",
"ThreadKilled",
"ThreadComplete"
};
char *threadReturnCode_strs[] = {
"HeapOverflow", /* might also be StackOverflow */
"StackOverflow",
"ThreadYielding",
"ThreadBlocked",
"ThreadFinished"
};
#endif
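As a hedged aside (not part of this commit): tables like these are conventionally indexed by the matching enum field of a TSO when tracing scheduler activity, in the style of the IF_DEBUG(scheduler, ...) calls seen later in this file. The whatNext field name below is an assumption:
/* hypothetical trace helper, not in this diff; assumes whatNext_strs
   lines up with the TSO's whatNext enum, as the table order suggests */
static void traceThreadStatus(StgTSO *tso)
{
  IF_DEBUG(scheduler,
           belch("thread %d: %s", tso->id, whatNext_strs[tso->whatNext]));
}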
#if defined(GRAN)
StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
// rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c
/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
/*
In GranSim we have a runnable and a blocked queue for each processor.
@@ -276,16 +258,6 @@ StgTSO *MainTSO;
//@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
//@subsection Prototypes
#if 0 && defined(GRAN)
// ToDo: replace these with macros
static /* inline */ void add_to_run_queue(StgTSO* tso);
static /* inline */ void push_on_run_queue(StgTSO* tso);
static /* inline */ StgTSO *take_off_run_queue(StgTSO *tso);
/* Thread management */
void initScheduler(void);
#endif
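For the "replace these with macros" ToDo above, a minimal sketch of what such a macro could look like, assuming the singly-linked run queue used elsewhere in this file (run_queue_hd/run_queue_tl, threaded through tso->link and terminated by END_TSO_QUEUE). APPEND_TO_RUN_QUEUE is a hypothetical name, not the RTS's actual PUSH_ON_RUN_QUEUE:
/* hypothetical sketch: append a TSO at the tail of the run queue,
   using the same list representation as run_queue_len() below */
#define APPEND_TO_RUN_QUEUE(tso)             \
  do {                                       \
    (tso)->link = END_TSO_QUEUE;             \
    if (run_queue_hd == END_TSO_QUEUE) {     \
      run_queue_hd = (tso);                  \
    } else {                                 \
      run_queue_tl->link = (tso);            \
    }                                        \
    run_queue_tl = (tso);                    \
  } while (0)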
//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
//@subsection Main scheduling loop
@@ -425,7 +397,9 @@ schedule( void )
if (spark == NULL) {
break; /* no more sparks in the pool */
} else {
// I'd prefer this to be done in activateSpark -- HWL
/* I'd prefer this to be done in activateSpark -- HWL */
/* tricky - it needs to hold the scheduler lock and
* not try to re-acquire it -- SDM */
StgTSO *tso;
tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
pushClosure(tso,spark);
@@ -521,7 +495,7 @@ schedule( void )
#if defined(GRAN)
# error ToDo: implement GranSim scheduler
#elif defined(PAR)
// ToDo: phps merge with spark activation above
/* ToDo: perhaps merge with spark activation above */
/* check whether we have local work and send requests if we have none */
if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */
/* :-[ no local threads => look out for local sparks */
@@ -544,10 +518,10 @@ schedule( void )
belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
mytid, tso, tso->id, advisory_thread_count));
if (tso==END_TSO_QUEUE) { // failed to activate spark -> back to loop
if (tso==END_TSO_QUEUE) { /* failed to activate spark -> back to loop */
belch("^^ failed to activate spark");
goto next_thread;
} // otherwise fall through & pick-up new tso
} /* otherwise fall through & pick-up new tso */
} else {
IF_PAR_DEBUG(verbose,
belch("^^ no local sparks (spark pool contains only NFs: %d)",
@@ -1509,21 +1483,6 @@ take_off_run_queue(StgTSO *tso) {
#endif /* 0 */
nat
run_queue_len(void)
{
nat i;
StgTSO *tso;
for (i=0, tso=run_queue_hd;
tso != END_TSO_QUEUE;
i++, tso=tso->link)
/* nothing */ ;
return i;
}
//@node Garbage Collection Routines, Blocking Queue Routines, Run queue code, Main scheduling code
//@subsection Garbage Collection Routines
@@ -1547,29 +1506,26 @@ static void GetRoots(void)
StgMainThread *m;
#if defined(GRAN)
nat i;
for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
{
nat i;
for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
}
}
markEventQueue();
#elif defined(PAR)
run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
#else
#else /* !GRAN */
run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
@@ -1720,7 +1676,7 @@ threadStackOverflow(StgTSO *tso)
Wake up a queue that was blocked on some resource.
------------------------------------------------------------------------ */
// ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE
/* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
#if defined(GRAN)
static inline void
@@ -2010,266 +1966,6 @@ awakenBlockedQueue(StgTSO *tso)
}
#endif
#if 0
// ngoq ngo'
#if defined(GRAN)
/*
Awakening a blocking queue in GranSim means checking for each of the
TSOs in the queue whether they are local or not, issuing a ResumeThread
or an UnblockThread event, respectively. The basic iteration over the
blocking queue is the same as in the standard setup.
*/
void
awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node)
{
StgBlockingQueueElement *bqe, *next;
StgTSO *tso;
PEs node_loc, tso_loc;
rtsTime bq_processing_time = 0;
nat len = 0, len_local = 0;
IF_GRAN_DEBUG(bq,
belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
node, CurrentProc, CurrentTime[CurrentProc],
CurrentTSO->id, CurrentTSO));
node_loc = where_is(node);
ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave
get_itbl(q)->type == CONSTR); // closure (type constructor)
ASSERT(is_unique(node));
/* FAKE FETCH: magically copy the node to the tso's proc;
no Fetch necessary because in reality the node should not have been
moved to the other PE in the first place
*/
if (CurrentProc!=node_loc) {
IF_GRAN_DEBUG(bq,
belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
node, node_loc, CurrentProc, CurrentTSO->id,
// CurrentTSO, where_is(CurrentTSO),
node->header.gran.procs));
node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
IF_GRAN_DEBUG(bq,
belch("## new bitmask of node %p is %#x",
node, node->header.gran.procs));
if (RtsFlags.GranFlags.GranSimStats.Global) {
globalGranStats.tot_fake_fetches++;
}
}
next = q;
// ToDo: check: ASSERT(CurrentProc==node_loc);
while (get_itbl(next)->type==TSO) { // q != END_TSO_QUEUE) {
bqe = next;
next = bqe->link;
/*
bqe points to the current element in the queue
next points to the next element in the queue
*/
tso = (StgTSO *)bqe; // wastes an assignment to get the type right
tso_loc = where_is(tso);
if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
/* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
// insertThread(tso, node_loc);
new_event(tso_loc, tso_loc,
CurrentTime[CurrentProc]+bq_processing_time,
ResumeThread,
tso, node, (rtsSpark*)NULL);
tso->link = END_TSO_QUEUE; // overwrite link just to be sure
len_local++;
len++;
} else { // TSO is remote (actually should be FMBQ)
bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
new_event(tso_loc, CurrentProc,
CurrentTime[CurrentProc]+bq_processing_time+
RtsFlags.GranFlags.Costs.latency,
UnblockThread,
tso, node, (rtsSpark*)NULL);
tso->link = END_TSO_QUEUE; // overwrite link just to be sure
bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
len++;
}
/* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
IF_GRAN_DEBUG(bq,
fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
(node_loc==tso_loc ? "Local" : "Global"),
tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
tso->block_info.closure = NULL;
IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)",
tso->id, tso));
}
/* if this is the BQ of an RBH, we have to put back the info ripped out of
the closure to make room for the anchor of the BQ */
if (next!=END_BQ_QUEUE) {
ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
/*
ASSERT((info_ptr==&RBH_Save_0_info) ||
(info_ptr==&RBH_Save_1_info) ||
(info_ptr==&RBH_Save_2_info));
*/
/* cf. convertToRBH in RBH.c for writing the RBHSave closure */
((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
((StgRBH *)node)->mut_link = ((StgRBHSave *)next)->payload[1];
IF_GRAN_DEBUG(bq,
belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
node, info_type(node)));
}
/* statistics gathering */
if (RtsFlags.GranFlags.GranSimStats.Global) {
globalGranStats.tot_bq_processing_time += bq_processing_time;
globalGranStats.tot_bq_len += len; // total length of all bqs awakened
globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only
globalGranStats.tot_awbq++; // total no. of bqs awakened
}
IF_GRAN_DEBUG(bq,
fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
}
#elif defined(PAR)
/*
Awakening a blocking queue in GUM has to check whether an entry in the
queue is a normal TSO or a BLOCKED_FETCH. The latter indicates that a TSO is
waiting for the result of this computation on another PE. Thus, when
finding a BLOCKED_FETCH we have to send off a message to that PE.
Actually, we defer sending off a message, by just putting the BLOCKED_FETCH
onto the PendingFetches queue, which will be later traversed by
processFetches, sending off a RESUME message for each BLOCKED_FETCH.
NB: There is no check for an RBHSave closure (type CONSTR) in the code
below. The reason is, if we awaken the BQ of an RBH closure (RBHSaves
only exist at the end of such BQs) we know that the closure has been
unpacked successfully on the other PE, and we can discard the info
contained in the RBHSave closure. The current closure will be turned
into a FetchMe closure anyway.
*/
void
awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node)
{
StgBlockingQueueElement *bqe, *next;
IF_PAR_DEBUG(verbose,
belch("## AwBQ for node %p on [%x]: ",
node, mytid));
ASSERT(get_itbl(q)->type == TSO ||
get_itbl(q)->type == BLOCKED_FETCH ||
get_itbl(q)->type == CONSTR);
next = q;
while (get_itbl(next)->type==TSO ||
get_itbl(next)->type==BLOCKED_FETCH) {
bqe = next;
switch (get_itbl(bqe)->type) {
case TSO:
/* if it's a TSO just push it onto the run_queue */
next = bqe->link;
#if defined(DEBUG)
((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging only
#endif
push_on_run_queue((StgTSO *)bqe); // HWL: was: PUSH_ON_RUN_QUEUE(tso);
/* write RESUME events to log file and
update blocked and fetch time (depending on type of the orig closure) */
if (RtsFlags.ParFlags.ParStats.Full) {
DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
0, spark_queue_len(ADVISORY_POOL));
switch (get_itbl(node)->type) {
case FETCH_ME_BQ:
((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
break;
case RBH:
case FETCH_ME:
case BLACKHOLE_BQ:
((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
break;
default:
barf("{awaken_blocked_queue}Daq Qagh: unexpected closure %p (%s) with blocking queue",
node, info_type(node));
}
}
/* reset block_info.closure field after dumping event */
((StgTSO *)bqe)->block_info.closure = NULL;
/* rest of this branch is debugging only */
IF_PAR_DEBUG(verbose,
fprintf(stderr," TSO %d (%p) [PE %lx] (block_info.closure=%p) (next=%p) ,",
((StgTSO *)bqe)->id, (StgTSO *)bqe,
mytid, ((StgTSO *)bqe)->block_info.closure, ((StgTSO *)bqe)->link));
IF_DEBUG(scheduler,
if (!RtsFlags.ParFlags.Debug.verbose)
belch("-- Waking up thread %ld (%p)",
((StgTSO *)bqe)->id, (StgTSO *)bqe));
break;
case BLOCKED_FETCH:
/* if it's a BLOCKED_FETCH put it on the PendingFetches list */
next = bqe->link;
bqe->link = PendingFetches;
PendingFetches = bqe;
// bqe.tso->block_info.closure = NULL;
/* rest of this branch is debugging only */
IF_PAR_DEBUG(verbose,
fprintf(stderr," BLOCKED_FETCH (%p) on node %p [PE %lx] (next=%p) ,",
((StgBlockedFetch *)bqe),
((StgBlockedFetch *)bqe)->node,
mytid, ((StgBlockedFetch *)bqe)->link));
break;
# if defined(DEBUG)
/* can ignore this case in a non-debugging setup;
see comments on RBHSave closures above */
case CONSTR:
/* check that the closure is an RBHSave closure */
ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
break;
default:
barf("{awaken_blocked_queue}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe),
(StgClosure *)bqe);
# endif
}
}
}
#else /* !GRAN && !PAR */
void
awaken_blocked_queue(StgTSO *q) { awakenBlockedQueue(q); }
/*
{
StgTSO *tso;
while (q != END_TSO_QUEUE) {
ASSERT(get_itbl(q)->type == TSO);
tso = q;
q = tso->link;
push_on_run_queue(tso); // HWL: was: PUSH_ON_RUN_QUEUE(tso);
//tso->block_info.closure = NULL;
IF_DEBUG(scheduler, belch("-- Waking up thread %ld (%p)", tso->id, tso));
}
}
*/
#endif /* GRAN */
#endif /* 0 */
//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
//@subsection Exception Handling Routines
@@ -2797,7 +2493,7 @@ print_bq (StgClosure *node)
for (tso = ((StgBlockingQueue*)node)->blocking_queue;
tso != END_TSO_QUEUE;
tso=tso->link) {
ASSERT(tso!=(StgTSO*)NULL && tso!=END_TSO_QUEUE); // sanity check
ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check
ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check
fprintf(stderr," TSO %d (%p),", tso->id, tso);
}
@@ -2805,9 +2501,21 @@ print_bq (StgClosure *node)
}
# endif
/* A debugging function used all over the place in GranSim and GUM.
Dummy function in other setups.
*/
#if defined(PAR)
static nat
run_queue_len(void)
{
nat i;
StgTSO *tso;
for (i=0, tso=run_queue_hd;
tso != END_TSO_QUEUE;
i++, tso=tso->link)
/* nothing */ ;
return i;
}
#endif
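A hedged usage sketch (not in this diff) of how such a debug counter is typically invoked, using the IF_PAR_DEBUG/belch idiom that appears throughout this file, as it might be written inside schedule():
/* hypothetical call site for the function above */
IF_PAR_DEBUG(verbose,
             belch("## %d threads on the run queue", run_queue_len()));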
static void
sched_belch(char *s, ...)
......