Skip to content
Snippets Groups Projects
Unverified Commit 63afa2fc authored by Ben Gamari's avatar Ben Gamari :turtle: Committed by GitHub
Browse files

Merge pull request #80 from haskell/wip/revert-tbqueue-rewrite

Revert "Rewrite `TBQueue` to use `TArray Int (Maybe a)`"
parents a1e91f4e 110318ad
No related branches found
No related tags found
No related merge requests found
......@@ -46,185 +46,204 @@ module Control.Concurrent.STM.TBQueue (
capacityTBQueue,
) where
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative (pure)
#endif
import Data.Array.Base
import Data.Maybe (isJust, isNothing)
import Data.Typeable (Typeable)
import GHC.Conc
import Numeric.Natural (Natural)
import Prelude hiding (read)
import Control.Concurrent.STM.TArray
import Control.Monad (unless)
import Data.Typeable (Typeable)
import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse,
readTVar, retry, writeTVar)
import Numeric.Natural (Natural)
import Prelude hiding (read)
-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
--
-- @since 2.4
data TBQueue a
= TBQueue {-# UNPACK #-} !(TVar Int) -- read index
{-# UNPACK #-} !(TVar Int) -- write index
{-# UNPACK #-} !(TArray Int (Maybe a)) -- elements
{-# UNPACK #-} !Int -- initial capacity
= TBQueue {-# UNPACK #-} !(TVar Natural) -- CR: read capacity
{-# UNPACK #-} !(TVar [a]) -- R: elements waiting to be read
{-# UNPACK #-} !(TVar Natural) -- CW: write capacity
{-# UNPACK #-} !(TVar [a]) -- W: elements written (head is most recent)
!(Natural) -- CAP: initial capacity
deriving Typeable
instance Eq (TBQueue a) where
-- each `TBQueue` has its own `TVar`s, so it's sufficient to compare the first one
TBQueue a _ _ _ == TBQueue b _ _ _ = a == b
-- incMod x cap == (x + 1) `mod` cap
incMod :: Int -> Int -> Int
incMod x cap = let y = x + 1 in if y == cap then 0 else y
TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b
-- decMod x cap = (x - 1) `mod` cap
decMod :: Int -> Int -> Int
decMod x cap = if x == 0 then cap - 1 else x - 1
-- Total channel capacity remaining is CR + CW. Reads only need to
-- access CR, writes usually need to access only CW but sometimes need
-- CR. So in the common case we avoid contention between CR and CW.
--
-- - when removing an element from R:
-- CR := CR + 1
--
-- - when adding an element to W:
-- if CW is non-zero
-- then CW := CW - 1
-- then if CR is non-zero
-- then CW := CR - 1; CR := 0
-- else **FULL**
-- | Builds and returns a new instance of 'TBQueue'.
newTBQueue :: Natural -- ^ maximum number of elements the queue can hold
-> STM (TBQueue a)
newTBQueue cap
| cap <= 0 = error "capacity has to be greater than 0"
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVar 0
windex <- newTVar 0
elements <- newArray (0, cap' - 1) Nothing
pure (TBQueue rindex windex elements cap')
where
cap' = fromIntegral cap
newTBQueue size = do
read <- newTVar []
write <- newTVar []
rsize <- newTVar 0
wsize <- newTVar size
return (TBQueue rsize read wsize write size)
-- | @IO@ version of 'newTBQueue'. This is useful for creating top-level
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTBQueueIO :: Natural -> IO (TBQueue a)
newTBQueueIO cap
| cap <= 0 = error "capacity has to be greater than 0"
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVarIO 0
windex <- newTVarIO 0
elements <- newArray (0, cap' - 1) Nothing
pure (TBQueue rindex windex elements cap')
where
cap' = fromIntegral cap
-- | Write a value to a 'TBQueue'; retries if the queue is full.
newTBQueueIO size = do
read <- newTVarIO []
write <- newTVarIO []
rsize <- newTVarIO 0
wsize <- newTVarIO size
return (TBQueue rsize read wsize write size)
-- |Write a value to a 'TBQueue'; blocks if the queue is full.
writeTBQueue :: TBQueue a -> a -> STM ()
writeTBQueue (TBQueue _ windex elements cap) a = do
w <- readTVar windex
ele <- unsafeRead elements w
case ele of
Nothing -> unsafeWrite elements w (Just a)
Just _ -> retry
writeTVar windex $! incMod w cap
-- | Read the next value from the 'TBQueue'; retries if the queue is empty.
writeTBQueue (TBQueue rsize _read wsize write _size) a = do
w <- readTVar wsize
if (w > 0)
then do writeTVar wsize $! w - 1
else do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize 0
writeTVar wsize $! r - 1
else retry
listend <- readTVar write
writeTVar write (a:listend)
-- |Read the next value from the 'TBQueue'.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rindex _ elements cap) = do
r <- readTVar rindex
ele <- unsafeRead elements r
a <- case ele of
Nothing -> retry
Just a -> do
unsafeWrite elements r Nothing
pure a
writeTVar rindex $! incMod r cap
pure a
readTBQueue (TBQueue rsize read _wsize write _size) = do
xs <- readTVar read
r <- readTVar rsize
writeTVar rsize $! r + 1
case xs of
(x:xs') -> do
writeTVar read xs'
return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
-- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
let ~(z,zs) = case reverse ys of
z':zs' -> (z',zs')
_ -> error "readTBQueue: impossible"
writeTVar write []
writeTVar read zs
return z
-- | A version of 'readTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryReadTBQueue :: TBQueue a -> STM (Maybe a)
tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` pure Nothing
tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` return Nothing
-- | Efficiently read the entire contents of a 'TBQueue' into a list. This
-- function never retries.
--
-- @since 2.4.5
flushTBQueue :: forall a. TBQueue a -> STM [a]
flushTBQueue (TBQueue _rindex windex elements cap) = do
w <- readTVar windex
go (decMod w cap) []
where
go :: Int -> [a] -> STM [a]
go i acc = do
ele <- unsafeRead elements i
case ele of
Nothing -> pure acc
Just a -> do
unsafeWrite elements i Nothing
go (decMod i cap) (a : acc)
flushTBQueue :: TBQueue a -> STM [a]
flushTBQueue (TBQueue rsize read wsize write size) = do
xs <- readTVar read
ys <- readTVar write
if null xs && null ys
then return []
else do
unless (null xs) $ writeTVar read []
unless (null ys) $ writeTVar write []
writeTVar rsize 0
writeTVar wsize size
return (xs ++ reverse ys)
-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the queue is empty.
-- retrying if the channel is empty.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue (TBQueue rindex _ elements _) = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> retry
Just a -> pure a
peekTBQueue (TBQueue _ read _ write _) = do
xs <- readTVar read
case xs of
(x:_) -> return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
writeTVar write []
writeTVar read (z:zs)
return z
-- | A version of 'peekTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` pure Nothing
tryPeekTBQueue c = do
m <- tryReadTBQueue c
case m of
Nothing -> return Nothing
Just x -> do
unGetTBQueue c x
return m
-- | Put a data item back onto a channel, where it will be the next item read.
-- Retries if the queue is full.
-- Blocks if the queue is full.
unGetTBQueue :: TBQueue a -> a -> STM ()
unGetTBQueue (TBQueue rindex _ elements cap) a = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> unsafeWrite elements r (Just a)
Just _ -> retry
writeTVar rindex $! decMod r cap
unGetTBQueue (TBQueue rsize read wsize _write _size) a = do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize $! r - 1
else do
w <- readTVar wsize
if (w > 0)
then writeTVar wsize $! w - 1
else retry
xs <- readTVar read
writeTVar read (a:xs)
-- | Return the length of a 'TBQueue'.
--
-- @since 2.5.0.0
lengthTBQueue :: TBQueue a -> STM Natural
lengthTBQueue (TBQueue rindex windex elements cap) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
-- length is 0 or cap
ele <- unsafeRead elements r
case ele of
Nothing -> pure 0
Just _ -> pure $! fromIntegral cap
else do
let len' = w - r
pure $! fromIntegral (if len' < 0 then len' + cap else len')
lengthTBQueue (TBQueue rsize _read wsize _write size) = do
r <- readTVar rsize
w <- readTVar wsize
return $! size - r - w
-- | Returns 'True' if the supplied 'TBQueue' is empty.
isEmptyTBQueue :: TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isNothing ele
else
pure False
isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do
xs <- readTVar read
case xs of
(_:_) -> return False
[] -> do ys <- readTVar write
case ys of
[] -> return True
_ -> return False
-- | Returns 'True' if the supplied 'TBQueue' is full.
--
-- @since 2.4.3
isFullTBQueue :: TBQueue a -> STM Bool
isFullTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isJust ele
else
pure False
isFullTBQueue (TBQueue rsize _read wsize _write _size) = do
w <- readTVar wsize
if (w > 0)
then return False
else do
r <- readTVar rsize
if (r > 0)
then return False
else return True
-- | The maximum number of elements the queue can hold.
--
-- @since TODO
capacityTBQueue :: TBQueue a -> Natural
capacityTBQueue (TBQueue _ _ _ cap) = fromIntegral cap
capacityTBQueue (TBQueue _ _ _ _ cap) = fromIntegral cap
{-# LANGUAGE CPP, RankNTypes #-}
import Control.Concurrent.Async
import Control.Monad
import System.Environment
import Control.Concurrent.Chan
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBQueue
-- Using CPP rather than a runtime choice between channel types,
-- because we want the compiler to be able to optimise the calls.
-- #define CHAN
-- #define TCHAN
-- #define TQUEUE
-- #define TBQUEUE
#ifdef CHAN
newc = newChan
readc c = readChan c
writec c x = writeChan c x
#elif defined(TCHAN)
newc = newTChanIO
readc c = atomically $ readTChan c
writec c x = atomically $ writeTChan c x
#elif defined(TQUEUE)
newc = newTQueueIO
readc c = atomically $ readTQueue c
writec c x = atomically $ writeTQueue c x
#elif defined(TBQUEUE)
newc = newTBQueueIO 4096
readc c = atomically $ readTBQueue c
writec c x = atomically $ writeTBQueue c x
#endif
main = do
[stest,sn] <- getArgs -- 2000000 is a good number
let n = read sn :: Int
test = read stest :: Int
runtest n test
runtest :: Int -> Int -> IO ()
runtest n test = do
c <- newc
case test of
0 -> do
a <- async $ replicateM_ n $ writec c (1 :: Int)
b <- async $ replicateM_ n $ readc c
waitBoth a b
return ()
1 -> do
replicateM_ n $ writec c (1 :: Int)
replicateM_ n $ readc c
2 -> do
let n1000 = n `quot` 1000
replicateM_ 1000 $ do
replicateM_ n1000 $ writec c (1 :: Int)
replicateM_ n1000 $ readc c
packages:
.
testsuite
bench
packages: . testsuite/
package testsuite
tests: true
cradle:
cabal:
{-# LANGUAGE CPP #-}
-- see https://github.com/haskell/stm/pull/19
--
-- Test-case contributed by Alexey Kuleshevich <alexey@kukeshevi.ch>
--
-- This bug is observable in all versions with TBQueue from `stm-2.4` to
-- `stm-2.4.5.1` inclusive.
module Issue17 (main) where
import Control.Concurrent.STM
import Test.HUnit.Base (assertBool, assertEqual)
main :: IO ()
main = do
-- New queue capacity is set to 0
queueIO <- newTBQueueIO 0
assertNoCapacityTBQueue queueIO
-- Same as above, except created within STM
queueSTM <- atomically $ newTBQueue 0
assertNoCapacityTBQueue queueSTM
#if !MIN_VERSION_stm(2,5,0)
-- NB: below are expected failures
-- New queue capacity is set to a negative numer
queueIO' <- newTBQueueIO (-1 :: Int)
assertNoCapacityTBQueue queueIO'
-- Same as above, except created within STM and different negative number
queueSTM' <- atomically $ newTBQueue (minBound :: Int)
assertNoCapacityTBQueue queueSTM'
#endif
assertNoCapacityTBQueue :: TBQueue Int -> IO ()
assertNoCapacityTBQueue queue = do
assertEmptyTBQueue queue
assertFullTBQueue queue
-- Attempt to write into the queue.
eValWrite <- atomically $ orElse (fmap Left (writeTBQueue queue 217))
(fmap Right (tryReadTBQueue queue))
assertEqual "Expected queue with no capacity: writeTBQueue" eValWrite (Right Nothing)
eValUnGet <- atomically $ orElse (fmap Left (unGetTBQueue queue 218))
(fmap Right (tryReadTBQueue queue))
assertEqual "Expected queue with no capacity: unGetTBQueue" eValUnGet (Right Nothing)
-- Make sure that attempt to write didn't affect the queue
assertEmptyTBQueue queue
assertFullTBQueue queue
assertEmptyTBQueue :: TBQueue Int -> IO ()
assertEmptyTBQueue queue = do
atomically (isEmptyTBQueue queue) >>=
assertBool "Expected empty: isEmptyTBQueue should return True"
atomically (tryReadTBQueue queue) >>=
assertEqual "Expected empty: tryReadTBQueue should return Nothing" Nothing
atomically (tryPeekTBQueue queue) >>=
assertEqual "Expected empty: tryPeekTBQueue should return Nothing" Nothing
atomically (flushTBQueue queue) >>=
assertEqual "Expected empty: flushTBQueue should return []" []
assertFullTBQueue :: TBQueue Int -> IO ()
assertFullTBQueue queue = do
atomically (isFullTBQueue queue) >>=
assertBool "Expected full: isFullTBQueue shoule return True"
......@@ -6,6 +6,7 @@ import Test.Framework (defaultMain, testGroup)
import Test.Framework.Providers.HUnit
import qualified Issue9
import qualified Issue17
import qualified Stm052
import qualified Stm064
import qualified Stm065
......@@ -18,6 +19,7 @@ main = do
tests = [
testGroup "regression"
[ testCase "issue #9" Issue9.main
, testCase "issue #17" Issue17.main
, testCase "stm052" Stm052.main
, testCase "stm064" Stm064.main
, testCase "stm065" Stm065.main
......
......@@ -20,6 +20,7 @@ test-suite stm
main-is: Main.hs
other-modules:
Issue9
Issue17
Stm052
Stm064
Stm065
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment