Add write-only variant of newTChan
The dupTChan function gives TChan broadcast functionality. Namely, one can:
- Create a single TChan, which we'll call the "broadcast channel".
- Have multiple clients read the channel by duplicating it with dupTChan.
- Write to the broadcast channel. All clients will see the message, because each client is traversing the channel with its own cursor.
The problem is that if the broadcast channel is only duplicated and never read directly, items will pile up in memory. To illustrate:
    * -> * -> * -> * -> * -> * -> * -> * -> * -> *
    ^              ^              ^              ^
    broadcast      client 1       client 2       write cursor
    channel
    (read cursor)
A TChan consists of two cursors: a "read cursor", and a "write cursor". dupTChan constructs a new read cursor, and reuses the source TChan's write cursor.
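To make the two-cursor layout concrete, here is a minimal sketch of a channel built the same way. It is a simplified model for illustration, not the stm library's source; the names SketchChan, newSketch, writeSketch, readSketch, dupSketch and demo are all hypothetical.

```haskell
import Control.Concurrent.STM

-- Hypothetical, simplified model of the linked list behind a TChan.
-- A cursor points at a cell; a cell is either the empty hole at the
-- end of the queue or an item followed by the next cell.
type Cursor a = TVar (Node a)
data Node a = Hole | Item a (Cursor a)

-- A channel is just a read cursor and a write cursor.
data SketchChan a = SketchChan (TVar (Cursor a)) (TVar (Cursor a))

newSketch :: STM (SketchChan a)
newSketch = do
  hole <- newTVar Hole
  SketchChan <$> newTVar hole <*> newTVar hole

-- Fill the current hole and advance the (shared) write cursor.
writeSketch :: SketchChan a -> a -> STM ()
writeSketch (SketchChan _ w) x = do
  oldHole <- readTVar w
  newHole <- newTVar Hole
  writeTVar oldHole (Item x newHole)
  writeTVar w newHole

-- Advance only this handle's read cursor; block on an empty hole.
readSketch :: SketchChan a -> STM a
readSketch (SketchChan r _) = do
  cell <- readTVar r
  node <- readTVar cell
  case node of
    Hole        -> retry
    Item x next -> writeTVar r next >> return x

-- New read cursor starting at the current hole, same write cursor.
dupSketch :: SketchChan a -> STM (SketchChan a)
dupSketch (SketchChan _ w) = do
  hole <- readTVar w
  r' <- newTVar hole
  return (SketchChan r' w)

demo :: IO (Int, Int)
demo = atomically $ do
  ch     <- newSketch
  client <- dupSketch ch
  writeSketch ch 1
  writeSketch ch 2
  a <- readSketch client  -- moves the client's read cursor only
  b <- readSketch ch      -- the original read cursor was still at the start
  return (a, b)

main :: IO ()
main = demo >>= print  -- prints (1,1)
```

In this model, the cell holding the first item stays reachable for as long as the original handle's read cursor points at it, no matter how far the duplicates have advanced.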
Because the broadcast channel is never read (only duplicated), the broadcast channel's read cursor stays at the beginning of the queue forever. This means items that every client has already read remain reachable and will never be garbage collected. Consider the following example:
    import Control.Concurrent
    import Control.Concurrent.STM
    import Control.Monad (forM_, forever, replicateM_)
    import System.IO.Unsafe (unsafePerformIO)

    broadcastChan :: TChan Integer
    broadcastChan = unsafePerformIO newTChanIO
    {-# NOINLINE broadcastChan #-}

    main :: IO ()
    main = do
        _ <- forkIO $ do
            threadDelay 1000000
                -- Give clients time to get ready.  Otherwise, they'll miss the
                -- first few numbers.
            forM_ [1..] $ \i ->
                atomically $ writeTChan broadcastChan i

        forM_ [1,2] $ \i -> forkIO $ do
            myChan <- atomically $ dupTChan broadcastChan
            replicateM_ 100 $ do
                msg <- atomically $ readTChan myChan
                putStrLn $ "Client " ++ show (i :: Int) ++ " read " ++ show msg

        forever $ threadDelay 1000000
Note that if the broadcast channel is created in main rather than with top-level mutable state, and the program is compiled with -O2, the program does not leak memory (perhaps because GHC optimizes out the read end of the broadcast channel).
I propose adding a new function: newBroadcastTChan :: STM (TChan a). It creates a dummy read cursor that should not be read. In the attached patch, reading the broadcast channel will simply 'retry' (unless items are "put back" on the broadcast channel using unGetTChan). Perhaps it should throw an error instead?
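A minimal usage sketch of the intended pattern follows. It assumes an stm version that exports newBroadcastTChanIO with the signature proposed here; the helper name broadcastDemo is hypothetical. Only the duplicates are read, and the broadcast channel itself is used for writing alone.

```haskell
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Hypothetical helper: write three items to a write-only broadcast
-- channel and read them back through two independent duplicates.
broadcastDemo :: IO ([Int], [Int])
broadcastDemo = do
  -- Assumption: newBroadcastTChanIO is available in the installed stm.
  bc <- newBroadcastTChanIO
  -- Clients must duplicate before the writes they want to observe.
  c1 <- atomically $ dupTChan bc
  c2 <- atomically $ dupTChan bc
  atomically $ mapM_ (writeTChan bc) [1, 2, 3]
  xs <- atomically $ replicateM 3 (readTChan c1)
  ys <- atomically $ replicateM 3 (readTChan c2)
  return (xs, ys)

main :: IO ()
main = broadcastDemo >>= print  -- prints ([1,2,3],[1,2,3])
```

Because no read cursor of the broadcast channel pins the head of the queue, items can be collected once every duplicate has moved past them.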
The attached patch series adds newBroadcastTChan and newBroadcastTChanIO. It also adds an Eq instance for TChan and makes a couple of documentation cleanups.
Trac metadata
Trac field | Value |
---|---|
Version | 7.4.1 |
Type | FeatureRequest |
TypeOfFailure | OtherFailure |
Priority | normal |
Resolution | Unresolved |
Component | libraries (other) |
Test case | |
Differential revisions | |
BlockedBy | |
Related | |
Blocking | |
CC | |
Operating system | |
Architecture | |