diff --git a/ghc/lib/concurrent/Concurrent.lhs b/ghc/lib/concurrent/Concurrent.lhs index b0169c2bb9879d02ec63dc7a5970deec6e3cea97..2fe58d3ddd70afb81e4ac65cf9f57d8b30c94b07 100644 --- a/ghc/lib/concurrent/Concurrent.lhs +++ b/ghc/lib/concurrent/Concurrent.lhs @@ -17,16 +17,131 @@ module Concurrent ( module ChannelVar, module Channel, module Semaphore, - module Merge, - module SampleVar, - module PrelConc + module SampleVar + + , ThreadId + + -- Forking and suchlike + , forkIO -- :: IO () -> IO ThreadId + , myThreadId -- :: IO ThreadId + , killThread -- :: ThreadId -> IO () + , raiseInThread -- :: ThreadId -> Exception -> IO () + , par -- :: a -> b -> b + , seq -- :: a -> b -> b + , fork -- :: a -> b -> b + {-threadDelay, threadWaitRead, threadWaitWrite,-} + + -- MVars + , MVar -- abstract + , newMVar -- :: a -> IO (MVar a) + , newEmptyMVar -- :: IO (MVar a) + , takeMVar -- :: MVar a -> IO a + , putMVar -- :: MVar a -> a -> IO () + , readMVar -- :: MVar a -> IO a + , swapMVar -- :: MVar a -> a -> IO a + , isEmptyMVar -- :: MVar a -> IO Bool + + -- merging of streams + , mergeIO -- :: [a] -> [a] -> IO [a] + , nmergeIO -- :: [[a]] -> IO [a] ) where import Parallel import ChannelVar import Channel import Semaphore -import Merge import SampleVar import PrelConc +import PrelHandle ( topHandler ) +import PrelException +import PrelIOBase ( IO(..) ) +import IO +import PrelAddr ( Addr ) +import PrelArr ( ByteArray ) +import PrelPack ( packString ) +import PrelIOBase ( unsafePerformIO , unsafeInterleaveIO ) +import PrelBase ( fork# ) + +infixr 0 `fork` +\end{code} + +\begin{code} +forkIO :: IO () -> IO ThreadId +forkIO action = IO $ \ s -> + case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #) + where + action_plus = + catchException action + (topHandler False{-don't quit on exception raised-}) + +{-# INLINE fork #-} +fork :: a -> b -> b +fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y +\end{code} + + +\begin{code} +max_buff_size :: Int +max_buff_size = 1 + +mergeIO :: [a] -> [a] -> IO [a] +nmergeIO :: [[a]] -> IO [a] + +mergeIO ls rs + = newEmptyMVar >>= \ tail_node -> + newMVar tail_node >>= \ tail_list -> + newQSem max_buff_size >>= \ e -> + newMVar 2 >>= \ branches_running -> + let + buff = (tail_list,e) + in + forkIO (suckIO branches_running buff ls) >> + forkIO (suckIO branches_running buff rs) >> + takeMVar tail_node >>= \ val -> + signalQSem e >> + return val + +type Buffer a + = (MVar (MVar [a]), QSem) + +suckIO :: MVar Int -> Buffer a -> [a] -> IO () + +suckIO branches_running buff@(tail_list,e) vs + = case vs of + [] -> takeMVar branches_running >>= \ val -> + if val == 1 then + takeMVar tail_list >>= \ node -> + putMVar node [] >> + putMVar tail_list node + else + putMVar branches_running (val-1) + (x:xs) -> + waitQSem e >> + takeMVar tail_list >>= \ node -> + newEmptyMVar >>= \ next_node -> + unsafeInterleaveIO ( + takeMVar next_node >>= \ x -> + signalQSem e >> + return x) >>= \ next_node_val -> + putMVar node (x:next_node_val) >> + putMVar tail_list next_node >> + suckIO branches_running buff xs + +nmergeIO lss + = let + len = length lss + in + newEmptyMVar >>= \ tail_node -> + newMVar tail_node >>= \ tail_list -> + newQSem max_buff_size >>= \ e -> + newMVar len >>= \ branches_running -> + let + buff = (tail_list,e) + in + mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >> + takeMVar tail_node >>= \ val -> + signalQSem e >> + return val + where + mapIO f xs = sequence (map f xs) \end{code} diff --git a/ghc/lib/concurrent/Merge.lhs b/ghc/lib/concurrent/Merge.lhs index 5414c97fbb279d8aee783a331c45e60b241ceecd..22165bdb51624bd8fb2c2bca3bc10beb8f391e12 100644 --- a/ghc/lib/concurrent/Merge.lhs +++ b/ghc/lib/concurrent/Merge.lhs @@ -6,79 +6,14 @@ Avoiding the loss of ref. transparency by attaching the merge to the IO monad. +(The ops. are now defined in Concurrent to avoid module loop trouble). + \begin{code} module Merge - ( - mergeIO, -- :: [a] -> [a] -> IO [a] - nmergeIO -- :: [[a]] -> IO [a] + merge + , nmergeIO ) where -import Semaphore -import PrelConc -import PrelIOBase - -max_buff_size :: Int -max_buff_size = 1 - -mergeIO :: [a] -> [a] -> IO [a] -nmergeIO :: [[a]] -> IO [a] - -mergeIO ls rs - = newEmptyMVar >>= \ tail_node -> - newMVar tail_node >>= \ tail_list -> - newQSem max_buff_size >>= \ e -> - newMVar 2 >>= \ branches_running -> - let - buff = (tail_list,e) - in - forkIO (suckIO branches_running buff ls) >> - forkIO (suckIO branches_running buff rs) >> - takeMVar tail_node >>= \ val -> - signalQSem e >> - return val - -type Buffer a - = (MVar (MVar [a]), QSem) - -suckIO :: MVar Int -> Buffer a -> [a] -> IO () - -suckIO branches_running buff@(tail_list,e) vs - = case vs of - [] -> takeMVar branches_running >>= \ val -> - if val == 1 then - takeMVar tail_list >>= \ node -> - putMVar node [] >> - putMVar tail_list node - else - putMVar branches_running (val-1) - (x:xs) -> - waitQSem e >> - takeMVar tail_list >>= \ node -> - newEmptyMVar >>= \ next_node -> - unsafeInterleaveIO ( - takeMVar next_node >>= \ x -> - signalQSem e >> - return x) >>= \ next_node_val -> - putMVar node (x:next_node_val) >> - putMVar tail_list next_node >> - suckIO branches_running buff xs - -nmergeIO lss - = let - len = length lss - in - newEmptyMVar >>= \ tail_node -> - newMVar tail_node >>= \ tail_list -> - newQSem max_buff_size >>= \ e -> - newMVar len >>= \ branches_running -> - let - buff = (tail_list,e) - in - mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >> - takeMVar tail_node >>= \ val -> - signalQSem e >> - return val - where - mapIO f xs = sequence (map f xs) +import Concurrent \end{code}