Skip to content
Snippets Groups Projects
Commit d5f42045 authored by Luite Stegeman's avatar Luite Stegeman Committed by Marge Bot
Browse files

Interpreter: Add locking for communication with external interpreter

This adds locking to communication with the external interpreter
to prevent concurrent tasks interfering with each other. This
fixes Template Haskell with the external interpreter in parallel (-j)
builds.

Fixes #25083
parent 25edf849
No related branches found
No related tags found
No related merge requests found
...@@ -578,10 +578,12 @@ spawnIServ conf = do ...@@ -578,10 +578,12 @@ spawnIServ conf = do
[] []
(iservConfOpts conf) (iservConfOpts conf)
lo_ref <- newIORef Nothing lo_ref <- newIORef Nothing
lock <- newMVar ()
let pipe = Pipe { pipeRead = rh, pipeWrite = wh, pipeLeftovers = lo_ref } let pipe = Pipe { pipeRead = rh, pipeWrite = wh, pipeLeftovers = lo_ref }
let process = InterpProcess let process = InterpProcess
{ interpHandle = ph { interpHandle = ph
, interpPipe = pipe , interpPipe = pipe
, interpLock = lock
} }
pending_frees <- newMVar [] pending_frees <- newMVar []
......
...@@ -130,10 +130,12 @@ startTHRunnerProcess interp_js settings = do ...@@ -130,10 +130,12 @@ startTHRunnerProcess interp_js settings = do
std_in <- readIORef interp_in std_in <- readIORef interp_in
lo_ref <- newIORef Nothing lo_ref <- newIORef Nothing
lock <- newMVar ()
let pipe = Pipe { pipeRead = rh, pipeWrite = wh, pipeLeftovers = lo_ref } let pipe = Pipe { pipeRead = rh, pipeWrite = wh, pipeLeftovers = lo_ref }
let proc = InterpProcess let proc = InterpProcess
{ interpHandle = hdl { interpHandle = hdl
, interpPipe = pipe , interpPipe = pipe
, interpLock = lock
} }
pure (std_in, proc) pure (std_in, proc)
......
{-# LANGUAGE LambdaCase #-}
module GHC.Runtime.Interpreter.Process module GHC.Runtime.Interpreter.Process
( (
-- * Low-level API
callInterpProcess
, readInterpProcess
, writeInterpProcess
-- * Message API -- * Message API
, Message(..) Message(..)
, DelayedResponse (..) , DelayedResponse (..)
-- * Top-level message API (these acquire/release a lock)
, sendMessage , sendMessage
, sendMessageNoResponse , sendMessageNoResponse
, sendMessageDelayedResponse , sendMessageDelayedResponse
, receiveDelayedResponse
-- * Nested message API (these require the interpreter to already be locked)
, sendAnyValue , sendAnyValue
, receiveAnyValue , receiveAnyValue
, receiveDelayedResponse
, receiveTHMessage , receiveTHMessage
) )
where where
...@@ -31,45 +28,79 @@ import GHC.Utils.Exception as Ex ...@@ -31,45 +28,79 @@ import GHC.Utils.Exception as Ex
import Data.Binary import Data.Binary
import System.Exit import System.Exit
import System.Process import System.Process
import Control.Concurrent.MVar (MVar, withMVar, takeMVar, putMVar, isEmptyMVar)
data DelayedResponse a = DelayedResponse data DelayedResponse a = DelayedResponse
-- -----------------------------------------------------------------------------
-- Top-level Message API
-- | Send a message to the interpreter process that doesn't expect a response -- | Send a message to the interpreter process that doesn't expect a response
-- (locks the interpreter while sending)
sendMessageNoResponse :: ExtInterpInstance d -> Message () -> IO () sendMessageNoResponse :: ExtInterpInstance d -> Message () -> IO ()
sendMessageNoResponse i m = writeInterpProcess (instProcess i) (putMessage m) sendMessageNoResponse i m =
withLock i $ writeInterpProcess (instProcess i) (putMessage m)
-- | Send a message to the interpreter that excepts a response -- | Send a message to the interpreter that expects a response
-- (locks the interpreter while until the response is received)
sendMessage :: Binary a => ExtInterpInstance d -> Message a -> IO a sendMessage :: Binary a => ExtInterpInstance d -> Message a -> IO a
sendMessage i m = callInterpProcess (instProcess i) m sendMessage i m = withLock i $ callInterpProcess (instProcess i) m
-- | Send a message to the interpreter process whose response is expected later -- | Send a message to the interpreter process whose response is expected later
-- --
-- This is useful to avoid forgetting to receive the value and to ensure that -- This is useful to avoid forgetting to receive the value and to ensure that
-- the type of the response isn't lost. Use receiveDelayedResponse to read it. -- the type of the response isn't lost. Use receiveDelayedResponse to read it.
-- (locks the interpreter until the response is received using
-- `receiveDelayedResponse`)
sendMessageDelayedResponse :: ExtInterpInstance d -> Message a -> IO (DelayedResponse a) sendMessageDelayedResponse :: ExtInterpInstance d -> Message a -> IO (DelayedResponse a)
sendMessageDelayedResponse i m = do sendMessageDelayedResponse i m = do
lock i
writeInterpProcess (instProcess i) (putMessage m) writeInterpProcess (instProcess i) (putMessage m)
pure DelayedResponse pure DelayedResponse
-- | Send any value -- | Expect a delayed result to be received now
receiveDelayedResponse :: Binary a => ExtInterpInstance d -> DelayedResponse a -> IO a
receiveDelayedResponse i DelayedResponse = do
ensureLocked i
r <- readInterpProcess (instProcess i) get
unlock i
pure r
-- -----------------------------------------------------------------------------
-- Nested Message API
-- | Send any value (requires locked interpreter)
sendAnyValue :: Binary a => ExtInterpInstance d -> a -> IO () sendAnyValue :: Binary a => ExtInterpInstance d -> a -> IO ()
sendAnyValue i m = writeInterpProcess (instProcess i) (put m) sendAnyValue i m = ensureLocked i >> writeInterpProcess (instProcess i) (put m)
-- | Expect a value to be received -- | Expect a value to be received (requires locked interpreter)
receiveAnyValue :: ExtInterpInstance d -> Get a -> IO a receiveAnyValue :: ExtInterpInstance d -> Get a -> IO a
receiveAnyValue i get = readInterpProcess (instProcess i) get receiveAnyValue i get = ensureLocked i >> readInterpProcess (instProcess i) get
-- | Expect a delayed result to be received now -- | Wait for a Template Haskell message (requires locked interpreter)
receiveDelayedResponse :: Binary a => ExtInterpInstance d -> DelayedResponse a -> IO a
receiveDelayedResponse i DelayedResponse = readInterpProcess (instProcess i) get
-- | Expect a value to be received
receiveTHMessage :: ExtInterpInstance d -> IO THMsg receiveTHMessage :: ExtInterpInstance d -> IO THMsg
receiveTHMessage i = receiveAnyValue i getTHMessage receiveTHMessage i = ensureLocked i >> receiveAnyValue i getTHMessage
-- ----------------------------------------------------------------------------- -- -----------------------------------------------------------------------------
-- Low-level API
getLock :: ExtInterpInstance d -> MVar ()
getLock = interpLock . instProcess
withLock :: ExtInterpInstance d -> IO a -> IO a
withLock i f = withMVar (getLock i) (const f)
lock :: ExtInterpInstance d -> IO ()
lock i = takeMVar (getLock i)
unlock :: ExtInterpInstance d -> IO ()
unlock i = putMVar (getLock i) ()
ensureLocked :: ExtInterpInstance d -> IO ()
ensureLocked i =
isEmptyMVar (getLock i) >>= \case
False -> panic "ensureLocked: external interpreter not locked"
_ -> pure ()
-- | Send a 'Message' and receive the response from the interpreter process -- | Send a 'Message' and receive the response from the interpreter process
callInterpProcess :: Binary a => InterpProcess -> Message a -> IO a callInterpProcess :: Binary a => InterpProcess -> Message a -> IO a
......
...@@ -90,6 +90,7 @@ type WasmInterp = ExtInterpState WasmInterpConfig () ...@@ -90,6 +90,7 @@ type WasmInterp = ExtInterpState WasmInterpConfig ()
data InterpProcess = InterpProcess data InterpProcess = InterpProcess
{ interpPipe :: !Pipe -- ^ Pipe to communicate with the server { interpPipe :: !Pipe -- ^ Pipe to communicate with the server
, interpHandle :: !ProcessHandle -- ^ Process handle of the server , interpHandle :: !ProcessHandle -- ^ Process handle of the server
, interpLock :: !(MVar ()) -- ^ Lock to prevent concurrent access to the stream
} }
-- | Status of an external interpreter -- | Status of an external interpreter
......
...@@ -62,12 +62,14 @@ spawnWasmInterp WasmInterpConfig {..} = do ...@@ -62,12 +62,14 @@ spawnWasmInterp WasmInterpConfig {..} = do
hSetBuffering rh NoBuffering hSetBuffering rh NoBuffering
lo_ref <- newIORef Nothing lo_ref <- newIORef Nothing
pending_frees <- newMVar [] pending_frees <- newMVar []
lock <- newMVar ()
pure pure
$ ExtInterpInstance $ ExtInterpInstance
{ instProcess = { instProcess =
InterpProcess InterpProcess
{ interpHandle = ph, { interpHandle = ph,
interpPipe = Pipe {pipeRead = rh, pipeWrite = wh, pipeLeftovers = lo_ref} interpPipe = Pipe {pipeRead = rh, pipeWrite = wh, pipeLeftovers = lo_ref},
interpLock = lock
}, },
instPendingFrees = pending_frees, instPendingFrees = pending_frees,
instExtra = () instExtra = ()
......
{-
T25083_A and T25083_B contain a long-running (100ms) Template Haskell splice.
Run this with -fexternal-interpreter -j to check that we properly synchronize
the communication with the external interpreter.
This test will fail with a timeout or serialization error if communication
is not correctly serialized.
-}
{-# LANGUAGE TemplateHaskell, QuasiQuotes #-}
import Language.Haskell.TH
import Control.Concurrent
import T25083_A
import T25083_B
main :: IO ()
main = do
print ta
print tb
0
42
{-# LANGUAGE TemplateHaskell, QuasiQuotes #-}
module T25083_A where
import Control.Concurrent
import Language.Haskell.TH
ta :: Integer
ta =
$(do runIO (threadDelay 100000)
litE . integerL . toInteger . length =<< reifyInstances ''Show [])
{-# LANGUAGE TemplateHaskell, QuasiQuotes #-}
module T25083_B where
import Control.Concurrent
import Language.Haskell.TH
tb :: Integer
tb = $(runIO (threadDelay 100000) >> [| 42 |])
...@@ -631,3 +631,4 @@ test('T25252', ...@@ -631,3 +631,4 @@ test('T25252',
req_th, req_th,
req_c], req_c],
compile_and_run, ['-fPIC T25252_c.c']) compile_and_run, ['-fPIC T25252_c.c'])
test('T25083', [extra_files(['T25083_A.hs', 'T25083_B.hs'])], multimod_compile_and_run, ['T25083', '-v0 -j'])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment