Commit 8d9edfed authored by parcs's avatar parcs

Implement the parallel upsweep (#910)

The parallel upsweep is the parallel counterpart to the default
sequential upsweep. It attempts to compile modules in parallel by
subdividing the work of the upsweep into parts that can be executed
concurrently by multiple Haskell threads.

In order to enable the parallel upsweep, the user has to pass the -jN
flag to GHC, where N is an optional number denoting the number of jobs,
or modules, to compile in parallel, like with GNU make. In GHC this just
sets the number of capabilities to N.
parent e8d0dc7e
......@@ -579,6 +579,10 @@ data DynFlags = DynFlags {
ruleCheck :: Maybe String,
strictnessBefore :: [Int], -- ^ Additional demand analysis
parUpsweepNum :: Maybe Int, -- ^ The number of modules to compile in parallel
-- during the upsweep, where Nothing ==> compile as
-- many in parallel as there are CPUs.
simplTickFactor :: Int, -- ^ Multiplier for simplifier ticks
specConstrThreshold :: Maybe Int, -- ^ Threshold for SpecConstr
specConstrCount :: Maybe Int, -- ^ Max number of specialisations for any one function
......@@ -1254,6 +1258,8 @@ defaultDynFlags mySettings =
historySize = 20,
strictnessBefore = [],
parUpsweepNum = Just 1,
cmdlineHcIncludes = [],
importPaths = ["."],
mainModIs = mAIN,
......@@ -2012,6 +2018,8 @@ dynamic_flags = [
addWarn "-#include and INCLUDE pragmas are deprecated: They no longer have any effect"))
, Flag "v" (OptIntSuffix setVerbosity)
, Flag "j" (OptIntSuffix (\n -> upd (\d -> d {parUpsweepNum = n})))
------- ways --------------------------------------------------------
, Flag "prof" (NoArg (addWay WayProf))
, Flag "eventlog" (NoArg (addWay WayEventLog))
......
......@@ -40,9 +40,10 @@ import TcRnMonad ( initIfaceCheck )
import Bag ( listToBag )
import BasicTypes
import Digraph
import Exception ( evaluate, tryIO )
import Exception ( tryIO, gbracket, gfinally )
import FastString
import Maybes ( expectJust, mapCatMaybes )
import MonadUtils ( allM )
import Outputable
import Panic
import SrcLoc
......@@ -54,17 +55,24 @@ import Util
import qualified Data.Map as Map
import qualified FiniteMap as Map ( insertListWith )
import Control.Concurrent ( forkIOWithUnmask, killThread )
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Exception
import Control.Monad
import Data.IORef
import Data.List
import qualified Data.List as List
import Data.Maybe
import Data.Ord ( comparing )
import Data.Time
import System.Directory
import System.FilePath
import System.IO ( fixIO )
import System.IO.Error ( isDoesNotExistError )
import GHC.Conc ( getNumProcessors, getNumCapabilities, setNumCapabilities )
-- -----------------------------------------------------------------------------
-- Loading the program
......@@ -253,16 +261,22 @@ load how_much = do
mg = stable_mg ++ unstable_mg
-- clean up between compilations
let cleanup hsc_env = intermediateCleanTempFiles dflags
let cleanup hsc_env = intermediateCleanTempFiles (hsc_dflags hsc_env)
(flattenSCCs mg2_with_srcimps)
hsc_env
liftIO $ debugTraceMsg dflags 2 (hang (text "Ready for upsweep")
2 (ppr mg))
n_jobs <- case parUpsweepNum dflags of
Nothing -> liftIO getNumProcessors
Just n -> return n
let upsweep_fn | n_jobs > 1 = parUpsweep n_jobs
| otherwise = upsweep
setSession hsc_env{ hsc_HPT = emptyHomePackageTable }
(upsweep_ok, modsUpswept)
<- upsweep pruned_hpt stable_mods cleanup mg
<- upsweep_fn pruned_hpt stable_mods cleanup mg
-- Make modsDone be the summaries for each home module now
-- available; this should equal the domain of hpt3.
......@@ -595,6 +609,340 @@ checkStability hpt sccs all_home_mods = foldl checkSCC ([],[]) sccs
linkableTime l >= ms_hs_date ms
_other -> False
{- Parallel Upsweep
-
- The parallel upsweep attempts to concurrently compile the modules in the
- compilation graph using multiple Haskell threads.
-
- The Algorithm
-
- A Haskell thread is spawned for each module in the module graph, waiting for
- its direct dependencies to finish building before it itself begins to build.
-
- Each module is associated with an initially empty MVar that stores the
- result of that particular module's compile. If the compile succeeded, then
- the HscEnv (synchronized by an MVar) is updated with the fresh HMI of that
- module, and the module's HMI is deleted from the old HPT (synchronized by an
- IORef) to save space.
-
- Instead of immediately outputting messages to the standard handles, all
- compilation output is deferred to a per-module TQueue. A QSem is used to
- limit the number of workers that are compiling simultaneously.
-
- Meanwhile, the main thread sequentially loops over all the modules in the
- module graph, outputting the messages stored in each module's TQueue.
-}
-- | Each module is given a unique 'LogQueue' to redirect compilation messages
-- to. A 'Nothing' value contains the result of compilation, and denotes the
-- end of the message queue.
data LogQueue = LogQueue !(IORef [Maybe (Severity, SrcSpan, PprStyle, MsgDoc)])
!(MVar ())
-- | The graph of modules to compile and their corresponding result 'MVar' and
-- 'LogQueue'.
type CompilationGraph = [(ModSummary, MVar SuccessFlag, LogQueue)]
-- | Build a 'CompilationGraph' out of a list of strongly-connected modules,
-- also returning the first, if any, encountered module cycle.
buildCompGraph :: [SCC ModSummary] -> IO (CompilationGraph, Maybe [ModSummary])
buildCompGraph [] = return ([], Nothing)
buildCompGraph (scc:sccs) = case scc of
AcyclicSCC ms -> do
mvar <- newEmptyMVar
log_queue <- do
ref <- newIORef []
sem <- newEmptyMVar
return (LogQueue ref sem)
(rest,cycle) <- buildCompGraph sccs
return ((ms,mvar,log_queue):rest, cycle)
CyclicSCC mss -> return ([], Just mss)
-- | The entry point to the parallel upsweep.
--
-- See also the simpler, sequential 'upsweep'.
parUpsweep
:: GhcMonad m
=> Int
-- ^ The number of workers we wish to run in parallel
-> HomePackageTable
-> ([ModuleName],[ModuleName])
-> (HscEnv -> IO ())
-> [SCC ModSummary]
-> m (SuccessFlag,
[ModSummary])
parUpsweep n_jobs old_hpt stable_mods cleanup sccs = do
hsc_env <- getSession
let dflags = hsc_dflags hsc_env
-- The bits of shared state we'll be using:
-- The global HscEnv is updated with the module's HMI when a module
-- successfully compiles.
hsc_env_var <- liftIO $ newMVar hsc_env
-- The old HPT is used for recompilation checking in upsweep_mod. When a
-- module sucessfully gets compiled, its HMI is pruned from the old HPT.
old_hpt_var <- liftIO $ newIORef old_hpt
-- The list of modules that have so far been successfully compiled. This is
-- used to re-typecheck module loops after the last module in the loop has
-- been compiled (see reTypecheckLoop).
mods_done_var <- liftIO $ newIORef []
-- What we use to limit parallelism with.
par_sem <- liftIO $ newQSem n_jobs
let updNumCapabilities = liftIO $ do
n_capabilities <- getNumCapabilities
unless (n_capabilities /= 1) $ setNumCapabilities n_jobs
return n_capabilities
-- Reset the number of capabilities once the upsweep ends.
let resetNumCapabilities orig_n = liftIO $ setNumCapabilities orig_n
gbracket updNumCapabilities resetNumCapabilities $ \_ -> do
-- Sync the global session with the latest HscEnv once the upsweep ends.
let finallySyncSession io = io `gfinally` do
hsc_env <- liftIO $ readMVar hsc_env_var
setSession hsc_env
finallySyncSession $ do
-- Build the compilation graph out of the list of SCCs. Module cycles are
-- handled at the very end, after some useful work gets done. Note that
-- this list is topologically sorted (by virtue of 'sccs' being sorted so).
(comp_graph,cycle) <- liftIO $ buildCompGraph sccs
let comp_graph_w_idx = zip comp_graph [1..]
-- Build a Map out of the compilation graph with which we can efficiently
-- look up the result MVar associated with a particular home module.
let mod_map :: Map.Map (Module,Bool) (MVar SuccessFlag, Int)
mod_map = Map.fromList [ ((ms_mod ms, isBootSummary ms), (mvar,idx))
| ((ms,mvar,_),idx) <- comp_graph_w_idx ]
-- For each module in the module graph, spawn a worker thread that will
-- compile this module.
let { spawnWorkers = forM comp_graph_w_idx $ \((mod,!mvar,!log_queue),!mod_idx) ->
forkIOWithUnmask $ \unmask -> do
-- Replace the default log_action with one that writes each
-- message to the module's log_queue. The main thread will
-- deal with synchronously printing these messages.
--
-- Use a local filesToClean var so that we can clean up
-- intermediate files in a timely fashion (as soon as
-- compilation for that module is finished) without having to
-- worry about accidentally deleting a simultaneous compile's
-- important files.
lcl_files_to_clean <- newIORef []
let lcl_dflags = dflags { log_action = parLogAction log_queue
, filesToClean = lcl_files_to_clean }
-- Unmask asynchronous exceptions and perform the thread-local
-- work to compile the module (see parUpsweep_one).
m_res <- try $ unmask $ prettyPrintGhcErrors lcl_dflags $
parUpsweep_one mod mod_map lcl_dflags cleanup par_sem
hsc_env_var old_hpt_var mods_done_var
stable_mods mod_idx (length sccs)
res <- case m_res of
Right flag -> return flag
Left exc -> do
-- Don't print ThreadKilled exceptions: they are used
-- to kill the worker thread in the event of a user
-- interrupt, and the user doesn't have to be informed
-- about that.
when (fromException exc /= Just ThreadKilled)
(errorMsg lcl_dflags (text (show exc)))
return Failed
-- Populate the result MVar.
putMVar mvar res
-- Write the end marker to the message queue, telling the main
-- thread that it can stop waiting for messages from this
-- particular compile.
writeLogQueue log_queue Nothing
-- Add the remaining files that weren't cleaned up to the
-- global filesToClean ref, for cleanup later.
files_kept <- readIORef (filesToClean lcl_dflags)
addFilesToClean dflags files_kept
-- Kill all the workers, masking interrupts (since killThread is
-- interruptible). XXX: This is not ideal.
; killWorkers = uninterruptibleMask_ . mapM_ killThread }
-- Spawn the workers, making sure to kill them later. Collect the results
-- of each compile.
results <- liftIO $ bracket spawnWorkers killWorkers $ \_ ->
-- Loop over each module in the compilation graph in order, printing
-- each message from its log_queue.
forM comp_graph $ \(mod,mvar,log_queue) -> do
printLogs dflags log_queue
result <- readMVar mvar
if succeeded result then return (Just mod) else return Nothing
-- Collect and return the ModSummaries of all the successful compiles.
-- NB: Reverse this list to maintain output parity with the sequential upsweep.
let ok_results = reverse (catMaybes results)
-- Handle any cycle in the original compilation graph and return the result
-- of the upsweep.
case cycle of
Just mss -> do
liftIO $ fatalErrorMsg dflags (cyclicModuleErr mss)
return (Failed,ok_results)
Nothing -> do
let success_flag = successIf (all isJust results)
return (success_flag,ok_results)
where
writeLogQueue :: LogQueue -> Maybe (Severity,SrcSpan,PprStyle,MsgDoc) -> IO ()
writeLogQueue (LogQueue ref sem) msg = do
atomicModifyIORef ref $ \msgs -> (msg:msgs,())
_ <- tryPutMVar sem ()
return ()
-- The log_action callback that is used to synchronize messages from a
-- worker thread.
parLogAction :: LogQueue -> LogAction
parLogAction log_queue _dflags !severity !srcSpan !style !msg = do
writeLogQueue log_queue (Just (severity,srcSpan,style,msg))
-- Print each message from the log_queue using the log_action from the
-- session's DynFlags.
printLogs :: DynFlags -> LogQueue -> IO ()
printLogs !dflags (LogQueue ref sem) = read_msgs
where read_msgs = do
takeMVar sem
msgs <- atomicModifyIORef ref $ \xs -> ([], reverse xs)
print_loop msgs
print_loop [] = read_msgs
print_loop (x:xs) = case x of
Just (severity,srcSpan,style,msg) -> do
log_action dflags dflags severity srcSpan style msg
print_loop xs
-- Exit the loop once we encounter the end marker.
Nothing -> return ()
-- The interruptible subset of the worker threads' work.
parUpsweep_one
:: ModSummary
-- ^ The module we wish to compile
-> Map.Map (Module,Bool) (MVar SuccessFlag, Int)
-- ^ The map of home modules and their result MVar
-> DynFlags
-- ^ The thread-local DynFlags
-> (HscEnv -> IO ())
-- ^ The callback for cleaning up intermediate files
-> QSem
-- ^ The semaphore for limiting the number of simultaneous compiles
-> MVar HscEnv
-- ^ The MVar that synchronizes updates to the global HscEnv
-> IORef HomePackageTable
-- ^ The old HPT
-> IORef [ModSummary]
-- ^ The list of modules that have successfully compiled
-> ([ModuleName],[ModuleName])
-- ^ Lists of stable objects and BCOs
-> Int
-- ^ The index of this module
-> Int
-- ^ The total number of modules
-> IO SuccessFlag
-- ^ The result of this compile
parUpsweep_one mod mod_map lcl_dflags cleanup par_sem hsc_env_var
old_hpt_var mods_done_var stable_mods mod_index num_mods = do
let home_imps = map unLoc $ ms_home_imps mod
home_src_imps = map unLoc $ ms_home_srcimps mod
all_imps = zip home_imps (repeat False) ++
zip home_src_imps (repeat True)
-- The module's home-module dependencies.
dependencies_w_idx =
[ (mvar,idx) | (imp_name,is_boot) <- all_imps
, let imp = mkModule (thisPackage lcl_dflags) imp_name
, Just (mvar,idx) <- [Map.lookup (imp,is_boot) mod_map] ]
-- Sort the list of dependencies in reverse-topological order. This
-- way, by the time we get woken up by the result of an earlier
-- dependency, subsequent dependencies are more likely to have
-- finished. This step effectively reduces the number of MVars that
-- each thread blocks on.
dependencies = map fst $ sortBy (flip (comparing snd)) dependencies_w_idx
-- Wait for the all the module's dependencies to finish building.
deps_ok <- allM (fmap succeeded . readMVar) dependencies
-- We can't build this module if any of its dependencies failed to build.
if not deps_ok
then return Failed
else do
-- Any hsc_env at this point is OK to use since we only really require
-- that the HPT contains the HMIs of our dependencies.
hsc_env <- readMVar hsc_env_var
old_hpt <- readIORef old_hpt_var
let logger err = printBagOfErrors lcl_dflags (srcErrorMessages err)
-- Limit the number of parallel compiles.
let withSem sem = bracket_ (waitQSem sem) (signalQSem sem)
mb_mod_info <- withSem par_sem $
handleSourceError (\err -> do logger err; return Nothing) $ do
-- Have the ModSummary and HscEnv point to our local log_action
-- and filesToClean var.
let lcl_mod = localize_mod mod
let lcl_hsc_env = localize_hsc_env hsc_env
-- Compile the module.
mod_info <- upsweep_mod lcl_hsc_env old_hpt stable_mods lcl_mod
mod_index num_mods
return (Just mod_info)
case mb_mod_info of
Nothing -> return Failed
Just mod_info -> do
let this_mod = ms_mod_name mod
-- Prune the old HPT unless this is an hs-boot module.
unless (isBootSummary mod) $
atomicModifyIORef old_hpt_var $ \old_hpt ->
(delFromUFM old_hpt this_mod, ())
-- Update and fetch the list of completed modules.
mods_done <- atomicModifyIORef mods_done_var $ \mods_done ->
let mods_done' = mod:mods_done
in (mods_done',mods_done')
-- Update and fetch the global HscEnv, and re-typecheck any
-- module loops.
lcl_hsc_env' <- modifyMVar hsc_env_var $ \hsc_env -> do
let hsc_env' = hsc_env { hsc_HPT = addToUFM (hsc_HPT hsc_env)
this_mod mod_info }
hsc_env'' <- reTypecheckLoop hsc_env' mod mods_done
return (hsc_env'', localize_hsc_env hsc_env'')
-- Clean up any intermediate files.
cleanup lcl_hsc_env'
return Succeeded
where
localize_mod mod
= mod { ms_hspp_opts = (ms_hspp_opts mod)
{ log_action = log_action lcl_dflags
, filesToClean = filesToClean lcl_dflags } }
localize_hsc_env hsc_env
= hsc_env { hsc_dflags = (hsc_dflags hsc_env)
{ log_action = log_action lcl_dflags
, filesToClean = filesToClean lcl_dflags } }
-- -----------------------------------------------------------------------------
--
-- | The upsweep
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment