%
% (c) The AQUA Project, Glasgow University, 1994-1996
%

\section[Concurrent]{Concurrent Haskell constructs}

A common interface to a collection of useful concurrency abstractions.
Currently, the collection only contains the abstractions found in the
{\em Concurrent Haskell} paper (presented at the Haskell Workshop
1995, draft available via \tr{ftp} from
\tr{ftp.dcs.gla.ac.uk/pub/glasgow-fp/drafts}.)  plus a couple of
others. See the paper and the individual files containing the module
definitions for explanation on what they do.

\begin{code}
module Concurrent (
        module ChannelVar,
        module Channel,
        module Semaphore,
        module SampleVar

        , ThreadId

        -- Forking and suchlike
        , forkIO          -- :: IO () -> IO ThreadId
        , myThreadId      -- :: IO ThreadId
        , killThread      -- :: ThreadId -> IO ()
        , raiseInThread   -- :: ThreadId -> Exception -> IO ()
        , par             -- :: a -> b -> b
        , seq             -- :: a -> b -> b
        , fork            -- :: a -> b -> b
        , yield           -- :: IO ()

        , threadDelay     -- :: Int -> IO ()
        , threadWaitRead  -- :: Int -> IO ()
        , threadWaitWrite -- :: Int -> IO ()

        -- MVars
        , MVar            -- abstract
        , newMVar         -- :: a -> IO (MVar a)
        , newEmptyMVar    -- :: IO (MVar a)
        , takeMVar        -- :: MVar a -> IO a
        , putMVar         -- :: MVar a -> a -> IO ()
        , readMVar        -- :: MVar a -> IO a
        , swapMVar        -- :: MVar a -> a -> IO a
        , isEmptyMVar     -- :: MVar a -> IO Bool

        -- merging of streams
        , mergeIO         -- :: [a] -> [a] -> IO [a]
        , nmergeIO        -- :: [[a]] -> IO [a]
    ) where

import Parallel
import ChannelVar
import Channel
import Semaphore
import SampleVar

import PrelConc
import PrelHandle       ( topHandler )
import PrelException
import PrelIOBase       ( IO(..) )
import IO
import PrelAddr         ( Addr )
import PrelArr          ( ByteArray )
import PrelPack         ( packString )
import PrelIOBase       ( unsafePerformIO , unsafeInterleaveIO )
import PrelBase         ( fork# )
import PrelGHC          ( Addr#, unsafeCoerce# )

infixr 0 `fork`
\end{code}

Thread Ids, specifically the instances of Eq and Ord for these things.
The ThreadId type itself is defined in std/PrelConc.lhs.

Rather than define a new primitive, we use a little helper function
cmp_thread in the RTS.

\begin{code}
foreign import ccall "cmp_thread" unsafe cmp_thread :: Addr# -> Addr# -> Int
-- Returns -1, 0, 1

-- | Three-way comparison of two thread ids, delegated to the RTS helper
-- 'cmp_thread'.  The ids are passed through 'unsafeCoerce#' to obtain the
-- 'Addr#' arguments the foreign helper is declared to take.
cmpThread :: ThreadId -> ThreadId -> Ordering
cmpThread (ThreadId t1) (ThreadId t2) =
   case cmp_thread (unsafeCoerce# t1) (unsafeCoerce# t2) of
      -1 -> LT
      0  -> EQ
      _  -> GT   -- the helper is documented to return only -1, 0 or 1;
                 -- a wildcard keeps the match total

-- Two thread ids are equal exactly when 'cmpThread' reports EQ.
instance Eq ThreadId where
   t1 == t2 =
      case t1 `cmpThread` t2 of
         EQ -> True
         _  -> False

instance Ord ThreadId where
   compare = cmpThread
\end{code}

Forking.  @forkIO@ starts a new thread running the given action and
returns its id; @fork@ is the pure-looking variant that sparks the
evaluation of its first argument in a new thread.

\begin{code}
-- | Run an IO action in a freshly forked thread and return the new
-- thread's id.  The action is wrapped with 'catchException' so that an
-- exception raised in the child is passed to @topHandler False@
-- (i.e. don't quit on exception raised).
forkIO :: IO () -> IO ThreadId
forkIO action = IO $ \ s ->
   case (fork# action_plus s) of (# s1, tid #) -> (# s1, ThreadId tid #)
 where
  action_plus =
    catchException action
                   (topHandler False{-don't quit on exception raised-})

-- | @fork x y@ forks a thread that evaluates @x@ with 'seq' (so only as
-- far as 'seq' forces it) and then yields @y@.  The fork is performed via
-- 'unsafePerformIO' when the result is demanded.
{-# INLINE fork #-}
fork :: a -> b -> b
fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
\end{code}

Merging of streams.  Each input list is consumed by its own thread
(@suckIO@); the threads append elements to a shared output list whose
tail is an @MVar@.

\begin{code}
-- | Initial quantity of the semaphore shared by the producer threads.
-- NOTE(review): presumably this bounds how far producers may run ahead of
-- the consumer of the merged list; confirm against the Semaphore module.
max_buff_size :: Int
max_buff_size = 1

-- | The shared output buffer: an 'MVar' holding the (still empty) 'MVar'
-- that is the current tail of the merged list, plus the semaphore that the
-- producers wait on before appending.
type Buffer a
 = (MVar (MVar [a]), QSem)

-- | Merge two lists into one, each list being consumed by its own thread.
-- This performs exactly the set-up of 'nmergeIO' for the two-element case
-- (counter initialised to 2, threads forked for @ls@ then @rs@).
mergeIO :: [a] -> [a] -> IO [a]
mergeIO ls rs = nmergeIO [ls, rs]

-- | Merge any number of lists into one, forking one 'suckIO' thread per
-- input list.  The returned list is produced incrementally by those
-- threads.
nmergeIO :: [[a]] -> IO [a]
nmergeIO []  = return []
  -- With no inputs no producer thread is forked, so nothing would ever
  -- fill the head node and the 'takeMVar' below would block forever.
nmergeIO lss = do
    tail_node        <- newEmptyMVar              -- head of the result list
    tail_list        <- newMVar tail_node         -- pointer to current tail
    e                <- newQSem max_buff_size
    branches_running <- newMVar (length lss)      -- producers still active
    let buff = (tail_list, e)
    mapM_ (\ ls -> forkIO (suckIO branches_running buff ls)) lss
    -- The head node is read directly here rather than through the deferred
    -- read that suckIO sets up for later nodes, so the signalQSem that
    -- accompanies each such read is issued here as well.
    val <- takeMVar tail_node
    signalQSem e
    return val

-- | Producer loop run by each forked thread: feed the elements of one
-- input list, one at a time, into the shared buffer.
--
--   * On an exhausted input, decrement the count of running producers.
--     The last producer (count was 1) closes the merged list by storing
--     @[]@ in the current tail node; it does not put the counter back.
--
--   * On @x:xs@, wait on the semaphore, take the current tail node, and
--     fill it with @x@ followed by a lazily-read fresh node, which becomes
--     the new tail.  Reading that fresh node (and the matching
--     'signalQSem') is deferred with 'unsafeInterleaveIO' until the rest
--     of the merged list is demanded.
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list,e) vs
 = case vs of
        [] -> do
            val <- takeMVar branches_running
            if val == 1
               then do
                 node <- takeMVar tail_list
                 putMVar node []
                 putMVar tail_list node
               else
                 putMVar branches_running (val-1)

        (x:xs) -> do
            waitQSem e
            node          <- takeMVar tail_list
            next_node     <- newEmptyMVar
            next_node_val <- unsafeInterleaveIO (
                                takeMVar next_node >>= \ rest ->
                                signalQSem e       >>
                                return rest)
            putMVar node (x:next_node_val)
            putMVar tail_list next_node
            suckIO branches_running buff xs
\end{code}