Commit 50e9df49 authored by Dylan Yudaken's avatar Dylan Yudaken Committed by Marge Bot

When using rts_setInCallCapability, lock incall threads

This diff makes sure that incall threads, when using `rts_setInCallCapability`, will be created as locked.
If the thread is not locked, the thread might end up being scheduled to a different capability.
While this is mentioned in the docs for `rts_setInCallCapability,`, it makes the method significantly less useful as there is no guarantees on the capability being used.

This commit also adds a test to make sure things stay on the correct capability.
parent 451455fd
......@@ -415,7 +415,7 @@ f_helper(StablePtr s, HsBool b, HsInt i)
{
Capability *cap;
cap = rts_lock();
rts_evalIO(&cap,
rts_inCall(&cap,
rts_apply(rts_apply(deRefStablePtr(s),
rts_mkBool(b)), rts_mkInt(i)));
rts_unlock(cap);
......@@ -630,7 +630,7 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
| otherwise
= cResType <+> pprCconv <+> ftext c_nm <> parens fun_args
-- the target which will form the root of what we ask rts_evalIO to run
-- the target which will form the root of what we ask rts_inCall to run
the_cfun
= case maybe_target of
Nothing -> text "(StgClosure*)deRefStablePtr(the_stableptr)"
......@@ -638,7 +638,7 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
cap = text "cap" <> comma
-- the expression we give to rts_evalIO
-- the expression we give to rts_inCall
expr_to_run
= foldl' appArg the_cfun arg_info -- NOT aug_arg_info
where
......@@ -674,7 +674,7 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
, declareCResult
, text "cap = rts_lock();"
-- create the application + perform it.
, text "rts_evalIO" <> parens (
, text "rts_inCall" <> parens (
char '&' <> cap <>
text "rts_apply" <> parens (
cap <>
......
......@@ -374,10 +374,6 @@ Capability *rts_unsafeGetMyCapability (void);
// into Haskell. The actual capability will be calculated as the supplied
// value modulo the number of enabled Capabilities.
//
// Note that the thread may still be migrated by the RTS scheduler, but that
// will only happen if there are multiple threads running on one Capability and
// another Capability is free.
//
// If affinity is non-zero, the current thread will be bound to
// specific CPUs according to the prevailing affinity policy for the
// specified capability, set by either +RTS -qa or +RTS --numa.
......@@ -479,6 +475,10 @@ void rts_evalLazyIO_ (/* inout */ Capability **,
/* in */ unsigned int stack_size,
/* out */ HaskellObj *ret);
void rts_inCall (/* inout */ Capability **,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret);
void rts_checkSchedStatus (char* site, Capability *);
SchedulerStatus rts_getSchedStatus (Capability *cap);
......
......@@ -460,6 +460,26 @@ void rts_evalIO (/* inout */ Capability **cap,
scheduleWaitThread(tso,ret,cap);
}
/*
* rts_inCall() is similar to rts_evalIO, but expects to be called as an incall,
* and is not expected to be called by user code directly.
*/
void rts_inCall (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret)
{
StgTSO* tso;
tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
if ((*cap)->running_task->preferred_capability != -1) {
// enabled_capabilities should not change between here and waitCapability()
ASSERT((*cap)->no == ((*cap)->running_task->preferred_capability % enabled_capabilities));
// we requested explicit affinity; don't move this thread from now on.
tso->flags |= TSO_LOCKED;
}
scheduleWaitThread(tso,ret,cap);
}
/*
* rts_evalStableIOMain() is suitable for calling main Haskell thread
* stored in (StablePtr (IO a)) it calls rts_evalStableIO but wraps
......
......@@ -763,6 +763,7 @@
SymI_HasProto(rts_evalStableIOMain) \
SymI_HasProto(rts_evalStableIO) \
SymI_HasProto(rts_eval_) \
SymI_HasProto(rts_inCall) \
SymI_HasProto(rts_getBool) \
SymI_HasProto(rts_getChar) \
SymI_HasProto(rts_getDouble) \
......
module Lib (capTest) where
import Control.Concurrent
import Control.Exception
import Control.Concurrent.MVar
import Control.Monad (when)
import System.Exit
foreign export ccall "capTest" capTest :: IO Int
capTest :: IO Int
capTest = catch go handle
where
handle :: SomeException -> IO Int
handle e = do
putStrLn $ "Failed " ++ (show e)
return (-1)
getCap = fmap fst $ threadCapability =<< myThreadId
go = do
when (not rtsSupportsBoundThreads) $
die "This test requires -threaded"
mvar <- newEmptyMVar
mvar2 <- newEmptyMVar
(cap, locked) <- threadCapability =<< myThreadId
forkOn cap $ do
putMVar mvar =<< getCap
takeMVar mvar2
-- if cap is locked, then this would get scheduled on a different
-- capacity.
fCap <- takeMVar mvar
putMVar mvar2 ()
cap2 <- getCap
when (fCap /= cap) (fail "expected cap to be the same")
when (cap2 /= cap) (fail "expected cap to be the same when returning")
when (not locked) (fail "expected to be locked")
return cap
#include "HsFFI.h"
#include <stdio.h>
#include "Rts.h"
#include <pthread.h>
#define THREADS 6
#define OK 9999
static OSThreadId ids[THREADS];
static int results[THREADS];
static int waiters = 0;
static int done = 0;
static Condition cond;
static Mutex mutex;
HsInt capTest();
void* OSThreadProcAttr go(void *info)
{
int cap;
int res;
int threadNum = *(int*)(info);
// divide everything onto two caps (if there are two)
cap = (threadNum % 2) % enabled_capabilities;
OS_ACQUIRE_LOCK(&mutex);
waiters++;
if (waiters == THREADS) {
broadcastCondition(&cond);
} else {
while(waiters != THREADS) {
waitCondition(&cond, &mutex);
}
}
OS_RELEASE_LOCK(&mutex);
rts_setInCallCapability(cap, 0);
res = capTest();
*(int*)info = res == cap ? OK : res;
OS_ACQUIRE_LOCK(&mutex);
done++;
broadcastCondition(&cond);
OS_RELEASE_LOCK(&mutex);
return 0;
}
int main(int argc, char *argv[])
{
int n;
bool ok;
hs_init(&argc, &argv);
initCondition(&cond);
initMutex(&mutex);
waiters = 0;
done = 0;
ok = true;
for (n=0; n < THREADS; n++) {
results[n] = n;
if (createOSThread(&ids[n], "test", go, (void*)&results[n])) {
printf("unable to create thread %d\n", n);
exit(1);
}
}
OS_ACQUIRE_LOCK(&mutex);
while(done != THREADS) {
waitCondition(&cond, &mutex);
}
OS_RELEASE_LOCK(&mutex);
for (n = 0; n < THREADS; n++) {
if (results[n] != OK) {
printf("%d: unexpected result was %d\n", n, results[n]);
ok = false;
}
}
hs_exit();
return ok ? 0 : 1;
}
......@@ -218,3 +218,5 @@ test('UnliftedNewtypesByteArrayOffset', [omit_ways(['ghci'])], compile_and_run,
test('T17471', [omit_ways(['ghci'])], compile_and_run,
['T17471_c.c -optc-D -optcFOO'])
test('IncallAffinity', [req_smp, only_ways(['threaded1', 'threaded2'])], compile_and_run, ['IncallAffinity_c.c -no-hs-main'])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment