Commit 96c5411a authored by David Feuer's avatar David Feuer Committed by Marge Bot
Browse files

Use an IORef for QSemN

Replace the outer `MVar` in `QSemN` with an `IORef`. This should
probably be lighter, and it removes the need for `uninterruptibleMask`.

Previously Differential Revision https://phabricator.haskell.org/D4896
parent 2d2cc76f
{-# LANGUAGE Safe #-}
{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -funbox-strict-fields #-}
-----------------------------------------------------------------------------
-- |
......@@ -23,11 +25,12 @@ module Control.Concurrent.QSemN
signalQSemN -- :: QSemN -> Int -> IO ()
) where
import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar
, putMVar, newMVar
import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar
, tryPutMVar, isEmptyMVar)
import Control.Exception
import Data.Maybe
import Control.Monad (when)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO.Unsafe (unsafePerformIO)
-- | 'QSemN' is a quantity semaphore in which the resource is acquired
-- and released in units of one. It provides guaranteed FIFO ordering
......@@ -39,7 +42,7 @@ import Data.Maybe
--
-- is safe; it never loses any of the resource.
--
newtype QSemN = QSemN (MVar (Int, [(Int, MVar ())], [(Int, MVar ())]))
data QSemN = QSemN !(IORef (Int, [(Int, MVar ())], [(Int, MVar ())]))
-- The semaphore state (i, xs, ys):
--
......@@ -55,9 +58,7 @@ newtype QSemN = QSemN (MVar (Int, [(Int, MVar ())], [(Int, MVar ())]))
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSemN.
-- This means that when unblocking a thread in signalQSemN we must
-- first check whether the MVar is already full; the MVar lock on the
-- semaphore itself resolves race conditions between signalQSemN and a
-- thread attempting to dequeue itself.
-- first check whether the MVar is already full.
-- |Build a new 'QSemN' with a supplied initial quantity.
-- The initial quantity must be at least 0.
......@@ -65,54 +66,65 @@ newQSemN :: Int -> IO QSemN
newQSemN initial
| initial < 0 = fail "newQSemN: Initial quantity must be non-negative"
| otherwise = do
sem <- newMVar (initial, [], [])
sem <- newIORef (initial, [], [])
return (QSemN sem)
-- An unboxed version of Maybe (MVar a)
data MaybeMV a = JustMV !(MVar a) | NothingMV
-- |Wait for the specified quantity to become available
waitQSemN :: QSemN -> Int -> IO ()
waitQSemN (QSemN m) sz =
mask_ $ do
(i,b1,b2) <- takeMVar m
-- We need to mask here. Once we've enqueued our MVar, we need
-- to be sure to wait for it. Otherwise, we could lose our
-- allocated resource.
waitQSemN qs@(QSemN m) sz = mask_ $ do
-- unsafePerformIO and not unsafeDupablePerformIO. We must
-- be sure to wait on the same MVar that gets enqueued.
mmvar <- atomicModifyIORef m $ \ (i,b1,b2) -> unsafePerformIO $ do
let z = i-sz
if z < 0
then do
b <- newEmptyMVar
putMVar m (i, b1, (sz,b):b2)
wait b
else do
putMVar m (z, b1, b2)
return ()
then do
b <- newEmptyMVar
return ((i, b1, (sz,b):b2), JustMV b)
else return ((z, b1, b2), NothingMV)
-- Note: this case match actually allocates the MVar if necessary.
case mmvar of
NothingMV -> return ()
JustMV b -> wait b
where
wait :: MVar () -> IO ()
wait b = do
takeMVar b `onException`
(uninterruptibleMask_ $ do -- Note [signal uninterruptible]
(i,b1,b2) <- takeMVar m
r <- tryTakeMVar b
r' <- if isJust r
then signal sz (i,b1,b2)
else do putMVar b (); return (i,b1,b2)
putMVar m r')
takeMVar b `onException` do
already_filled <- not <$> tryPutMVar b ()
when already_filled $ signalQSemN qs sz
-- |Signal that a given quantity is now available from the 'QSemN'.
signalQSemN :: QSemN -> Int -> IO ()
signalQSemN (QSemN m) sz = uninterruptibleMask_ $ do
r <- takeMVar m
r' <- signal sz r
putMVar m r'
signal :: Int
-> (Int,[(Int,MVar ())],[(Int,MVar ())])
-> IO (Int,[(Int,MVar ())],[(Int,MVar ())])
-- We don't need to mask here because we should *already* be masked
-- here (e.g., by bracket). Indeed, if we're not already masked,
-- it's too late to do so.
--
-- What if the unsafePerformIO thunk is forced in another thread,
-- and receives an asynchronous exception? That shouldn't be a
-- problem: when we force it ourselves, presumably masked, we
-- will resume its execution.
signalQSemN (QSemN m) sz0 = do
-- unsafePerformIO and not unsafeDupablePerformIO. We must not
-- wake up more threads than we're supposed to.
unit <- atomicModifyIORef m $ \(i,a1,a2) ->
unsafePerformIO (loop (sz0 + i) a1 a2)
signal sz0 (i,a1,a2) = loop (sz0 + i) a1 a2
-- Forcing this will actually wake the necessary threads.
evaluate unit
where
loop 0 bs b2 = return (0, bs, b2)
loop sz [] [] = return (sz, [], [])
loop 0 bs b2 = return ((0, bs, b2), ())
loop sz [] [] = return ((sz, [], []), ())
loop sz [] b2 = loop sz (reverse b2) []
loop sz ((j,b):bs) b2
| j > sz = do
r <- isEmptyMVar b
if r then return (sz, (j,b):bs, b2)
if r then return ((sz, (j,b):bs, b2), ())
else loop sz bs b2
| otherwise = do
r <- tryPutMVar b ()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment