Commit e2495eb6 authored by Duncan Coutts's avatar Duncan Coutts
Browse files

Add some tests for the JobControl impls

Including cancelling.
parent 53e7da37
......@@ -372,6 +372,7 @@ Test-Suite unit-tests
UnitTests.Distribution.Client.Tar
UnitTests.Distribution.Client.UserConfig
UnitTests.Distribution.Client.ProjectConfig
UnitTests.Distribution.Client.JobControl
UnitTests.Distribution.Solver.Modular.PSQ
UnitTests.Distribution.Solver.Modular.Solver
UnitTests.Distribution.Solver.Modular.DSL
......
......@@ -21,6 +21,7 @@ import qualified UnitTests.Distribution.Client.Tar
import qualified UnitTests.Distribution.Client.Targets
import qualified UnitTests.Distribution.Client.UserConfig
import qualified UnitTests.Distribution.Client.ProjectConfig
import qualified UnitTests.Distribution.Client.JobControl
import UnitTests.Options
......@@ -55,6 +56,8 @@ tests mtimeChangeCalibrated =
UnitTests.Distribution.Client.UserConfig.tests
, testGroup "UnitTests.Distribution.Client.ProjectConfig"
UnitTests.Distribution.Client.ProjectConfig.tests
, testGroup "UnitTests.Distribution.Client.JobControl"
UnitTests.Distribution.Client.JobControl.tests
]
main :: IO ()
......
{-# LANGUAGE DeriveDataTypeable #-}
module UnitTests.Distribution.Client.JobControl (tests) where
import Distribution.Client.JobControl
import Data.List
import Data.Maybe
import Data.IORef
import Control.Monad
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, try, throwIO)
import Data.Typeable (Typeable)
import qualified Data.Set as Set
import Test.Tasty
import Test.Tasty.QuickCheck hiding (collect)
tests :: [TestTree]
tests =
[ testGroup "serial"
[ testProperty "submit batch" prop_submit_serial
, testProperty "submit batch" prop_remaining_serial
, testProperty "submit interleaved" prop_interleaved_serial
, testProperty "concurrent jobs" prop_concurrent_serial
, testProperty "cancel" prop_cancel_serial
, testProperty "exceptions" prop_exception_serial
]
, testGroup "parallel"
[ testProperty "submit batch" prop_submit_parallel
, testProperty "submit batch" prop_remaining_parallel
, testProperty "submit interleaved" prop_interleaved_parallel
, testProperty "concurrent jobs" prop_concurrent_parallel
, testProperty "cancel" prop_cancel_parallel
, testProperty "exceptions" prop_exception_parallel
]
]
prop_submit_serial :: [Int] -> Property
prop_submit_serial xs =
ioProperty $ do
jobCtl <- newSerialJobControl
prop_submit jobCtl xs
prop_submit_parallel :: Positive (Small Int) -> [Int] -> Property
prop_submit_parallel (Positive (Small maxJobLimit)) xs =
ioProperty $ do
jobCtl <- newParallelJobControl maxJobLimit
prop_submit jobCtl xs
prop_remaining_serial :: [Int] -> Property
prop_remaining_serial xs =
ioProperty $ do
jobCtl <- newSerialJobControl
prop_remaining jobCtl xs
prop_remaining_parallel :: Positive (Small Int) -> [Int] -> Property
prop_remaining_parallel (Positive (Small maxJobLimit)) xs =
ioProperty $ do
jobCtl <- newParallelJobControl maxJobLimit
prop_remaining jobCtl xs
prop_interleaved_serial :: [Int] -> Property
prop_interleaved_serial xs =
ioProperty $ do
jobCtl <- newSerialJobControl
prop_submit_interleaved jobCtl xs
prop_interleaved_parallel :: Positive (Small Int) -> [Int] -> Property
prop_interleaved_parallel (Positive (Small maxJobLimit)) xs =
ioProperty $ do
jobCtl <- newParallelJobControl maxJobLimit
prop_submit_interleaved jobCtl xs
prop_submit :: JobControl IO Int -> [Int] -> IO Bool
prop_submit jobCtl xs = do
mapM_ (\x -> spawnJob jobCtl (return x)) xs
xs' <- mapM (\_ -> collectJob jobCtl) xs
return (sort xs == sort xs')
prop_remaining :: JobControl IO Int -> [Int] -> IO Bool
prop_remaining jobCtl xs = do
mapM_ (\x -> spawnJob jobCtl (return x)) xs
xs' <- collectRemainingJobs jobCtl
return (sort xs == sort xs')
collectRemainingJobs :: Monad m => JobControl m a -> m [a]
collectRemainingJobs jobCtl = go []
where
go xs = do
remaining <- remainingJobs jobCtl
if remaining
then do x <- collectJob jobCtl
go (x:xs)
else return xs
prop_submit_interleaved :: JobControl IO (Maybe Int) -> [Int] -> IO Bool
prop_submit_interleaved jobCtl xs = do
xs' <- sequence
[ spawn >> collect
| let spawns = map (\x -> spawnJob jobCtl (return (Just x))) xs
++ repeat (return ())
collects = replicate 5 (return Nothing)
++ map (\_ -> collectJob jobCtl) xs
, (spawn, collect) <- zip spawns collects
]
return (sort xs == sort (catMaybes xs'))
prop_concurrent_serial :: NonNegative (Small Int) -> Property
prop_concurrent_serial (NonNegative (Small ntasks)) =
ioProperty $ do
jobCtl <- newSerialJobControl
countRef <- newIORef (0 :: Int)
replicateM_ ntasks (spawnJob jobCtl (task countRef))
counts <- replicateM ntasks (collectJob jobCtl)
return $ length counts == ntasks
&& all (\(n0, n1) -> n0 == 0 && n1 == 1) counts
where
task countRef = do
n0 <- atomicModifyIORef countRef (\n -> (n+1, n))
threadDelay 100
n1 <- atomicModifyIORef countRef (\n -> (n-1, n))
return (n0, n1)
prop_concurrent_parallel :: Positive (Small Int) -> NonNegative Int -> Property
prop_concurrent_parallel (Positive (Small maxJobLimit)) (NonNegative ntasks) =
ioProperty $ do
jobCtl <- newParallelJobControl maxJobLimit
countRef <- newIORef (0 :: Int)
replicateM_ ntasks (spawnJob jobCtl (task countRef))
counts <- replicateM ntasks (collectJob jobCtl)
return $ length counts == ntasks
&& all (\(n0, n1) -> n0 >= 0 && n0 < maxJobLimit
&& n1 > 0 && n1 <= maxJobLimit) counts
-- we do hit the concurrency limit (in the right circumstances)
&& if ntasks >= maxJobLimit*2 -- give us enough of a margin
then any (\(_,n1) -> n1 == maxJobLimit) counts
else True
where
task countRef = do
n0 <- atomicModifyIORef countRef (\n -> (n+1, n))
threadDelay 100
n1 <- atomicModifyIORef countRef (\n -> (n-1, n))
return (n0, n1)
prop_cancel_serial :: [Int] -> [Int] -> Property
prop_cancel_serial xs ys =
ioProperty $ do
jobCtl <- newSerialJobControl
mapM_ (\x -> spawnJob jobCtl (return x)) (xs++ys)
xs' <- mapM (\_ -> collectJob jobCtl) xs
cancelJobs jobCtl
ys' <- collectRemainingJobs jobCtl
return (sort xs == sort xs' && null ys')
prop_cancel_parallel :: Positive (Small Int) -> [Int] -> [Int] -> Property
prop_cancel_parallel (Positive (Small maxJobLimit)) xs ys = do
ioProperty $ do
jobCtl <- newParallelJobControl maxJobLimit
mapM_ (\x -> spawnJob jobCtl (threadDelay 100 >> return x)) (xs++ys)
xs' <- mapM (\_ -> collectJob jobCtl) xs
cancelJobs jobCtl
ys' <- collectRemainingJobs jobCtl
return $ Set.fromList (xs'++ys') `Set.isSubsetOf` Set.fromList (xs++ys)
data TestException = TestException Int
deriving (Typeable, Show)
instance Exception TestException
prop_exception_serial :: [Either Int Int] -> Property
prop_exception_serial xs =
ioProperty $ do
jobCtl <- newSerialJobControl
prop_exception jobCtl xs
prop_exception_parallel :: Positive (Small Int) -> [Either Int Int] -> Property
prop_exception_parallel (Positive (Small maxJobLimit)) xs =
ioProperty $ do
jobCtl <- newParallelJobControl maxJobLimit
prop_exception jobCtl xs
prop_exception :: JobControl IO Int -> [Either Int Int] -> IO Bool
prop_exception jobCtl xs = do
mapM_ (\x -> spawnJob jobCtl (either (throwIO . TestException) return x)) xs
xs' <- replicateM (length xs) $ do
mx <- try (collectJob jobCtl)
return $ case mx of
Left (TestException n) -> Left n
Right n -> Right n
return (sort xs == sort xs')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment