Skip to content
Snippets Groups Projects
Commit 98215788 authored by konsumlamm's avatar konsumlamm
Browse files

Rewrite `TBQueue` to use `TArray Int (Maybe a)`

parent a38412b3
No related branches found
No related tags found
No related merge requests found
{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
......@@ -29,214 +30,192 @@
-----------------------------------------------------------------------------
module Control.Concurrent.STM.TBQueue (
-- * TBQueue
TBQueue,
newTBQueue,
newTBQueueIO,
readTBQueue,
tryReadTBQueue,
flushTBQueue,
peekTBQueue,
tryPeekTBQueue,
writeTBQueue,
unGetTBQueue,
lengthTBQueue,
isEmptyTBQueue,
isFullTBQueue,
-- * TBQueue
TBQueue,
newTBQueue,
newTBQueueIO,
readTBQueue,
tryReadTBQueue,
flushTBQueue,
peekTBQueue,
tryPeekTBQueue,
writeTBQueue,
unGetTBQueue,
lengthTBQueue,
isEmptyTBQueue,
isFullTBQueue,
) where
import Control.Monad (unless)
import Data.Typeable (Typeable)
import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse,
readTVar, retry, writeTVar)
import Numeric.Natural (Natural)
import Prelude hiding (read)
import Data.Array.Base
import Data.Maybe (isJust, isNothing)
import Data.Typeable (Typeable)
import GHC.Conc
import Numeric.Natural (Natural)
import Prelude hiding (read)
import Control.Concurrent.STM.TArray
-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
--
-- @since 2.4
data TBQueue a
= TBQueue {-# UNPACK #-} !(TVar Natural) -- CR: read capacity
{-# UNPACK #-} !(TVar [a]) -- R: elements waiting to be read
{-# UNPACK #-} !(TVar Natural) -- CW: write capacity
{-# UNPACK #-} !(TVar [a]) -- W: elements written (head is most recent)
!(Natural) -- CAP: initial capacity
= TBQueue {-# UNPACK #-} !(TVar Int) -- read index
{-# UNPACK #-} !(TVar Int) -- write index
{-# UNPACK #-} !(TArray Int (Maybe a)) -- elements
{-# UNPACK #-} !Int -- initial capacity
deriving Typeable
instance Eq (TBQueue a) where
TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b
-- each `TBQueue` has its own `TVar`s, so it's sufficient to compare the first one
TBQueue a _ _ _ == TBQueue b _ _ _ = a == b
-- Total channel capacity remaining is CR + CW. Reads only need to
-- access CR, writes usually need to access only CW but sometimes need
-- CR. So in the common case we avoid contention between CR and CW.
--
-- - when removing an element from R:
-- CR := CR + 1
--
-- - when adding an element to W:
-- if CW is non-zero
-- then CW := CW - 1
-- then if CR is non-zero
-- then CW := CR - 1; CR := 0
-- else **FULL**
-- incMod x cap == (x + 1) `mod` cap
incMod :: Int -> Int -> Int
incMod x cap = let y = x + 1 in if y == cap then 0 else y
-- decMod x cap = (x - 1) `mod` cap
decMod :: Int -> Int -> Int
decMod x cap = if x == 0 then cap - 1 else x - 1
-- | Builds and returns a new instance of 'TBQueue'.
newTBQueue :: Natural -- ^ maximum number of elements the queue can hold
-> STM (TBQueue a)
newTBQueue size = do
read <- newTVar []
write <- newTVar []
rsize <- newTVar 0
wsize <- newTVar size
return (TBQueue rsize read wsize write size)
-- |@IO@ version of 'newTBQueue'. This is useful for creating top-level
newTBQueue size
| size <= 0 = error "capacity has to be greater than 0"
| size > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVar 0
windex <- newTVar 0
elements <- newArray (0, size' - 1) Nothing
pure (TBQueue rindex windex elements size')
where
size' = fromIntegral size
-- | @IO@ version of 'newTBQueue'. This is useful for creating top-level
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTBQueueIO :: Natural -> IO (TBQueue a)
newTBQueueIO size = do
read <- newTVarIO []
write <- newTVarIO []
rsize <- newTVarIO 0
wsize <- newTVarIO size
return (TBQueue rsize read wsize write size)
-- |Write a value to a 'TBQueue'; blocks if the queue is full.
newTBQueueIO size
| size <= 0 = error "capacity has to be greater than 0"
| size > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVarIO 0
windex <- newTVarIO 0
elements <- newArray (0, size' - 1) Nothing
pure (TBQueue rindex windex elements size')
where
size' = fromIntegral size
-- | Write a value to a 'TBQueue'; blocks if the queue is full.
writeTBQueue :: TBQueue a -> a -> STM ()
writeTBQueue (TBQueue rsize _read wsize write _size) a = do
w <- readTVar wsize
if (w > 0)
then do writeTVar wsize $! w - 1
else do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize 0
writeTVar wsize $! r - 1
else retry
listend <- readTVar write
writeTVar write (a:listend)
-- |Read the next value from the 'TBQueue'.
writeTBQueue (TBQueue _ windex elements size) a = do
w <- readTVar windex
ele <- unsafeRead elements w
case ele of
Nothing -> unsafeWrite elements w (Just a)
Just _ -> retry
writeTVar windex $! incMod w size
-- | Read the next value from the 'TBQueue'.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rsize read _wsize write _size) = do
xs <- readTVar read
r <- readTVar rsize
writeTVar rsize $! r + 1
case xs of
(x:xs') -> do
writeTVar read xs'
return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
-- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
let ~(z,zs) = case reverse ys of
z':zs' -> (z',zs')
_ -> error "readTBQueue: impossible"
writeTVar write []
writeTVar read zs
return z
readTBQueue (TBQueue rindex _ elements size) = do
r <- readTVar rindex
ele <- unsafeRead elements r
a <- case ele of
Nothing -> retry
Just a -> do
unsafeWrite elements r Nothing
pure a
writeTVar rindex $! incMod r size
pure a
-- | A version of 'readTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryReadTBQueue :: TBQueue a -> STM (Maybe a)
tryReadTBQueue c = fmap Just (readTBQueue c) `orElse` return Nothing
tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` pure Nothing
-- | Efficiently read the entire contents of a 'TBQueue' into a list. This
-- function never retries.
--
-- @since 2.4.5
flushTBQueue :: TBQueue a -> STM [a]
flushTBQueue (TBQueue rsize read wsize write size) = do
xs <- readTVar read
ys <- readTVar write
if null xs && null ys
then return []
else do
unless (null xs) $ writeTVar read []
unless (null ys) $ writeTVar write []
writeTVar rsize 0
writeTVar wsize size
return (xs ++ reverse ys)
flushTBQueue :: forall a. TBQueue a -> STM [a]
flushTBQueue (TBQueue _rindex windex elements size) = do
w <- readTVar windex
go (decMod w size) []
where
go :: Int -> [a] -> STM [a]
go i acc = do
ele <- unsafeRead elements i
case ele of
Nothing -> pure acc
Just a -> do
unsafeWrite elements i Nothing
go (decMod i size) (a : acc)
-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the channel is empty.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue (TBQueue _ read _ write _) = do
xs <- readTVar read
case xs of
(x:_) -> return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
writeTVar write []
writeTVar read (z:zs)
return z
peekTBQueue (TBQueue rindex _ elements _) = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> retry
Just a -> pure a
-- | A version of 'peekTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
tryPeekTBQueue c = do
m <- tryReadTBQueue c
case m of
Nothing -> return Nothing
Just x -> do
unGetTBQueue c x
return m
-- |Put a data item back onto a channel, where it will be the next item read.
tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` pure Nothing
-- | Put a data item back onto a channel, where it will be the next item read.
-- Blocks if the queue is full.
unGetTBQueue :: TBQueue a -> a -> STM ()
unGetTBQueue (TBQueue rsize read wsize _write _size) a = do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize $! r - 1
else do
w <- readTVar wsize
if (w > 0)
then writeTVar wsize $! w - 1
else retry
xs <- readTVar read
writeTVar read (a:xs)
-- |Return the length of a 'TBQueue'.
unGetTBQueue (TBQueue rindex _ elements size) a = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> unsafeWrite elements r (Just a)
Just _ -> retry
writeTVar rindex $! decMod r size
-- | Return the length of a 'TBQueue'.
--
-- @since 2.5.0.0
lengthTBQueue :: TBQueue a -> STM Natural
lengthTBQueue (TBQueue rsize _read wsize _write size) = do
r <- readTVar rsize
w <- readTVar wsize
return $! size - r - w
-- |Returns 'True' if the supplied 'TBQueue' is empty.
lengthTBQueue (TBQueue rindex windex elements size) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
-- length is 0 or size
ele <- unsafeRead elements r
case ele of
Nothing -> pure 0
Just _ -> pure $! fromIntegral size
else do
let len' = w - r
pure $! fromIntegral (if len' < 0 then len' + size else len')
-- | Returns 'True' if the supplied 'TBQueue' is empty.
isEmptyTBQueue :: TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do
xs <- readTVar read
case xs of
(_:_) -> return False
[] -> do ys <- readTVar write
case ys of
[] -> return True
_ -> return False
-- |Returns 'True' if the supplied 'TBQueue' is full.
isEmptyTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isNothing ele
else
pure False
-- | Returns 'True' if the supplied 'TBQueue' is full.
--
-- @since 2.4.3
isFullTBQueue :: TBQueue a -> STM Bool
isFullTBQueue (TBQueue rsize _read wsize _write _size) = do
w <- readTVar wsize
if (w > 0)
then return False
else do
r <- readTVar rsize
if (r > 0)
then return False
else return True
isFullTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isJust ele
else
pure False
module Async where
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
newtype Async a = Async (MVar a)
async :: IO a -> IO (Async a)
async action = do
mvar <- newEmptyMVar
_ <- forkIO $ do
x <- action
putMVar mvar x
pure (Async mvar)
wait :: Async a -> IO a
wait (Async a) = takeMVar a
{-# LANGUAGE AllowAmbiguousTypes, ScopedTypeVariables, RankNTypes, TypeApplications #-}
import Control.Monad
import Data.Foldable (traverse_)
import System.Environment
import Test.Tasty (localOption)
import Test.Tasty.Bench
import Control.Concurrent.Chan as Chan
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBQueue
import Async
class Channel c where
newc :: IO (c a)
readc :: c a -> IO a
writec :: c a -> a -> IO ()
instance Channel Chan where
newc = newChan
readc = Chan.readChan
writec = Chan.writeChan
instance Channel TChan where
newc = newTChanIO
readc c = atomically $ readTChan c
writec c x = atomically $ writeTChan c x
instance Channel TQueue where
newc = newTQueueIO
readc c = atomically $ readTQueue c
writec c x = atomically $ writeTQueue c x
instance Channel TBQueue where
newc = newTBQueueIO 4096
readc c = atomically $ readTBQueue c
writec c x = atomically $ writeTBQueue c x
-- concurrent writing and reading with single producer, single consumer
concurrentSpsc :: forall c. (Channel c) => Int -> IO ()
concurrentSpsc n = do
c :: c Int <- newc
writer <- async $ replicateM_ n $ writec c 1
reader <- async $ replicateM_ n $ readc c
wait writer
wait reader
-- concurrent writing and reading with multiple producers, multiple consumers
concurrentMpmc :: forall c. (Channel c) => Int -> IO ()
concurrentMpmc n = do
c :: c Int <- newc
writers <- replicateM 10 $ async $ replicateM_ (n `div` 10) $ writec c 1
readers <- replicateM 10 $ async $ replicateM_ (n `div` 10) $ readc c
traverse_ wait writers
traverse_ wait readers
-- bulk write, then bulk read
bulk :: forall c. (Channel c) => Int -> IO ()
bulk n = do
c :: c Int <- newc
replicateM_ n $ writec c 1
replicateM_ n $ readc c
-- bursts of bulk writes, then bulk reads
burst :: forall c. (Channel c) => Int -> Int -> IO ()
burst k n = do
c :: c Int <- newc
replicateM_ k $ do
replicateM_ (n `div` k) $ writec c 1
replicateM_ (n `div` k) $ readc c
main :: IO ()
main = defaultMain
[ localOption WallTime $ bgroup "concurrent spsc"
[ bench "Chan" $ whnfAppIO (concurrentSpsc @Chan) n
, bench "TChan" $ whnfAppIO (concurrentSpsc @TChan) n
, bench "TQueue" $ whnfAppIO (concurrentSpsc @TQueue) n
, bench "TBQueue" $ whnfAppIO (concurrentSpsc @TBQueue) n
]
, localOption WallTime $ bgroup "concurrent mpmc"
[ bench "Chan" $ whnfAppIO (concurrentMpmc @Chan) n
, bench "TChan" $ whnfAppIO (concurrentMpmc @TChan) n
, bench "TQueue" $ whnfAppIO (concurrentMpmc @TQueue) n
, bench "TBQueue" $ whnfAppIO (concurrentMpmc @TBQueue) n
]
, bgroup "bulk"
[ bench "Chan" $ whnfAppIO (bulk @Chan) n
, bench "TChan" $ whnfAppIO (bulk @TChan) n
, bench "TQueue" $ whnfAppIO (bulk @TQueue) n
]
, bgroup "burst"
[ bench "Chan" $ whnfAppIO (burst @Chan 1000) n
, bench "TChan" $ whnfAppIO (burst @TChan 1000) n
, bench "TQueue" $ whnfAppIO (burst @TQueue 1000) n
, bench "TBQueue" $ whnfAppIO (burst @TBQueue 1000) n
]
]
where
n = 2000000
{-# LANGUAGE CPP, RankNTypes #-}
import Control.Concurrent.Async
import Control.Monad
import System.Environment
import Control.Concurrent.Chan
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBQueue
-- Using CPP rather than a runtime choice between channel types,
-- because we want the compiler to be able to optimise the calls.
-- #define CHAN
-- #define TCHAN
-- #define TQUEUE
-- #define TBQUEUE
#ifdef CHAN
newc = newChan
readc c = readChan c
writec c x = writeChan c x
#elif defined(TCHAN)
newc = newTChanIO
readc c = atomically $ readTChan c
writec c x = atomically $ writeTChan c x
#elif defined(TQUEUE)
newc = newTQueueIO
readc c = atomically $ readTQueue c
writec c x = atomically $ writeTQueue c x
#elif defined(TBQUEUE)
newc = newTBQueueIO 4096
readc c = atomically $ readTBQueue c
writec c x = atomically $ writeTBQueue c x
#endif
main = do
[stest,sn] <- getArgs -- 2000000 is a good number
let n = read sn :: Int
test = read stest :: Int
runtest n test
runtest :: Int -> Int -> IO ()
runtest n test = do
c <- newc
case test of
0 -> do
a <- async $ replicateM_ n $ writec c (1 :: Int)
b <- async $ replicateM_ n $ readc c
waitBoth a b
return ()
1 -> do
replicateM_ n $ writec c (1 :: Int)
replicateM_ n $ readc c
2 -> do
let n1000 = n `quot` 1000
replicateM_ 1000 $ do
replicateM_ n1000 $ writec c (1 :: Int)
replicateM_ n1000 $ readc c
cabal-version: 2.2
name: stm-bench
version: 0
synopsis: External testsuite for stm package
category: Benchmarking
license: BSD-3-Clause
maintainer: libraries@haskell.org
tested-with: GHC==8.8.*, GHC==8.6.*, GHC==8.4.*, GHC==8.2.*, GHC==8.0.*, GHC==7.10.*, GHC==7.8.*, GHC==7.6.*, GHC==7.4.*, GHC==7.2.*, GHC==7.0.*
description:
The benchmarks are in a separate project to avoid cyclic dependencies.
benchmark chanbench
type: exitcode-stdio-1.0
main-is: ChanBench.hs
other-modules:
Async
build-depends: base, stm, tasty, tasty-bench
default-language: Haskell2010
ghc-options: -O2 -threaded -with-rtsopts=-N
packages: . testsuite/
packages:
.
testsuite
bench
package testsuite
tests: true
cradle:
cabal:
{-# LANGUAGE CPP #-}
-- see https://github.com/haskell/stm/pull/19
--
-- Test-case contributed by Alexey Kuleshevich <alexey@kukeshevi.ch>
--
-- This bug is observable in all versions with TBQueue from `stm-2.4` to
-- `stm-2.4.5.1` inclusive.
module Issue17 (main) where
import Control.Concurrent.STM
import Test.HUnit.Base (assertBool, assertEqual)
main :: IO ()
main = do
-- New queue capacity is set to 0
queueIO <- newTBQueueIO 0
assertNoCapacityTBQueue queueIO
-- Same as above, except created within STM
queueSTM <- atomically $ newTBQueue 0
assertNoCapacityTBQueue queueSTM
#if !MIN_VERSION_stm(2,5,0)
-- NB: below are expected failures
-- New queue capacity is set to a negative numer
queueIO' <- newTBQueueIO (-1 :: Int)
assertNoCapacityTBQueue queueIO'
-- Same as above, except created within STM and different negative number
queueSTM' <- atomically $ newTBQueue (minBound :: Int)
assertNoCapacityTBQueue queueSTM'
#endif
assertNoCapacityTBQueue :: TBQueue Int -> IO ()
assertNoCapacityTBQueue queue = do
assertEmptyTBQueue queue
assertFullTBQueue queue
-- Attempt to write into the queue.
eValWrite <- atomically $ orElse (fmap Left (writeTBQueue queue 217))
(fmap Right (tryReadTBQueue queue))
assertEqual "Expected queue with no capacity: writeTBQueue" eValWrite (Right Nothing)
eValUnGet <- atomically $ orElse (fmap Left (unGetTBQueue queue 218))
(fmap Right (tryReadTBQueue queue))
assertEqual "Expected queue with no capacity: unGetTBQueue" eValUnGet (Right Nothing)
-- Make sure that attempt to write didn't affect the queue
assertEmptyTBQueue queue
assertFullTBQueue queue
assertEmptyTBQueue :: TBQueue Int -> IO ()
assertEmptyTBQueue queue = do
atomically (isEmptyTBQueue queue) >>=
assertBool "Expected empty: isEmptyTBQueue should return True"
atomically (tryReadTBQueue queue) >>=
assertEqual "Expected empty: tryReadTBQueue should return Nothing" Nothing
atomically (tryPeekTBQueue queue) >>=
assertEqual "Expected empty: tryPeekTBQueue should return Nothing" Nothing
atomically (flushTBQueue queue) >>=
assertEqual "Expected empty: flushTBQueue should return []" []
assertFullTBQueue :: TBQueue Int -> IO ()
assertFullTBQueue queue = do
atomically (isFullTBQueue queue) >>=
assertBool "Expected full: isFullTBQueue shoule return True"
......@@ -6,7 +6,6 @@ import Test.Framework (defaultMain, testGroup)
import Test.Framework.Providers.HUnit
import qualified Issue9
import qualified Issue17
import qualified Stm052
import qualified Stm064
import qualified Stm065
......@@ -19,7 +18,6 @@ main = do
tests = [
testGroup "regression"
[ testCase "issue #9" Issue9.main
, testCase "issue #17" Issue17.main
, testCase "stm052" Stm052.main
, testCase "stm064" Stm064.main
, testCase "stm065" Stm065.main
......
......@@ -20,7 +20,6 @@ test-suite stm
main-is: Main.hs
other-modules:
Issue9
Issue17
Stm052
Stm064
Stm065
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment