Commit c520a3a2 authored by Simon Marlow's avatar Simon Marlow

Add a new primitive forkOn#, for forking a thread on a specific Capability

This gives some control over affinity, while we figure out the best
way to automatically schedule threads to make best use of the
available parallelism.

In addition to the primitive, there is also:
 
  GHC.Conc.forkOnIO :: Int -> IO () -> IO ThreadId

where 'forkOnIO i m' creates a thread on Capability (i `rem` N), where
N is the number of available Capabilities set by +RTS -N.

Threads forked by forkOnIO do not automatically migrate when there are
free Capabilities, like normal threads do.  Still, if you're using
forkOnIO exclusively, it's a good idea to do +RTS -qm to disable work
pushing anyway (work pushing takes too much time when the run queues
are large; this is something we need to fix).
parent 5ed93b10
......@@ -1441,6 +1441,13 @@ primop ForkOp "fork#" GenPrimOp
has_side_effects = True
out_of_line = True
primop ForkOnOp "forkOn#" GenPrimOp
Int# -> a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
with
usage = { mangle ForkOnOp [mkO, mkP] mkR }
has_side_effects = True
out_of_line = True
primop KillThreadOp "killThread#" GenPrimOp
ThreadId# -> a -> State# RealWorld -> State# RealWorld
with
......
......@@ -579,6 +579,7 @@ RTS_FUN(makeStablePtrzh_fast);
RTS_FUN(deRefStablePtrzh_fast);
RTS_FUN(forkzh_fast);
RTS_FUN(forkOnzh_fast);
RTS_FUN(yieldzh_fast);
RTS_FUN(killThreadzh_fast);
RTS_FUN(blockAsyncExceptionszh_fast);
......
......@@ -93,7 +93,13 @@ typedef StgWord32 StgThreadID;
*/
#define TSO_DIRTY 1
#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
/*
* TSO_LOCKED is set when a TSO is locked to a particular Capability.
*/
#define TSO_LOCKED 2
#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
#define tsoLocked(tso) ((tso)->flags & TSO_LOCKED)
/*
* Type returned after running a thread. Values of this type
......
......@@ -876,19 +876,45 @@ decodeDoublezh_fast
// Primop entry point for fork#: create and schedule a new IO thread
// running the closure in R1, returning its ThreadId#.
forkzh_fast
{
/* args: R1 = closure to spark */
// Garbage-collect first if necessary; R1 is a pointer the GC must trace.
MAYBE_GC(R1_PTR, forkzh_fast);
// create it right now, return ThreadID in R1
// NOTE(review): the next line appears to be diff residue (the OLD, removed
// form of the call, superseded by the threadid version below) — this page
// shows a diff hunk without +/- markers; verify against the real source.
"ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr",
W_ closure;
W_ threadid;
closure = R1;
// Create the TSO with the default initial stack size, then hand it to
// the scheduler on the current Capability.
"ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
RtsFlags_GcFlags_initialStkSize(RtsFlags),
closure "ptr") [];
foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
// switch at the earliest opportunity
CInt[context_switch] = 1 :: CInt;
RET_P(threadid);
}
// Primop entry point for forkOn#: create a new IO thread running the
// closure in R2 and schedule it on the Capability numbered by R1
// (taken modulo the number of Capabilities — see scheduleThreadOn).
forkOnzh_fast
{
/* args: R1 = cpu, R2 = closure to spark */
// Garbage-collect first if necessary; R2 is a pointer the GC must trace.
MAYBE_GC(R2_PTR, forkOnzh_fast);
W_ cpu;
W_ closure;
W_ threadid;
cpu = R1;
closure = R2;
"ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
RtsFlags_GcFlags_initialStkSize(RtsFlags),
// NOTE(review): the next two lines appear to be diff residue (the OLD,
// removed lines referencing R1), superseded by the closure/threadid
// lines below — this page shows a diff hunk without +/- markers.
R1 "ptr") [R1];
foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr") [R1];
closure "ptr") [];
// Place the new thread on the requested Capability rather than ours.
foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
// switch at the earliest opportunity
CInt[context_switch] = 1 :: CInt;
// NOTE(review): RET_P(R1) appears to be the removed old return; the
// added line returns the new thread's id.
RET_P(R1);
RET_P(threadid);
}
yieldzh_fast
......
......@@ -204,6 +204,7 @@ static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
......@@ -482,20 +483,7 @@ schedule (Capability *initialCapability, Task *task)
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
// Any threads that were woken up by other Capabilities get
// appended to our run queue.
if (!emptyWakeupQueue(cap)) {
ACQUIRE_LOCK(&cap->lock);
if (emptyRunQueue(cap)) {
cap->run_queue_hd = cap->wakeup_queue_hd;
cap->run_queue_tl = cap->wakeup_queue_tl;
} else {
cap->run_queue_tl->link = cap->wakeup_queue_hd;
cap->run_queue_tl = cap->wakeup_queue_tl;
}
cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
RELEASE_LOCK(&cap->lock);
}
scheduleCheckWakeupThreads(cap);
scheduleCheckBlockedThreads(cap);
......@@ -841,7 +829,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
next = t->link;
t->link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
|| t->bound == task) { // don't move my bound thread
|| t->bound == task // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
prev->link = t;
prev = t;
} else if (i == n_free_caps) {
......@@ -927,6 +916,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
}
/* ----------------------------------------------------------------------------
* Check for threads woken up by other Capabilities
* ------------------------------------------------------------------------- */
// Splice any threads that other Capabilities have woken up for us
// (queued on cap->wakeup_queue_hd/tl) onto the tail of our run queue.
// No-op in the non-threaded RTS, where there are no other Capabilities.
static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
// Any threads that were woken up by other Capabilities get
// appended to our run queue.
if (!emptyWakeupQueue(cap)) {
// cap->lock guards the wakeup queue: other Capabilities push onto it
// concurrently, so we must hold the lock while detaching it.
ACQUIRE_LOCK(&cap->lock);
if (emptyRunQueue(cap)) {
// Run queue empty: the wakeup queue becomes the run queue wholesale.
cap->run_queue_hd = cap->wakeup_queue_hd;
cap->run_queue_tl = cap->wakeup_queue_tl;
} else {
// Otherwise link the wakeup queue onto the run queue's tail.
cap->run_queue_tl->link = cap->wakeup_queue_hd;
cap->run_queue_tl = cap->wakeup_queue_tl;
}
// Mark the wakeup queue empty before releasing the lock, so a
// concurrent waker starts a fresh queue rather than appending to
// threads we have already taken.
cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
RELEASE_LOCK(&cap->lock);
}
#endif
}
/* ----------------------------------------------------------------------------
* Check for threads blocked on BLACKHOLEs that can be woken up
* ------------------------------------------------------------------------- */
......@@ -2709,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso)
appendToRunQueue(cap,tso);
}
// Schedule tso on the Capability numbered (cpu mod number-of-Capabilities).
// cap is the Capability we currently own.  The thread is marked TSO_LOCKED
// so the scheduler's work-pushing will never migrate it afterwards.
// In the non-threaded RTS there is only one Capability, so this degenerates
// to a plain enqueue.
void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
#if defined(THREADED_RTS)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
// Wrap the requested cpu into the range of Capabilities (set by +RTS -N).
cpu %= RtsFlags.ParFlags.nNodes;
if (cpu == cap->no) {
// Target is the Capability we already own: enqueue directly.
appendToRunQueue(cap,tso);
} else {
Capability *target_cap = &capabilities[cpu];
if (tso->bound) {
// Keep a bound thread's Task pointing at the same Capability as
// the thread itself.
tso->bound->cap = target_cap;
}
// Retarget the thread, then hand it to the target Capability's
// wakeup queue (we do not own target_cap, so we cannot enqueue
// on its run queue directly).
tso->cap = target_cap;
wakeupThreadOnCapability(target_cap,tso);
}
#else
appendToRunQueue(cap,tso);
#endif
}
Capability *
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
......@@ -3244,7 +3280,8 @@ unblockOne(Capability *cap, StgTSO *tso)
next = tso->link;
tso->link = END_TSO_QUEUE;
if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
#if defined(THREADED_RTS)
if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
// We are waking up this thread on the current Capability, which
// might involve migrating it from the Capability it was last on.
if (tso->bound) {
......@@ -3260,6 +3297,10 @@ unblockOne(Capability *cap, StgTSO *tso)
// we'll try to wake it up on the Capability it was last on.
wakeupThreadOnCapability(tso->cap, tso);
}
#else
appendToRunQueue(cap,tso);
context_switch = 1;
#endif
IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
return next;
......
......@@ -20,9 +20,14 @@
void initScheduler (void);
void exitScheduler (void);
// Place a new thread on the run queue of the specified Capability
// Place a new thread on the run queue of the current Capability
void scheduleThread (Capability *cap, StgTSO *tso);
// Place a new thread on the run queue of a specified Capability
// (cap is the currently owned Capability, cpu is the number of
// the desired Capability).
void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
/* awakenBlockedQueue()
*
* Takes a pointer to the beginning of a blocked TSO queue, and
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment