Commit d7018d5b authored by sof's avatar sof
Browse files

[project @ 1999-04-27 17:44:26 by sof]

For a forkIO'ed process, report uncaught exceptions on stderr.
parent 65bec1e3
......@@ -17,16 +17,131 @@ module Concurrent (
module ChannelVar,
module Channel,
module Semaphore,
module Merge,
module SampleVar,
module PrelConc
module SampleVar
, ThreadId
-- Forking and suchlike
, forkIO -- :: IO () -> IO ThreadId
, myThreadId -- :: IO ThreadId
, killThread -- :: ThreadId -> IO ()
, raiseInThread -- :: ThreadId -> Exception -> IO ()
, par -- :: a -> b -> b
, seq -- :: a -> b -> b
, fork -- :: a -> b -> b
{-threadDelay, threadWaitRead, threadWaitWrite,-}
-- MVars
, MVar -- abstract
, newMVar -- :: a -> IO (MVar a)
, newEmptyMVar -- :: IO (MVar a)
, takeMVar -- :: MVar a -> IO a
, putMVar -- :: MVar a -> a -> IO ()
, readMVar -- :: MVar a -> IO a
, swapMVar -- :: MVar a -> a -> IO a
, isEmptyMVar -- :: MVar a -> IO Bool
-- merging of streams
, mergeIO -- :: [a] -> [a] -> IO [a]
, nmergeIO -- :: [[a]] -> IO [a]
) where
import Parallel
import ChannelVar
import Channel
import Semaphore
import Merge
import SampleVar
import PrelConc
import PrelHandle ( topHandler )
import PrelException
import PrelIOBase ( IO(..) )
import IO
import PrelAddr ( Addr )
import PrelArr ( ByteArray )
import PrelPack ( packString )
import PrelIOBase ( unsafePerformIO , unsafeInterleaveIO )
import PrelBase ( fork# )
infixr 0 `fork`
\end{code}
\begin{code}
forkIO :: IO () -> IO ThreadId
forkIO action = IO $ \ s ->
case (fork# action_plus s) of (# s1, id #) -> (# s1, ThreadId id #)
where
action_plus =
catchException action
(topHandler False{-don't quit on exception raised-})
{-# INLINE fork #-}
fork :: a -> b -> b
fork x y = unsafePerformIO (forkIO (x `seq` return ())) `seq` y
\end{code}
\begin{code}
max_buff_size :: Int
max_buff_size = 1
mergeIO :: [a] -> [a] -> IO [a]
nmergeIO :: [[a]] -> IO [a]
mergeIO ls rs
= newEmptyMVar >>= \ tail_node ->
newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar 2 >>= \ branches_running ->
let
buff = (tail_list,e)
in
forkIO (suckIO branches_running buff ls) >>
forkIO (suckIO branches_running buff rs) >>
takeMVar tail_node >>= \ val ->
signalQSem e >>
return val
type Buffer a
= (MVar (MVar [a]), QSem)
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list,e) vs
= case vs of
[] -> takeMVar branches_running >>= \ val ->
if val == 1 then
takeMVar tail_list >>= \ node ->
putMVar node [] >>
putMVar tail_list node
else
putMVar branches_running (val-1)
(x:xs) ->
waitQSem e >>
takeMVar tail_list >>= \ node ->
newEmptyMVar >>= \ next_node ->
unsafeInterleaveIO (
takeMVar next_node >>= \ x ->
signalQSem e >>
return x) >>= \ next_node_val ->
putMVar node (x:next_node_val) >>
putMVar tail_list next_node >>
suckIO branches_running buff xs
nmergeIO lss
= let
len = length lss
in
newEmptyMVar >>= \ tail_node ->
newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar len >>= \ branches_running ->
let
buff = (tail_list,e)
in
mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
takeMVar tail_node >>= \ val ->
signalQSem e >>
return val
where
mapIO f xs = sequence (map f xs)
\end{code}
......@@ -6,79 +6,14 @@
Avoiding the loss of ref. transparency by attaching the merge to the
IO monad.
(The ops. are now defined in Concurrent to avoid module loop trouble).
\begin{code}
module Merge
(
mergeIO, -- :: [a] -> [a] -> IO [a]
nmergeIO -- :: [[a]] -> IO [a]
merge
, nmergeIO
) where
import Semaphore
import PrelConc
import PrelIOBase
max_buff_size :: Int
max_buff_size = 1
mergeIO :: [a] -> [a] -> IO [a]
nmergeIO :: [[a]] -> IO [a]
mergeIO ls rs
= newEmptyMVar >>= \ tail_node ->
newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar 2 >>= \ branches_running ->
let
buff = (tail_list,e)
in
forkIO (suckIO branches_running buff ls) >>
forkIO (suckIO branches_running buff rs) >>
takeMVar tail_node >>= \ val ->
signalQSem e >>
return val
type Buffer a
= (MVar (MVar [a]), QSem)
suckIO :: MVar Int -> Buffer a -> [a] -> IO ()
suckIO branches_running buff@(tail_list,e) vs
= case vs of
[] -> takeMVar branches_running >>= \ val ->
if val == 1 then
takeMVar tail_list >>= \ node ->
putMVar node [] >>
putMVar tail_list node
else
putMVar branches_running (val-1)
(x:xs) ->
waitQSem e >>
takeMVar tail_list >>= \ node ->
newEmptyMVar >>= \ next_node ->
unsafeInterleaveIO (
takeMVar next_node >>= \ x ->
signalQSem e >>
return x) >>= \ next_node_val ->
putMVar node (x:next_node_val) >>
putMVar tail_list next_node >>
suckIO branches_running buff xs
nmergeIO lss
= let
len = length lss
in
newEmptyMVar >>= \ tail_node ->
newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar len >>= \ branches_running ->
let
buff = (tail_list,e)
in
mapIO (\ x -> forkIO (suckIO branches_running buff x)) lss >>
takeMVar tail_node >>= \ val ->
signalQSem e >>
return val
where
mapIO f xs = sequence (map f xs)
import Concurrent
\end{code}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment