Commit 43583aaa authored by Simon Marlow's avatar Simon Marlow
Browse files

Integrated new I/O manager

(patch originally by Johan Tibell <johan.tibell@gmail.com>, minor merging by me)
parent 41fe4e3e
......@@ -120,7 +120,7 @@ import GHC.IO.Exception
import GHC.Exception
import GHC.Show
-- import GHC.Exception hiding ( Exception )
import GHC.Conc
import GHC.Conc.Sync
#endif
#ifdef __HUGS__
......
This diff is collapsed.
{-# OPTIONS_GHC -XNoImplicitPrelude #-}
{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
{-# OPTIONS_HADDOCK not-home #-}
-----------------------------------------------------------------------------
-- |
-- Module : GHC.Conc.IO
-- Copyright : (c) The University of Glasgow, 1994-2002
-- License : see libraries/base/LICENSE
--
-- Maintainer : cvs-ghc@haskell.org
-- Stability : internal
-- Portability : non-portable (GHC extensions)
--
-- Basic concurrency stuff.
--
-----------------------------------------------------------------------------
-- No: #hide, because bits of this module are exposed by the stm package.
-- However, we don't want this module to be the home location for the
-- bits it exports, we'd rather have Control.Concurrent and the other
-- higher level modules be the home. Hence:
#include "Typeable.h"
-- #not-home
module GHC.Conc.IO
( ensureIOManagerIsRunning
-- * Waiting
, threadDelay -- :: Int -> IO ()
, registerDelay -- :: Int -> IO (TVar Bool)
, threadWaitRead -- :: Int -> IO ()
, threadWaitWrite -- :: Int -> IO ()
#ifdef mingw32_HOST_OS
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
, asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
, asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
, ConsoleEvent(..)
, win32ConsoleHandler
, toWin32ConsoleEvent
#endif
) where
import Control.Monad
import Foreign
import GHC.Base
import GHC.Conc.Sync as Sync
import GHC.Real ( fromIntegral )
import System.Posix.Types
#ifdef mingw32_HOST_OS
import qualified GHC.Conc.Windows as Windows
import GHC.Conc.Windows (asyncRead, asyncWrite, asyncDoProc, asyncReadBA,
asyncWriteBA, ConsoleEvent(..), win32ConsoleHandler,
toWin32ConsoleEvent)
#else
import qualified System.Event.Thread as Event
#endif
ensureIOManagerIsRunning :: IO ()
#ifndef mingw32_HOST_OS
ensureIOManagerIsRunning = Event.ensureIOManagerIsRunning
#else
ensureIOManagerIsRunning = Windows.ensureIOManagerIsRunning
#endif
-- | Block the current thread until data is available to read on the
-- given file descriptor (GHC only).
threadWaitRead :: Fd -> IO ()
threadWaitRead fd
#ifndef mingw32_HOST_OS
| threaded = Event.threadWaitRead fd
#endif
| otherwise = IO $ \s ->
case fromIntegral fd of { I# fd# ->
case waitRead# fd# s of { s' -> (# s', () #)
}}
-- | Block the current thread until data can be written to the
-- given file descriptor (GHC only).
threadWaitWrite :: Fd -> IO ()
threadWaitWrite fd
#ifndef mingw32_HOST_OS
| threaded = Event.threadWaitWrite fd
#endif
| otherwise = IO $ \s ->
case fromIntegral fd of { I# fd# ->
case waitWrite# fd# s of { s' -> (# s', () #)
}}
-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
-- There is no guarantee that the thread will be rescheduled promptly
-- when the delay has expired, but the thread will never continue to
-- run /earlier/ than specified.
--
threadDelay :: Int -> IO ()
threadDelay time
#ifdef mingw32_HOST_OS
| threaded = Windows.threadDelay time
#else
| threaded = Event.threadDelay time
#endif
| otherwise = IO $ \s ->
case fromIntegral time of { I# time# ->
case delay# time# s of { s' -> (# s', () #)
}}
-- | Set the value of returned TVar to True after a given number of
-- microseconds. The caveats associated with threadDelay also apply.
--
registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs
#ifdef mingw32_HOST_OS
| threaded = Windows.registerDelay usecs
#else
| threaded = Event.registerDelay usecs
#endif
| otherwise = error "registerDelay: requires -threaded"
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
{-# LANGUAGE NoImplicitPrelude #-}
module GHC.Conc.Signal
( Signal
, HandlerFun
, setHandler
, runHandlers
) where
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Data.Dynamic (Dynamic)
import Data.Maybe (Maybe(..))
import Foreign.C.Types (CInt)
import Foreign.ForeignPtr (ForeignPtr)
import Foreign.StablePtr (castPtrToStablePtr, castStablePtrToPtr,
deRefStablePtr, freeStablePtr, newStablePtr)
import Foreign.Ptr (Ptr, castPtr)
import GHC.Arr (inRange)
import GHC.Base
import GHC.Conc.Sync (forkIO)
import GHC.IO (mask_, unsafePerformIO)
import GHC.IOArray (IOArray, boundsIOArray, newIOArray, unsafeReadIOArray,
unsafeWriteIOArray)
import GHC.Num (fromInteger)
import GHC.Real (fromIntegral)
import GHC.Word (Word8)
------------------------------------------------------------------------
-- Signal handling
type Signal = CInt
maxSig :: Int
maxSig = 64
type HandlerFun = ForeignPtr Word8 -> IO ()
-- Lock used to protect concurrent access to signal_handlers. Symptom
-- of this race condition is GHC bug #1922, although that bug was on
-- Windows a similar bug also exists on Unix.
signal_handlers :: MVar (IOArray Int (Maybe (HandlerFun,Dynamic)))
signal_handlers = unsafePerformIO $ do
arr <- newIOArray (0, maxSig) Nothing
m <- newMVar arr
sharedCAF m getOrSetGHCConcSignalSignalHandlerStore
{-# NOINLINE signal_handlers #-}
foreign import ccall unsafe "getOrSetGHCConcSignalSignalHandlerStore"
getOrSetGHCConcSignalSignalHandlerStore :: Ptr a -> IO (Ptr a)
setHandler :: Signal -> Maybe (HandlerFun, Dynamic)
-> IO (Maybe (HandlerFun, Dynamic))
setHandler sig handler = do
let int = fromIntegral sig
withMVar signal_handlers $ \arr ->
if not (inRange (boundsIOArray arr) int)
then error "GHC.Conc.setHandler: signal out of range"
else do old <- unsafeReadIOArray arr int
unsafeWriteIOArray arr int handler
return old
runHandlers :: ForeignPtr Word8 -> Signal -> IO ()
runHandlers p_info sig = do
let int = fromIntegral sig
withMVar signal_handlers $ \arr ->
if not (inRange (boundsIOArray arr) int)
then return ()
else do handler <- unsafeReadIOArray arr int
case handler of
Nothing -> return ()
Just (f,_) -> do _ <- forkIO (f p_info)
return ()
-- Machinery needed to ensure that we only have one copy of certain
-- CAFs in this module even when the base package is present twice, as
-- it is when base is dynamically loaded into GHCi. The RTS keeps
-- track of the single true value of the CAF, so even when the CAFs in
-- the dynamically-loaded base package are reverted, nothing bad
-- happens.
--
sharedCAF :: a -> (Ptr a -> IO (Ptr a)) -> IO a
sharedCAF a get_or_set =
mask_ $ do
stable_ref <- newStablePtr a
let ref = castPtr (castStablePtrToPtr stable_ref)
ref2 <- get_or_set ref
if ref == ref2
then return a
else do freeStablePtr stable_ref
deRefStablePtr (castPtrToStablePtr (castPtr ref2))
This diff is collapsed.
{-# OPTIONS_GHC -XNoImplicitPrelude #-}
{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
{-# OPTIONS_HADDOCK not-home #-}
-----------------------------------------------------------------------------
-- |
-- Module : GHC.Conc.Windows
-- Copyright : (c) The University of Glasgow, 1994-2002
-- License : see libraries/base/LICENSE
--
-- Maintainer : cvs-ghc@haskell.org
-- Stability : internal
-- Portability : non-portable (GHC extensions)
--
-- Windows I/O manager
--
-----------------------------------------------------------------------------
-- #not-home
module GHC.Conc.Windows
( ensureIOManagerIsRunning
-- * Waiting
, threadDelay
, registerDelay
-- * Miscellaneous
, asyncRead -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncWrite -- :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
, asyncDoProc -- :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
, asyncReadBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
, asyncWriteBA -- :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int, Int)
, ConsoleEvent(..)
, win32ConsoleHandler
, toWin32ConsoleEvent
) where
import Control.Monad
import Data.Bits (shiftR)
import Data.Maybe (Maybe(..))
import Data.Typeable
import Foreign.C.Error (throwErrno)
import GHC.Base
import GHC.Conc.Sync
import GHC.Enum (Enum)
import GHC.IO (unsafePerformIO)
import GHC.IORef
import GHC.MVar
import GHC.Num (Num(..))
import GHC.Ptr
import GHC.Read (Read)
import GHC.Real (div, fromIntegral)
import GHC.Show (Show)
import GHC.Word (Word32, Word64)
-- ----------------------------------------------------------------------------
-- Thread waiting
-- Note: threadWaitRead and threadWaitWrite aren't really functional
-- on Win32, but left in there because lib code (still) uses them (the manner
-- in which they're used doesn't cause problems on a Win32 platform though.)
asyncRead :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncRead (I# fd) (I# isSock) (I# len) (Ptr buf) =
IO $ \s -> case asyncRead# fd isSock len buf s of
(# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
asyncWrite :: Int -> Int -> Int -> Ptr a -> IO (Int, Int)
asyncWrite (I# fd) (I# isSock) (I# len) (Ptr buf) =
IO $ \s -> case asyncWrite# fd isSock len buf s of
(# s', len#, err# #) -> (# s', (I# len#, I# err#) #)
asyncDoProc :: FunPtr (Ptr a -> IO Int) -> Ptr a -> IO Int
asyncDoProc (FunPtr proc) (Ptr param) =
-- the 'length' value is ignored; simplifies implementation of
-- the async*# primops to have them all return the same result.
IO $ \s -> case asyncDoProc# proc param s of
(# s', _len#, err# #) -> (# s', I# err# #)
-- to aid the use of these primops by the IO Handle implementation,
-- provide the following convenience funs:
-- this better be a pinned byte array!
asyncReadBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncReadBA fd isSock len off bufB =
asyncRead fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
asyncWriteBA :: Int -> Int -> Int -> Int -> MutableByteArray# RealWorld -> IO (Int,Int)
asyncWriteBA fd isSock len off bufB =
asyncWrite fd isSock len ((Ptr (byteArrayContents# (unsafeCoerce# bufB))) `plusPtr` off)
-- ----------------------------------------------------------------------------
-- Threaded RTS implementation of threadDelay
-- | Suspends the current thread for a given number of microseconds
-- (GHC only).
--
-- There is no guarantee that the thread will be rescheduled promptly
-- when the delay has expired, but the thread will never continue to
-- run /earlier/ than specified.
--
threadDelay :: Int -> IO ()
threadDelay time
| threaded = waitForDelayEvent time
| otherwise = IO $ \s ->
case fromIntegral time of { I# time# ->
case delay# time# s of { s' -> (# s', () #)
}}
-- | Set the value of returned TVar to True after a given number of
-- microseconds. The caveats associated with threadDelay also apply.
--
registerDelay :: Int -> IO (TVar Bool)
registerDelay usecs
| threaded = waitForDelayEventSTM usecs
| otherwise = error "registerDelay: requires -threaded"
foreign import ccall unsafe "rtsSupportsBoundThreads" threaded :: Bool
waitForDelayEvent :: Int -> IO ()
waitForDelayEvent usecs = do
m <- newEmptyMVar
target <- calculateTarget usecs
atomicModifyIORef pendingDelays (\xs -> (Delay target m : xs, ()))
prodServiceThread
takeMVar m
-- Delays for use in STM
waitForDelayEventSTM :: Int -> IO (TVar Bool)
waitForDelayEventSTM usecs = do
t <- atomically $ newTVar False
target <- calculateTarget usecs
atomicModifyIORef pendingDelays (\xs -> (DelaySTM target t : xs, ()))
prodServiceThread
return t
calculateTarget :: Int -> IO USecs
calculateTarget usecs = do
now <- getUSecOfDay
return $ now + (fromIntegral usecs)
data DelayReq
= Delay {-# UNPACK #-} !USecs {-# UNPACK #-} !(MVar ())
| DelaySTM {-# UNPACK #-} !USecs {-# UNPACK #-} !(TVar Bool)
{-# NOINLINE pendingDelays #-}
pendingDelays :: IORef [DelayReq]
pendingDelays = unsafePerformIO $ do
m <- newIORef []
sharedCAF m getOrSetGHCConcWindowsPendingDelaysStore
foreign import ccall unsafe "getOrSetGHCConcWindowsPendingDelaysStore"
getOrSetGHCConcWindowsPendingDelaysStore :: Ptr a -> IO (Ptr a)
{-# NOINLINE ioManagerThread #-}
ioManagerThread :: MVar (Maybe ThreadId)
ioManagerThread = unsafePerformIO $ do
m <- newMVar Nothing
sharedCAF m getOrSetGHCConcWindowsIOManagerThreadStore
foreign import ccall unsafe "getOrSetGHCConcWindowsIOManagerThreadStore"
getOrSetGHCConcWindowsIOManagerThreadStore :: Ptr a -> IO (Ptr a)
ensureIOManagerIsRunning :: IO ()
ensureIOManagerIsRunning
| threaded = startIOManagerThread
| otherwise = return ()
startIOManagerThread :: IO ()
startIOManagerThread = do
modifyMVar_ ioManagerThread $ \old -> do
let create = do t <- forkIO ioManager; return (Just t)
case old of
Nothing -> create
Just t -> do
s <- threadStatus t
case s of
ThreadFinished -> create
ThreadDied -> create
_other -> return (Just t)
insertDelay :: DelayReq -> [DelayReq] -> [DelayReq]
insertDelay d [] = [d]
insertDelay d1 ds@(d2 : rest)
| delayTime d1 <= delayTime d2 = d1 : ds
| otherwise = d2 : insertDelay d1 rest
delayTime :: DelayReq -> USecs
delayTime (Delay t _) = t
delayTime (DelaySTM t _) = t
type USecs = Word64
foreign import ccall unsafe "getUSecOfDay"
getUSecOfDay :: IO USecs
{-# NOINLINE prodding #-}
prodding :: IORef Bool
prodding = unsafePerformIO $ do
r <- newIORef False
sharedCAF r getOrSetGHCConcWindowsProddingStore
foreign import ccall unsafe "getOrSetGHCConcWindowsProddingStore"
getOrSetGHCConcWindowsProddingStore :: Ptr a -> IO (Ptr a)
prodServiceThread :: IO ()
prodServiceThread = do
-- NB. use atomicModifyIORef here, otherwise there are race
-- conditions in which prodding is left at True but the server is
-- blocked in select().
was_set <- atomicModifyIORef prodding $ \b -> (True,b)
unless was_set wakeupIOManager
-- ----------------------------------------------------------------------------
-- Windows IO manager thread
ioManager :: IO ()
ioManager = do
wakeup <- c_getIOManagerEvent
service_loop wakeup []
service_loop :: HANDLE -- read end of pipe
-> [DelayReq] -- current delay requests
-> IO ()
service_loop wakeup old_delays = do
-- pick up new delay requests
new_delays <- atomicModifyIORef pendingDelays (\a -> ([],a))
let delays = foldr insertDelay old_delays new_delays
now <- getUSecOfDay
(delays', timeout) <- getDelay now delays
r <- c_WaitForSingleObject wakeup timeout
case r of
0xffffffff -> do c_maperrno; throwErrno "service_loop"
0 -> do
r2 <- c_readIOManagerEvent
exit <-
case r2 of
_ | r2 == io_MANAGER_WAKEUP -> return False
_ | r2 == io_MANAGER_DIE -> return True
0 -> return False -- spurious wakeup
_ -> do start_console_handler (r2 `shiftR` 1); return False
unless exit $ service_cont wakeup delays'
_other -> service_cont wakeup delays' -- probably timeout
service_cont :: HANDLE -> [DelayReq] -> IO ()
service_cont wakeup delays = do
r <- atomicModifyIORef prodding (\_ -> (False,False))
r `seq` return () -- avoid space leak
service_loop wakeup delays
-- must agree with rts/win32/ThrIOManager.c
io_MANAGER_WAKEUP, io_MANAGER_DIE :: Word32
io_MANAGER_WAKEUP = 0xffffffff
io_MANAGER_DIE = 0xfffffffe
data ConsoleEvent
= ControlC
| Break
| Close
-- these are sent to Services only.
| Logoff
| Shutdown
deriving (Eq, Ord, Enum, Show, Read, Typeable)
start_console_handler :: Word32 -> IO ()
start_console_handler r =
case toWin32ConsoleEvent r of
Just x -> withMVar win32ConsoleHandler $ \handler -> do
_ <- forkIO (handler x)
return ()
Nothing -> return ()
toWin32ConsoleEvent :: Num a => a -> Maybe ConsoleEvent
toWin32ConsoleEvent ev =
case ev of
0 {- CTRL_C_EVENT-} -> Just ControlC
1 {- CTRL_BREAK_EVENT-} -> Just Break
2 {- CTRL_CLOSE_EVENT-} -> Just Close
5 {- CTRL_LOGOFF_EVENT-} -> Just Logoff
6 {- CTRL_SHUTDOWN_EVENT-} -> Just Shutdown
_ -> Nothing
win32ConsoleHandler :: MVar (ConsoleEvent -> IO ())
win32ConsoleHandler = unsafePerformIO (newMVar (error "win32ConsoleHandler"))
wakeupIOManager :: IO ()
wakeupIOManager = c_sendIOManagerEvent io_MANAGER_WAKEUP
-- Walk the queue of pending delays, waking up any that have passed
-- and return the smallest delay to wait for. The queue of pending
-- delays is kept ordered.
getDelay :: USecs -> [DelayReq] -> IO ([DelayReq], DWORD)
getDelay _ [] = return ([], iNFINITE)
getDelay now all@(d : rest)
= case d of
Delay time m | now >= time -> do
putMVar m ()
getDelay now rest
DelaySTM time t | now >= time -> do
atomically $ writeTVar t True
getDelay now rest
_otherwise ->
-- delay is in millisecs for WaitForSingleObject
let micro_seconds = delayTime d - now
milli_seconds = (micro_seconds + 999) `div` 1000
in return (all, fromIntegral milli_seconds)
-- ToDo: this just duplicates part of System.Win32.Types, which isn't
-- available yet. We should move some Win32 functionality down here,
-- maybe as part of the grand reorganisation of the base package...
type HANDLE = Ptr ()
type DWORD = Word32
iNFINITE :: DWORD
iNFINITE = 0xFFFFFFFF -- urgh
foreign import ccall unsafe "getIOManagerEvent" -- in the RTS (ThrIOManager.c)
c_getIOManagerEvent :: IO HANDLE
foreign import ccall unsafe "readIOManagerEvent" -- in the RTS (ThrIOManager.c)
c_readIOManagerEvent :: IO Word32
foreign import ccall unsafe "sendIOManagerEvent" -- in the RTS (ThrIOManager.c)
c_sendIOManagerEvent :: Word32 -> IO ()
foreign import ccall unsafe "maperrno" -- in Win32Utils.c
c_maperrno :: IO ()
foreign import stdcall "WaitForSingleObject"
c_WaitForSingleObject :: HANDLE -> DWORD -> IO DWORD
......@@ -37,7 +37,7 @@ import GHC.IO.Buffer
import GHC.IO.BufferedIO
import qualified GHC.IO.Device
import GHC.IO.Device (SeekMode(..), IODeviceType(..))
import GHC.Conc
import GHC.Conc.IO
import GHC.IO.Exception
import Foreign
......
......@@ -59,7 +59,7 @@ import GHC.IO.Device (IODevice, SeekMode(..))
import qualified GHC.IO.Device as IODevice
import qualified GHC.IO.BufferedIO as Buffered
import GHC.Conc
import GHC.Conc.Sync
import GHC.Real
import GHC.Base
import GHC.Exception
......
module System.Event
( -- * Types
EventManager
-- * Creation
, new
-- * Running
, loop
-- ** Stepwise running
, step
, shutdown