unGetChan should be able to interrupt readChan
If you are in the following situation:
- A empty Chan shared between two threads
- Another thread blocked reading the Chan
- A main thread about to unGetChan to add something to its front
Then the program will shortly die with "thread blocked indefinitely". The reason is that the thread doing readChan is modifying the read MVar of the Chan. This means that when unGetChan tries to modify it as well, the main thread blocks and nothing can make progress...
A workaround (that sort of works for my situation) is to forkIO the unGetChan, so the main thread can continue going and write something to the channel at some point in the future. When that happens one of the unGetChan and readChan will be able to make progress.
One of these two things needs to happen:
- Better documentation for unGetChan, in particular WRT how it can experience this behaviour but writeChan cannot
- A smarter implementation for getChan that allows its wait on the MVar to be serviced by an unGetChan
Test program that demonstrates the problem:
$ ghc --make Control/Concurrent/Benchmark/StressTest.hs && Control/Concurrent/Benchmark/StressTest.exe
[1 of 1] Compiling Main ( Control\Concurrent\Benchmark\StressTest.hs, Control\Concurrent\Benchmark\StressTest.o )
Linking Control\Concurrent\Benchmark\StressTest.exe ...
1
Acting
Done acting
Writing
readChan thread blocked indefinitely
join Control.Concurrent.Parallel: parallel thread died.
thread blocked indefinitely
Exception on thread: Control.Concurrent.Parallel: parallel thread died.
Control.Concurrent.Parallel: parallel thread died.
thread blocked indefinitely
StressTest.exe: thread blocked indefinitely
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Concurrent
import Control.Monad
import System.Random
import GHC.Conc
--------------------------
-- LIBRARY
-- Much of the details in this module arose from discussions on haskell-cafe@
-- http://www.nabble.com/Parallel-combinator%2C-performance-advice-td22926243.html
-- NB: this is modified from the "real" version for extra debuggability and simplicity
{-
REQUIREMENTS:
* Fairness - the number of threads executing at any one time
should be exactly the number specified (N). There should never be
either N+1 or N-1 executing.
* Reenterant - parallel_ computations can call other parallel_ computations.
* Timeliness - it's best to stop as soon as the last task has finished,
provided that doesn't violate the other principles.
-}
import GHC.Conc
import Control.Concurrent
import Control.Monad
import Control.Exception as E
import System.IO.Unsafe
-- initialise on the main thread, and keep
{-# NOINLINE mainThread #-}
mainThread :: ThreadId
mainThread = unsafePerformIO $ myThreadId
-- True = kill the thread after it finishes
{-# NOINLINE queue #-}
queue :: Chan (IO Bool)
queue = seq mainThread $ unsafePerformIO $ newChan
{-# NOINLINE addWorker #-}
addWorker :: IO ()
addWorker = do
forkIO $ handle_exceptions "Exception on thread:" f
return ()
where
handle_exceptions :: String -> IO a -> IO a
handle_exceptions str act = act `E.catch` \(e :: SomeException) -> do
putStrLn $ str ++ " " ++ show e
throwTo mainThread $ ErrorCall $ "Control.Concurrent.Parallel: parallel thread died.\n" ++ show e
return (error "handle_exceptions")
{-# NOINLINE f #-}
f :: IO ()
f = do
--putStrLn "Working"
kill <- handle_exceptions "join" $ join $ handle_exceptions "readChan" $ readChan queue
--putStrLn $ "Dying? " ++ show kill
unless kill f
-- If you don't call this then no one holds the queue, the queue gets
-- GC'd, the threads find themselves blocked indefinately, and you get
-- exceptions. This cleanly shuts down the threads, then the queue isn't important.
-- Only call this AFTER all parallel_ calls have completed.
{-# NOINLINE parallelStop #-}
parallelStop :: IO ()
parallelStop = evaluate queue >> return ()
-- | Run the list of computations in parallel
-- Rule: No thread should get pre-empted (although not a guarantee)
-- On return all actions have been performed
{-# NOINLINE parallel_ #-}
parallel_ :: [IO a] -> IO ()
parallel_ xs = sequence_ xs
{-# NOINLINE parallelBlock #-}
parallelBlock :: IO a -> IO a
parallelBlock act = E.bracket_ addWorker (putStrLn "Writing" >> unGetChan queue (return True) >> putStrLn "Done writing")
(putStrLn "Acting" >> act >>= \x -> putStrLn "Done acting" >> return x)
parallelReadMVar :: MVar a -> IO a
parallelReadMVar = parallelBlock . readMVar
---------------------------
-- TEST PROGRAM
n :: Int
n = 1000
randomDelay :: IO ()
randomDelay = do
delay <- randomRIO (0, 100)
threadDelay delay
process :: MVar a -> IO ()
process m = do
randomDelay
p <- randomRIO (0, 1 :: Double)
when (p < 0.4) (parallelReadMVar m >> return ())
randomDelay
main :: IO ()
main = do
print numCapabilities
m <- newMVar ()
parallel_ $ replicate n $ process m
parallelStop
Trac metadata
Trac field | Value |
---|---|
Version | 6.10.4 |
Type | Bug |
TypeOfFailure | OtherFailure |
Priority | normal |
Resolution | Unresolved |
Component | libraries/base |
Test case | |
Differential revisions | |
BlockedBy | |
Related | |
Blocking | |
CC | batterseapower@hotmail.com;ndmitchell@gmail.com |
Operating system | |
Architecture |