diff --git a/smp/threads007/Control/Concurrent/CML.hs b/smp/threads007/Control/Concurrent/CML.hs new file mode 100644 index 0000000000000000000000000000000000000000..4c0a0e8784d988b7c82368a08a8c613518f00436 --- /dev/null +++ b/smp/threads007/Control/Concurrent/CML.hs @@ -0,0 +1,261 @@ +-------------------------------------------------------------------------------- +-- | +-- Module : Control.Concurrent.CML +-- Copyright : Avik Chaudhuri 2009 (avik@cs.ucsc.edu) +-- License : BSD3 +-- +-- Maintainer : ben.franksen@online.de +-- Stability : provisional +-- Portability : portable +-- +-- Events and Channels as in Concurrent ML (extended with communication guards) +-- +-- See /A Concurrent ML Library in Concurrent Haskell/ by Avik Chaudhuri +-- (avik\@cs.ucsc.edu). The original code as well as the papers can be +-- found at <http://www.cs.umd.edu/~avik/projects/cmllch/>. +-------------------------------------------------------------------------------- +module Control.Concurrent.CML ( + -- * Channels + -- $channels + Channel, + channel, + receive, + transmit, + -- * Events + -- $events + Event, + sync, + choose, + wrap, + guard, + wrapabort, + spawn +) where + +import Control.Concurrent.MVar(MVar, newEmptyMVar, putMVar, takeMVar) +import Control.Concurrent(ThreadId, forkIO) +import Control.Monad.Fix(fix) +import Control.Monad(foldM, forever) +import Data.Maybe(isJust) + +-------------------------------------------------------------------------------- + +type Commit = MVar Bool +type Decision = MVar (Maybe Commit) +type Candidate = MVar (Maybe Decision) +type In a = MVar (Candidate, a -> Bool, Synchronizer) +type Out a = MVar (Candidate, a, Synchronizer) + +-- | Values of type @a@ can be transported over channels of type @Channel a@. +data Channel a = Channel (In a) (Out a) (MVar a) + +instance Eq (Channel a) where + Channel _ _ m1 == Channel _ _ m2 = m1 == m2 + +type Point = MVar () +type Name = MVar [Point] +type Abort = MVar ([Point], IO ()) +type Synchronizer = MVar (Point, Decision) + +-- | Events return a value on synchronization. +-- +-- Note that by construction, an event can synchronize at exactly one +-- /commit point/, where a message is either sent or accepted on a +-- channel. This commit point may be selected among several other, +-- potential commit points. Some code may be run before +-- synchronization, as specified by 'guard' functions throughout the +-- event. Some more code may be run after synchronization, as specified +-- by 'wrap' functions that surround the commit point, and by 'wrapabort' +-- functions that do not surround the commit point. +newtype Event a = Event (Synchronizer -> Abort -> Name -> IO a) + +-------------------------------------------------------------------------------- + +atchan :: In a -> Out a -> IO () +atchan i o = do + (cand_i,patt,si) <- takeMVar i + (cand_o,y,so) <- takeMVar o + if (patt y && si /= so) + then do + dec_i <- newEmptyMVar + putMVar cand_i (Just dec_i) + ki <- takeMVar dec_i + dec_o <- newEmptyMVar + putMVar cand_o (Just dec_o) + ko <- takeMVar dec_o + maybe (return ()) (\ci -> putMVar ci (isJust ko)) ki + maybe (return ()) (\co -> putMVar co (isJust ki)) ko + else do + putMVar cand_i Nothing + putMVar cand_o Nothing + atchan i o + +atsync :: Synchronizer -> Abort -> IO () -> IO () +atsync r a x = do + (t,s) <- takeMVar r + forkIO $ fix $ \z -> do + (_,s') <- takeMVar r + forkIO z + putMVar s' Nothing + c <- newEmptyMVar + putMVar s (Just c) + b <- takeMVar c + if b + then do + putMVar t () + fix $ \z -> do + (tL,f) <- takeMVar a + forkIO z + if elem t tL + then return () + else f + else x + +atpointI :: Synchronizer -> Point -> In a -> (a -> Bool) -> IO a -> IO a +atpointI r t i patt x = do + e <- newEmptyMVar + putMVar i (e,patt,r) + ms <- takeMVar e + maybe + (atpointI r t i patt x) + (\s -> do + putMVar r (t,s) + takeMVar t + x + ) + ms + +atpointO :: Synchronizer -> Point -> Out a -> a -> IO () -> IO () +atpointO r t o y x = do + e <- newEmptyMVar + putMVar o (e,y,r) + ms <- takeMVar e + maybe + (atpointO r t o y x) + (\s -> do + putMVar r (t,s) + takeMVar t + x + ) + ms + +-------------------------------------------------------------------------------- + +-- $channels +-- Channels transport a single value at a time. The operations on channels are: +-- creation, transmit, and receive. None of them block the calling thread, in +-- fact transmit and receive are pure functions, not IO actions. Blocking may +-- occur only when a thread explicitly synchronizes on the resulting event. + +-- | Create a new channel. +channel :: IO (Channel a) +channel = do + i <- newEmptyMVar + o <- newEmptyMVar + forkIO $ forever $ atchan i o + m <- newEmptyMVar + return (Channel i o m) + +-- | Receive a message from a channel. +-- +-- More precisely, @receive c cond@ returns an event that, on synchronization, +-- accepts a message @m@ on channel @c@ and returns @m@. The resulting +-- event is eligible for synchronization with a @transmit c m@ only if @cond m@ +-- is true. +receive :: Channel a -> (a -> Bool) -> Event a +receive (Channel i _ m) patt = Event efun where + efun r _ n = do + t <- newEmptyMVar + forkIO (putMVar n [t]) + atpointI r t i patt (takeMVar m) + +-- | Transmit a message over a channel. +-- +-- More precisely, @transmit c m@ returns an event that, on synchronization, +-- sends the message @m@ on channel @c@ and returns @()@. Such an event must +-- synchronize with @receive c@. +transmit :: Channel a -> a -> Event () +transmit (Channel _ o m) y = Event efun where + efun r _ n = do + t <- newEmptyMVar + forkIO (putMVar n [t]) + atpointO r t o y (putMVar m y) + +-- $events +-- Events encapsulate a potentially blocking point of synchronization between +-- threads, together with possible pre- and post-synchronization code as well +-- as code that is executed (in a separate thread) when an event is /not/ +-- selected (aborted). + +-- | Non-deterministically select an event from a list of events, so that +-- the selected event can be synchronized. The other events in the list are +-- /aborted/. +choose :: [Event a] -> Event a +choose vL = Event efun where + efun r a n = do + j <- newEmptyMVar + tL <- foldM (\tL -> \(Event v) -> do + n' <- newEmptyMVar + forkIO $ v r a n' >>= putMVar j + tL' <- takeMVar n' + putMVar n' tL' + return (tL' ++ tL) + ) [] vL + forkIO (putMVar n tL) + takeMVar j + +-- | Specify a post-synchronization action. +-- +-- More precisely, @wrap v f@ returns an event that, on synchronization, +-- synchronizes the event @v@ and then runs the action returned by @f@ +-- applied to the result. +wrap :: Event a -> (a -> IO b) -> Event b +wrap (Event v) f = Event efun where + efun r a n = v r a n >>= f + +-- | Specify a pre-synchronization action. +-- +-- More precisely, @guard a@ returns an event that, on synchronization, +-- synchronizes the event returned by the action @a@. Here, @a@ is run +-- every time a thread /tries/ to synchronize @guard a@. +guard :: IO (Event a) -> Event a +guard vs = Event efun where + efun r a n = do + Event v <- vs + v r a n + +-- | Specify a post-synchronization action that is spawned if an event is +-- /not/ selected by a 'choose'. +-- +-- More precisely, @wrapabort a v@ returns an event that, on +-- synchronization, synchronizes the event @v@, and on abortion, spawns a +-- thread that runs the action @a@. Here, if @v@ itself is of the form +-- @choose vs@ and one of the events in @vs@ is selected, then @v@ is +-- considered selected, so @a@ is not spawned. +wrapabort :: IO () -> Event a -> Event a +wrapabort f (Event v) = Event efun where + efun r a n = do + forkIO $ do + tL <- takeMVar n + putMVar n tL + putMVar a (tL, f) + v r a n + +-- | Synchronize an event. +-- +-- This blocks the calling thread until a matching event is available. +sync :: Event a -> IO a +sync (Event v) = do + j <- newEmptyMVar + forkIO $ fix $ \z -> do + r <- newEmptyMVar + a <- newEmptyMVar + n <- newEmptyMVar + forkIO $ atsync r a z + x <- v r a n + putMVar j x + takeMVar j + +-- | A synonym for 'forkIO'. +spawn :: IO () -> IO ThreadId +spawn = forkIO diff --git a/smp/threads007/Main.hs b/smp/threads007/Main.hs new file mode 100644 index 0000000000000000000000000000000000000000..e3f42e2a063bb904aec16e94fc56ef4fbf7e09cf --- /dev/null +++ b/smp/threads007/Main.hs @@ -0,0 +1,16 @@ +-- Program from Neil Brown on haskell-cafe. +-- +-- It exposes the O(n^2) behaviour in removing threads from the queue +-- on an MVar during shutdown - in GHC 6.12.1 the program takes 25s in +-- the EXIT phase deleting threads. + +import Control.Concurrent +import Control.Concurrent.CML +import Control.Monad + +main :: IO () +main = do + let numChoices = 2 + cs <- replicateM numChoices channel + mapM_ forkIO [replicateM_ (50000 `div` numChoices) $ sync $ transmit c () | c <- cs] + replicateM_ 50000 $ sync $ choose [receive c (const True) | c <- cs]