Skip to content

Add write-only variant of newTChan

The dupTChan function gives TChan broadcast functionality. Namely, one can:

  • Create a single TChan, which we'll call the "broadcast channel".
  • Have multiple clients read the channel by duplicating it with dupTChan.
  • Write to the broadcast channel. All clients will see the message, because each client is traversing the channel with its own cursor.

The problem is that if the broadcast channel is only duplicated and never read directly, items will pile up in memory. To illustrate:

* -> * -> * -> * -> * -> * -> * -> * -> * -> *
^                   ^         ^              ^
broadcast           client 1  client 2       write cursor
channel
(read cursor)

A TChan consists of two cursors: a "read cursor", and a "write cursor". dupTChan constructs a new read cursor, and reuses the source TChan's write cursor.

Because the broadcast channel is never read (only duplicated), the broadcast channel's read cursor stays at the beginning of the queue forever. This means items that have already been read will never get garbage collected. Consider the following example:

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forM_, forever, replicateM_)
import System.IO.Unsafe (unsafePerformIO)

broadcastChan :: TChan Integer
broadcastChan = unsafePerformIO newTChanIO
{-# NOINLINE broadcastChan #-}

main :: IO ()
main = do
    _ <- forkIO $ do
        threadDelay 1000000
            -- Give clients time to get ready.  Otherwise, they'll miss the
            -- first few numbers.
        forM_ [1..] $ \i ->
            atomically $ writeTChan broadcastChan i

    forM_ [1,2] $ \i -> forkIO $ do
        myChan <- atomically $ dupTChan broadcastChan
        replicateM_ 100 $ do
            msg <- atomically $ readTChan myChan
            putStrLn $ "Client " ++ show (i :: Int) ++ " read " ++ show msg

    forever $ threadDelay 1000000

Note that if the broadcast channel is created in main rather than with top-level mutable state, and the program is compiled with -O2, the program does not leak memory (perhaps because GHC optimizes out the read end of the broadcast channel).

I propose adding a new function: newBroadcastTChan :: STM (TChan a). It creates a dummy read cursor that should not be read. In the attached patch, reading the broadcast channel will simply 'retry' (unless items are "put back" on the broadcast channel using unGetTChan). Perhaps it should throw an error instead?

The attached patch series adds newBroadcastTChan and newBroadcastTChanIO. It also adds an Eq instance to TChan, and makes a couple documentation cleanups.

Trac metadata
Trac field Value
Version 7.4.1
Type FeatureRequest
TypeOfFailure OtherFailure
Priority normal
Resolution Unresolved
Component libraries (other)
Test case
Differential revisions
BlockedBy
Related
Blocking
CC
Operating system
Architecture
To upload designs, you'll need to enable LFS and have an admin enable hashed storage. More information