Commit 454033b5 authored by Simon Marlow's avatar Simon Marlow

Add hs_try_putmvar()

Summary:
This is a fast, non-blocking, asynchronous, interface to tryPutMVar that
can be called from C/C++.

It's useful for callback-based C/C++ APIs: the idea is that the callback
invokes hs_try_putmvar(), and the Haskell code waits for the callback to
run by blocking in takeMVar.

The callback doesn't block - this is often a requirement of
callback-based APIs.  The callback wakes up the Haskell thread with
minimal overhead and no unnecessary context-switches.

There are a couple of benchmarks in
testsuite/tests/concurrent/should_run.  Some example results comparing
hs_try_putmvar() with using a standard foreign export:

    ./hs_try_putmvar003 1 64 16 100 +RTS -s -N4     0.49s
    ./hs_try_putmvar003 2 64 16 100 +RTS -s -N4     2.30s

hs_try_putmvar() is 4x faster for this workload (see the source for
hs_try_putmvar003.hs for details of the workload).

An alternative solution is to use the IO Manager for this.  We've tried
it, but there are problems with that approach:
* Need to create a new file descriptor for each callback
* The IO Manger thread(s) become a bottleneck
* More potential for things to go wrong, e.g. throwing an exception in
  an IO Manager callback kills the IO Manager thread.

Test Plan: validate; new unit tests

Reviewers: niteria, erikd, ezyang, bgamari, austin, hvr

Subscribers: thomie

Differential Revision: https://phabricator.haskell.org/D2501
parent 0e7ccf6d
......@@ -616,6 +616,125 @@ the threads have exited first. (Unofficially, if you want to use this
fast and loose version of ``hs_exit()``, then call
``shutdownHaskellAndExit()`` instead).
.. _hs_try_putmvar:
Waking up Haskell threads from C
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Sometimes we want to be able to wake up a Haskell thread from some C
code. For example, when using a callback-based C API, we register a C
callback and then we need to wait for the callback to run.
One way to do this is to create a ``foreign export`` that will do
whatever needs to be done to wake up the Haskell thread - perhaps
``putMVar`` - and then call this from our C callback. There are a
couple of problems with this:
1. Calling a foreign export has a lot of overhead: it creates a
complete new Haskell thread, for example.
2. The call may block for a long time if a GC is in progress. We
can't use this method if the C API we're calling doesn't allow
blocking in the callback.
For these reasons GHC provides an external API to ``tryPutMVar``,
``hs_try_putmvar``, which you can use to cheaply and asynchronously
wake up a Haskell thread from C/C++.
.. code-block:: c
void hs_try_putmvar (int capability, HsStablePtr sp);
The C call ``hs_try_putmvar(cap, mvar)`` is equivalent to the Haskell
call ``tryPutMVar mvar ()``, except that it is
* non-blocking: takes a bounded, short, amount of time
* asynchronous: the actual putMVar may be performed after the call
returns (for example, if the RTS is currently garbage collecting).
That's why ``hs_try_putmvar()`` doesn't return a result to say
whether the put succeeded. It is your responsibility to ensure that
the ``MVar`` is empty; if it is full, ``hs_try_putmvar()`` will have
no effect.
**Example**. Suppose we have a C/C++ function to call that will return and then
invoke a callback at some point in the future, passing us some data.
We want to wait in Haskell for the callback to be called, and retrieve
the data. We can do it like this:
.. code-block:: haskell
import GHC.Conc (newStablePtrPrimMVar, PrimMVar)
makeExternalCall = mask_ $ do
mvar <- newEmptyMVar
sp <- newStablePtrPrimMVar mvar
fp <- mallocForeignPtr
withForeignPtr fp $ \presult -> do
cap <- threadCapability =<< myThreadId
scheduleCallback sp cap presult
takeMVar mvar `onException`
forkIO (do takeMVar mvar; touchForeignPtr fp)
peek presult
foreign import ccall "scheduleCallback"
scheduleCallback :: StablePtr PrimMVar
-> Int
-> Ptr Result
-> IO ()
And inside ``scheduleCallback``, we create a callback that will in due
course store the result data in the ``Ptr Result``, and then call
``hs_try_putmvar()``.
There are a few things to note here.
* There's a special function to create the ``StablePtr``:
``newStablePtrPrimMVar``, because the RTS needs a ``StablePtr`` to
the primitive ``MVar#`` object, and we can't create that directly.
Do *not* just use ``newStablePtr`` on the ``MVar``: your program
will crash.
* The ``StablePtr`` is freed by ``hs_try_putmvar()``. This is because
it would otherwise be difficult to arrange to free the ``StablePtr``
reliably: we can't free it in Haskell, because if the ``takeMVar``
is interrupted by an asynchronous exception, then the callback will
fire at a later time. We can't free it in C, because we don't know
when to free it (not when ``hs_try_putmvar()`` returns, because that
is an async call that uses the ``StablePtr`` at some time in the
future).
* The ``mask_`` is to avoid asynchronous exceptions before the
``scheduleCallback`` call, which would leak the ``StablePtr``.
* We find out the current capability number and pass it to C. This is
passed back to ``hs_try_putmvar``, and helps the RTS to know which
capability it should try to perform the ``tryPutMVar`` on. If you
don't care, you can pass ``-1`` for the capability to
``hs_try_putmvar``, and it will pick an arbitrary one.
Picking the right capability will help avoid unnecessary context
switches. Ideally you should pass the capability that the thread
that will be woken up last ran on, which you can find by calling
``threadCapability`` in Haskell.
* If you want to also pass some data back from the C callback to
Haskell, this is best done by first allocating some memory in
Haskell to receive the data, and passing the address to C, as we did
in the above example.
* ``takeMVar`` can be interrupted by an asynchronous exception. If
this happens, the callback in C will still run at some point in the
future, will still write the result, and will still call
``hs_try_putmvar()``. Therefore we have to arrange that the memory
for the result stays alive until the callback has run, so if an
exception is thrown during ``takeMVar`` we fork another thread to
wait for the callback and hold the memory alive using
``touchForeignPtr``.
For a fully working example, see
``testsuite/tests/concurrent/should_run/hs_try_putmvar001.hs`` in the
GHC source tree.
.. _ffi-floating-point:
Floating point and the FFI
......
......@@ -113,8 +113,12 @@ extern StgPtr hs_spt_lookup(StgWord64 key[2]);
extern int hs_spt_keys(StgPtr keys[], int szKeys);
extern int hs_spt_key_count (void);
extern void hs_try_putmvar (int capability, HsStablePtr sp);
/* -------------------------------------------------------------------------- */
#ifdef __cplusplus
}
#endif
......
......@@ -50,6 +50,8 @@ module GHC.Conc
, threadStatus
, threadCapability
, newStablePtrPrimMVar, PrimMVar
-- * Waiting
, threadDelay
, registerDelay
......
......@@ -59,6 +59,8 @@ module GHC.Conc.Sync
, threadStatus
, threadCapability
, newStablePtrPrimMVar, PrimMVar
-- * Allocation counter and quota
, setAllocationCounter
, getAllocationCounter
......@@ -117,6 +119,7 @@ import GHC.MVar
import GHC.Ptr
import GHC.Real ( fromIntegral )
import GHC.Show ( Show(..), showString )
import GHC.Stable ( StablePtr(..) )
import GHC.Weak
infixr 0 `par`, `pseq`
......@@ -615,6 +618,17 @@ mkWeakThreadId t@(ThreadId t#) = IO $ \s ->
(# s1, w #) -> (# s1, Weak w #)
data PrimMVar
-- | Make a StablePtr that can be passed to the C function
-- @hs_try_putmvar()@. The RTS wants a 'StablePtr' to the underlying
-- 'MVar#', but a 'StablePtr#' can only refer to lifted types, so we
-- have to cheat by coercing.
newStablePtrPrimMVar :: MVar () -> IO (StablePtr PrimMVar)
newStablePtrPrimMVar (MVar m) = IO $ \s0 ->
case makeStablePtr# (unsafeCoerce# m :: PrimMVar) s0 of
(# s1, sp #) -> (# s1, StablePtr sp #)
-----------------------------------------------------------------------------
-- Transactional heap operations
-----------------------------------------------------------------------------
......
......@@ -266,6 +266,7 @@ initCapability (Capability *cap, uint32_t i)
cap->returning_tasks_tl = NULL;
cap->n_returning_tasks = 0;
cap->inbox = (Message*)END_TSO_QUEUE;
cap->putMVars = NULL;
cap->sparks = allocSparkPool();
cap->spark_stats.created = 0;
cap->spark_stats.dud = 0;
......
......@@ -123,6 +123,7 @@ struct Capability_ {
// returning_tasks_{hd,tl}
// wakeup_queue
// inbox
// putMVars
Mutex lock;
// Tasks waiting to return from a foreign call, or waiting to make
......@@ -138,6 +139,10 @@ struct Capability_ {
// Locks required: cap->lock
Message *inbox;
// putMVars are really messages, but they're allocated with malloc() so they
// can't go on the inbox queue: the GC would get confused.
struct PutMVar_ *putMVars;
SparkPool *sparks;
// Stats on spark creation/conversion
......@@ -378,6 +383,11 @@ extern uint32_t numa_map[MAX_NUMA_NODES];
Messages
-------------------------------------------------------------------------- */
typedef struct PutMVar_ {
StgStablePtr mvar;
struct PutMVar_ *link;
} PutMVar;
#ifdef THREADED_RTS
INLINE_HEADER rtsBool emptyInbox(Capability *cap);
......@@ -459,7 +469,8 @@ contextSwitchCapability (Capability *cap)
INLINE_HEADER rtsBool emptyInbox(Capability *cap)
{
return (cap->inbox == (Message*)END_TSO_QUEUE);
return (cap->inbox == (Message*)END_TSO_QUEUE &&
cap->putMVars == NULL);
}
#endif
......
......@@ -24,6 +24,7 @@
* modules these names are defined in.
*/
PRELUDE_CLOSURE(ghczmprim_GHCziTuple_Z0T_closure);
PRELUDE_CLOSURE(ghczmprim_GHCziTypes_True_closure);
PRELUDE_CLOSURE(ghczmprim_GHCziTypes_False_closure);
PRELUDE_CLOSURE(base_GHCziPack_unpackCString_closure);
......@@ -87,6 +88,7 @@ PRELUDE_INFO(base_GHCziWord_W64zh_con_info);
PRELUDE_INFO(base_GHCziStable_StablePtr_static_info);
PRELUDE_INFO(base_GHCziStable_StablePtr_con_info);
#define Unit_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTuple_Z0T_closure)
#define True_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_True_closure)
#define False_closure DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_False_closure)
#define unpackCString_closure DLL_IMPORT_DATA_REF(base_GHCziPack_unpackCString_closure)
......
......@@ -1739,6 +1739,13 @@ loop:
}
// NOTE: there is another implementation of this function in
// Threads.c:performTryPutMVar(). Keep them in sync! It was
// measurably slower to call the C function from here (70% for a
// tight loop doing tryPutMVar#).
//
// TODO: we could kill the duplication by making tryPutMVar# into an
// inline primop that expands into a C call to performTryPutMVar().
stg_tryPutMVarzh ( P_ mvar, /* :: MVar a */
P_ val, /* :: a */ )
{
......@@ -1812,6 +1819,7 @@ loop:
return (1);
}
stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
{
W_ val, info, tso, q;
......
......@@ -16,6 +16,7 @@
#include "Schedule.h"
#include "Capability.h"
#include "Stable.h"
#include "Threads.h"
#include "Weak.h"
/* ----------------------------------------------------------------------------
......@@ -620,3 +621,77 @@ void rts_done (void)
freeMyTask();
}
/* -----------------------------------------------------------------------------
tryPutMVar from outside Haskell
The C call
hs_try_putmvar(cap, mvar)
is equivalent to the Haskell call
tryPutMVar mvar ()
but it is
* non-blocking: takes a bounded, short, amount of time
* asynchronous: the actual putMVar may be performed after the
call returns. That's why hs_try_putmvar() doesn't return a
result to say whether the put succeeded.
NOTE: this call transfers ownership of the StablePtr to the RTS, which will
free it after the tryPutMVar has taken place. The reason is that otherwise,
it would be very difficult for the caller to arrange to free the StablePtr
in all circumstances.
For more details, see the section "Waking up Haskell threads from C" in the
User's Guide.
-------------------------------------------------------------------------- */
void hs_try_putmvar (/* in */ int capability,
/* in */ HsStablePtr mvar)
{
Task *task = getTask();
Capability *cap;
if (capability < 0) {
capability = task->preferred_capability;
if (capability < 0) {
capability = 0;
}
}
cap = capabilities[capability % enabled_capabilities];
#if !defined(THREADED_RTS)
performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
freeStablePtr(mvar);
#else
ACQUIRE_LOCK(&cap->lock);
// If the capability is free, we can perform the tryPutMVar immediately
if (cap->running_task == NULL) {
cap->running_task = task;
task->cap = cap;
RELEASE_LOCK(&cap->lock);
performTryPutMVar(cap, (StgMVar*)deRefStablePtr(mvar), Unit_closure);
freeStablePtr(mvar);
// Wake up the capability, which will start running the thread that we
// just awoke (if there was one).
releaseCapability(cap);
} else {
PutMVar *p = stgMallocBytes(sizeof(PutMVar),"hs_try_putmvar");
// We cannot deref the StablePtr if we don't have a capability,
// so we have to store it and deref it later.
p->mvar = mvar;
p->link = cap->putMVars;
cap->putMVars = p;
RELEASE_LOCK(&cap->lock);
}
#endif
}
......@@ -723,7 +723,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap0->n_returning_tasks != 0
|| cap0->inbox != (Message*)END_TSO_QUEUE) {
|| !emptyInbox(cap0)) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
......@@ -982,6 +982,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
PutMVar *p, *pnext;
int r;
Capability *cap = *pcap;
......@@ -1006,7 +1007,9 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
if (r != 0) return;
m = cap->inbox;
p = cap->putMVars;
cap->inbox = (Message*)END_TSO_QUEUE;
cap->putMVars = NULL;
RELEASE_LOCK(&cap->lock);
......@@ -1015,10 +1018,20 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
executeMessage(cap, m);
m = next;
}
while (p != NULL) {
pnext = p->link;
performTryPutMVar(cap, (StgMVar*)deRefStablePtr(p->mvar),
Unit_closure);
freeStablePtr(p->mvar);
stgFree(p);
p = pnext;
}
}
#endif
}
/* ----------------------------------------------------------------------------
* Activate spark threads (THREADED_RTS)
* ------------------------------------------------------------------------- */
......
......@@ -36,7 +36,6 @@ uint32_t peakWorkerCount;
static int tasksInitialized = 0;
static void freeTask (Task *task);
static Task * allocTask (void);
static Task * newTask (rtsBool);
#if defined(THREADED_RTS)
......@@ -117,8 +116,7 @@ freeTaskManager (void)
return tasksRunning;
}
static Task *
allocTask (void)
Task* getTask (void)
{
Task *task;
......@@ -209,7 +207,7 @@ newTask (rtsBool worker)
task->cap = NULL;
task->worker = worker;
task->stopped = rtsFalse;
task->stopped = rtsTrue;
task->running_finalizers = rtsFalse;
task->n_spare_incalls = 0;
task->spare_incalls = NULL;
......@@ -304,7 +302,7 @@ newBoundTask (void)
stg_exit(EXIT_FAILURE);
}
task = allocTask();
task = getTask();
task->stopped = rtsFalse;
......@@ -452,6 +450,7 @@ startWorkerTask (Capability *cap)
// A worker always gets a fresh Task structure.
task = newTask(rtsTrue);
task->stopped = rtsFalse;
// The lock here is to synchronise with taskStart(), to make sure
// that we have finished setting up the Task structure before the
......@@ -499,7 +498,7 @@ void rts_setInCallCapability (
int preferred_capability,
int affinity USED_IF_THREADS)
{
Task *task = allocTask();
Task *task = getTask();
task->preferred_capability = preferred_capability;
#ifdef THREADED_RTS
......
......@@ -150,7 +150,8 @@ typedef struct Task_ {
struct InCall_ *spare_incalls;
rtsBool worker; // == rtsTrue if this is a worker Task
rtsBool stopped; // this task has stopped or exited Haskell
rtsBool stopped; // == rtsTrue between newBoundTask and
// boundTaskExiting, or in a worker Task.
// So that we can detect when a finalizer illegally calls back into Haskell
rtsBool running_finalizers;
......@@ -205,7 +206,12 @@ uint32_t freeTaskManager (void);
// thread-local storage and will remain even after boundTaskExiting()
// has been called; to free the memory, see freeMyTask().
//
Task *newBoundTask (void);
Task* newBoundTask (void);
// Return the current OS thread's Task, which is created if it doesn't already
// exist. After you have finished using RTS APIs, you should call freeMyTask()
// to release this thread's Task.
Task* getTask (void);
// The current task is a bound task that is exiting.
//
......
......@@ -743,6 +743,85 @@ threadStackUnderflow (Capability *cap, StgTSO *tso)
return retvals;
}
/* ----------------------------------------------------------------------------
Implementation of tryPutMVar#
NOTE: this should be kept in sync with stg_tryPutMVarzh in PrimOps.cmm
------------------------------------------------------------------------- */
rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value)
{
const StgInfoTable *info;
StgMVarTSOQueue *q;
StgTSO *tso;
info = lockClosure((StgClosure*)mvar);
if (mvar->value != &stg_END_TSO_QUEUE_closure) {
#if defined(THREADED_RTS)
unlockClosure((StgClosure*)mvar, info);
#endif
return rtsFalse;
}
q = mvar->head;
loop:
if (q == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
/* No further takes, the MVar is now full. */
if (info == &stg_MVAR_CLEAN_info) {
dirty_MVAR(&cap->r, (StgClosure*)mvar);
}
mvar->value = value;
unlockClosure((StgClosure*)mvar, &stg_MVAR_DIRTY_info);
return rtsTrue;
}
if (q->header.info == &stg_IND_info ||
q->header.info == &stg_MSG_NULL_info) {
q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
goto loop;
}
// There are takeMVar(s) waiting: wake up the first one
tso = q->tso;
mvar->head = q->link;
if (mvar->head == (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure) {
mvar->tail = (StgMVarTSOQueue*)&stg_END_TSO_QUEUE_closure;
}
ASSERT(tso->block_info.closure == (StgClosure*)mvar);
// save why_blocked here, because waking up the thread destroys
// this information
StgWord why_blocked = tso->why_blocked;
// actually perform the takeMVar
StgStack* stack = tso->stackobj;
stack->sp[1] = (W_)value;
stack->sp[0] = (W_)&stg_ret_p_info;
// indicate that the MVar operation has now completed.
tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;
if (stack->dirty == 0) {
dirty_STACK(cap, stack);
}
tryWakeupThread(cap, tso);
// If it was an readMVar, then we can still do work,
// so loop back. (XXX: This could take a while)
if (why_blocked == BlockedOnMVarRead) {
q = ((StgMVarTSOQueue*)q)->link;
goto loop;
}
ASSERT(why_blocked == BlockedOnMVar);
unlockClosure((StgClosure*)mvar, info);
return rtsTrue;
}
/* ----------------------------------------------------------------------------
* Debugging: why is a thread blocked
* ------------------------------------------------------------------------- */
......
......@@ -41,6 +41,8 @@ StgBool isThreadBound (StgTSO* tso);
void threadStackOverflow (Capability *cap, StgTSO *tso);
W_ threadStackUnderflow (Capability *cap, StgTSO *tso);
rtsBool performTryPutMVar(Capability *cap, StgMVar *mvar, StgClosure *value);
#ifdef DEBUG
void printThreadBlockage (StgTSO *tso);
void printThreadStatus (StgTSO *t);
......
......@@ -103,6 +103,7 @@ ld-options:
, "-Wl,-u,_base_GHCziPtr_Ptr_con_info"
, "-Wl,-u,_base_GHCziPtr_FunPtr_con_info"
, "-Wl,-u,_base_GHCziStable_StablePtr_con_info"
, "-Wl,-u,_ghczmprim_GHCziTuple_Z0T_closure"
, "-Wl,-u,_ghczmprim_GHCziTypes_False_closure"
, "-Wl,-u,_ghczmprim_GHCziTypes_True_closure"
, "-Wl,-u,_base_GHCziPack_unpackCString_closure"
......@@ -199,6 +200,7 @@ ld-options:
, "-Wl,-u,base_GHCziPtr_Ptr_con_info"
, "-Wl,-u,base_GHCziPtr_FunPtr_con_info"
, "-Wl,-u,base_GHCziStable_StablePtr_con_info"
, "-Wl,-u,ghczmprim_GHCziTuple_Z0T_closure"
, "-Wl,-u,ghczmprim_GHCziTypes_False_closure"
, "-Wl,-u,ghczmprim_GHCziTypes_True_closure"
, "-Wl,-u,base_GHCziPack_unpackCString_closure"
......
......@@ -4,3 +4,9 @@ include $(TOP)/mk/test.mk
conc059_setup :
'$(TEST_HC)' $(TEST_HC_OPTS) -c conc059.hs
hs_try_putmvar002_setup :
'$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar002.hs
hs_try_putmvar003_setup :
'$(TEST_HC)' $(TEST_HC_OPTS) -c hs_try_putmvar003.hs
......@@ -255,3 +255,34 @@ test('setnumcapabilities001',
# omit ghci, which can't handle unboxed tuples:
test('compareAndSwap', [omit_ways(['ghci','hpc']), reqlib('primitive')], compile_and_run, [''])
test('hs_try_putmvar001',
[
when(opsys('mingw32'),skip), # uses pthread APIs in the C code
only_ways(['threaded1','threaded2']),
extra_clean(['hs_try_putmvar001_c.o'])],
compile_and_run,
['hs_try_putmvar001_c.c'])
# A benchmark for hs_try_putmvar() vs. foreign export
# This one should work for both threaded and non-threaded RTS
test('hs_try_putmvar002',
[
pre_cmd('$MAKE -s --no-print-directory hs_try_putmvar002_setup'),
extra_clean(['hs_try_putmvar002_c.o']),
extra_run_opts('1 8 10000')
],
compile_and_run,
['hs_try_putmvar002_c.c'])
# Another benchmark for hs_try_putmvar() vs. foreign export
test('hs_try_putmvar003',