Commit 3138b775 authored by Ben Gamari's avatar Ben Gamari 🐢

Refactor waiting on job objects

Previously in order to wait on a job object we would create an IO
Completion Port, configure the job object to emit notifications to it
with SetInformationJobObject, and wait for JOB_OBJECT_MSG_EXIT_PROCESS
notifications until all processes have died.

This followed one piece of guidance from Microsoft [1] but according to
Microsoft's own documentation, this cannot work reliably as delivery of
job notifications is not guaranteed [2]. I have seen cases where the
processes hang waiting on job objects so I can only guess that message
loss is indeed possible.

Instead we now take a simpler approach: look at the processes in the
job, if there are none then we are done. If there are still processes,
choose one and wait for it to finish. Iterate until the job is empty.

Credit for this approach goes to Davean Scies.

[1] https://devblogs.microsoft.com/oldnewthing/20130405-00/?p=4743
[2] https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-jobobject_associate_completion_port
parent 8895de09
......@@ -655,24 +655,23 @@ waitForProcess ph@(ProcessHandle _ delegating_ctlc _) = lockWaitpid $ do
return (ClosedHandle e, e)
return e'
#if defined(WINDOWS)
OpenExtHandle h job iocp -> do
OpenExtHandle h job -> do
-- First wait for completion of the job...
code <- waitForJobCompletion job iocp timeout_Infinite
let e = maybe (ExitFailure (-1)) mkExitCode code
waitForJobCompletion job
e <- waitForProcess' h
e' <- modifyProcessHandle ph $ \p_' ->
case p_' of
ClosedHandle e' -> return (p_', e')
OpenHandle{} -> fail "waitForProcess(OpenHandle): this cannot happen"
OpenExtHandle ph' job' iocp' -> do
OpenExtHandle ph' job' -> do
closePHANDLE ph'
closePHANDLE job'
closePHANDLE iocp'
when delegating_ctlc $
endDelegateControlC e
return (ClosedHandle e, e)
return e
return e'
#else
OpenExtHandle _ _job _iocp ->
OpenExtHandle _ _job ->
return $ ExitFailure (-1)
#endif
where
......@@ -733,7 +732,7 @@ getProcessExitCode ph@(ProcessHandle _ delegating_ctlc _) = tryLockWaitpid $ do
where getHandle :: ProcessHandle__ -> Maybe PHANDLE
getHandle (OpenHandle h) = Just h
getHandle (ClosedHandle _) = Nothing
getHandle (OpenExtHandle h _ _) = Just h
getHandle (OpenExtHandle h _) = Just h
-- If somebody is currently holding the waitpid lock, we don't want to
-- accidentally remove the pid from the process table.
......
......@@ -192,8 +192,6 @@ data ProcessHandle__ = OpenHandle { phdlProcessHandle :: PHANDLE }
, phdlJobHandle :: PHANDLE
-- ^ the job containing the process and
-- its subprocesses
, phdlIocpHandle :: PHANDLE
-- ^ the job's IO Completion Port
}
| ClosedHandle ExitCode
data ProcessHandle
......
......@@ -22,7 +22,6 @@ import System.Process.Common
import Control.Concurrent
import Control.Exception
import Data.Bits
import Data.Maybe
import Foreign.C
import Foreign.Marshal
import Foreign.Ptr
......@@ -60,11 +59,11 @@ throwErrnoIfBadPHandle = throwErrnoIfNull
-- On Windows, we have to close this HANDLE when it is no longer required,
-- hence we add a finalizer to it
mkProcessHandle :: PHANDLE -> PHANDLE -> PHANDLE -> IO ProcessHandle
mkProcessHandle h job io = do
m <- if job == nullPtr && io == nullPtr
mkProcessHandle :: PHANDLE -> PHANDLE -> IO ProcessHandle
mkProcessHandle h job = do
m <- if job == nullPtr
then newMVar (OpenHandle h)
else newMVar (OpenExtHandle h job io)
else newMVar (OpenExtHandle h job)
_ <- mkWeakMVar m (processHandleFinaliser m)
l <- newMVar ()
return (ProcessHandle m False l)
......@@ -74,9 +73,8 @@ processHandleFinaliser m =
modifyMVar_ m $ \p_ -> do
case p_ of
OpenHandle ph -> closePHANDLE ph
OpenExtHandle ph job io -> closePHANDLE ph
OpenExtHandle ph job -> closePHANDLE ph
>> closePHANDLE job
>> closePHANDLE io
_ -> return ()
return (error "closed process handle")
......@@ -114,7 +112,6 @@ createProcess_Internal fun CreateProcess{ cmdspec = cmdsp,
alloca $ \ pfdStdOutput ->
alloca $ \ pfdStdError ->
allocaBytes lenPtr $ \ hJob ->
allocaBytes lenPtr $ \ hIOcpPort ->
maybeWith withCEnvironment mb_env $ \pEnv ->
maybeWith withCWString mb_cwd $ \pWorkDir -> do
withCWString cmdline $ \pcmdline -> do
......@@ -145,15 +142,13 @@ createProcess_Internal fun CreateProcess{ cmdspec = cmdsp,
.|.(if mb_new_session then RUN_PROCESS_NEW_SESSION else 0))
use_job
hJob
hIOcpPort
hndStdInput <- mbPipe mb_stdin pfdStdInput WriteMode
hndStdOutput <- mbPipe mb_stdout pfdStdOutput ReadMode
hndStdError <- mbPipe mb_stderr pfdStdError ReadMode
phJob <- peek hJob
phIOCP <- peek hIOcpPort
ph <- mkProcessHandle proc_handle phJob phIOCP
ph <- mkProcessHandle proc_handle phJob
return ProcRetHandles { hStdInput = hndStdInput
, hStdOutput = hndStdOutput
, hStdError = hndStdError
......@@ -187,44 +182,21 @@ terminateJob :: ProcessHandle -> CUInt -> IO Bool
terminateJob jh ecode =
withProcessHandle jh $ \p_ -> do
case p_ of
ClosedHandle _ -> return False
OpenHandle _ -> return False
OpenExtHandle _ job _ -> c_terminateJobObject job ecode
ClosedHandle _ -> return False
OpenHandle _ -> return False
OpenExtHandle _ job -> c_terminateJobObject job ecode
timeout_Infinite :: CUInt
timeout_Infinite = 0xFFFFFFFF
waitForJobCompletion :: PHANDLE
-> PHANDLE
-> CUInt
-> IO (Maybe CInt)
waitForJobCompletion job io timeout =
alloca $ \p_exitCode -> do
items <- newMVar $ []
setter <- mkSetter (insertItem items)
getter <- mkGetter (getItem items)
ret <- c_waitForJobCompletion job io timeout p_exitCode setter getter
if ret == 0
then Just <$> peek p_exitCode
else return Nothing
insertItem :: MVar [(k, v)] -> k -> v -> IO ()
insertItem env_ k v = modifyMVar_ env_ (return . ((k, v):))
getItem :: Eq k => MVar [(k, v)] -> k -> IO v
getItem env_ k = withMVar env_ (\m -> return $ fromJust $ lookup k m)
waitForJobCompletion :: PHANDLE -- ^ job handle
-> IO ()
waitForJobCompletion job =
throwErrnoIf_ not "waitForJobCompletion" $ c_waitForJobCompletion job
-- ----------------------------------------------------------------------------
-- Interface to C bits
type SetterDef = CUInt -> Ptr () -> IO ()
type GetterDef = CUInt -> IO (Ptr ())
foreign import ccall "wrapper"
mkSetter :: SetterDef -> IO (FunPtr SetterDef)
foreign import ccall "wrapper"
mkGetter :: GetterDef -> IO (FunPtr GetterDef)
foreign import WINDOWS_CCONV unsafe "TerminateJobObject"
c_terminateJobObject
:: PHANDLE
......@@ -234,12 +206,7 @@ foreign import WINDOWS_CCONV unsafe "TerminateJobObject"
foreign import ccall interruptible "waitForJobCompletion" -- NB. safe - can block
c_waitForJobCompletion
:: PHANDLE
-> PHANDLE
-> CUInt
-> Ptr CInt
-> FunPtr (SetterDef)
-> FunPtr (GetterDef)
-> IO CInt
-> IO Bool
foreign import ccall unsafe "runInteractiveProcess"
c_runInteractiveProcess
......@@ -255,7 +222,6 @@ foreign import ccall unsafe "runInteractiveProcess"
-> CInt -- flags
-> Bool -- useJobObject
-> Ptr PHANDLE -- Handle to Job
-> Ptr PHANDLE -- Handle to I/O Completion Port
-> IO PHANDLE
commandToProcess
......@@ -338,7 +304,7 @@ createPipeInternal = do
(do readh <- fdToHandle readfd
writeh <- fdToHandle writefd
return (readh, writeh)) `onException` (close' readfd >> close' writefd)
createPipeInternalFd :: IO (FD, FD)
createPipeInternalFd = do
allocaArray 2 $ \ pfds -> do
......@@ -365,9 +331,9 @@ interruptProcessGroupOfInternal ph = do
case p_ of
ClosedHandle _ -> return ()
_ -> do let h = case p_ of
OpenHandle x -> x
OpenExtHandle x _ _ -> x
_ -> error "interruptProcessGroupOfInternal"
OpenHandle x -> x
OpenExtHandle x _ -> x
_ -> error "interruptProcessGroupOfInternal"
#if mingw32_HOST_OS
pid <- getProcessId h
generateConsoleCtrlEvent cTRL_BREAK_EVENT pid
......
......@@ -569,29 +569,6 @@ createJob ()
return NULL;
}
static HANDLE
createCompletionPort (HANDLE hJob)
{
HANDLE ioPort = CreateIoCompletionPort (INVALID_HANDLE_VALUE, NULL, 0, 1);
if (!ioPort)
{
// Something failed. Error is in GetLastError, let caller handler it.
return NULL;
}
JOBOBJECT_ASSOCIATE_COMPLETION_PORT Port;
Port.CompletionKey = hJob;
Port.CompletionPort = ioPort;
if (!SetInformationJobObject(hJob,
JobObjectAssociateCompletionPortInformation,
&Port, sizeof(Port))) {
// Something failed. Error is in GetLastError, let caller handler it.
return NULL;
}
return ioPort;
}
/* Note [Windows exec interaction]
The basic issue that process jobs tried to solve is this:
......@@ -629,7 +606,7 @@ runInteractiveProcess (wchar_t *cmd, wchar_t *workingDirectory,
wchar_t *environment,
int fdStdIn, int fdStdOut, int fdStdErr,
int *pfdStdInput, int *pfdStdOutput, int *pfdStdError,
int flags, bool useJobObject, HANDLE *hJob, HANDLE *hIOcpPort)
int flags, bool useJobObject, HANDLE *hJob)
{
STARTUPINFO sInfo;
PROCESS_INFORMATION pInfo;
......@@ -750,16 +727,8 @@ runInteractiveProcess (wchar_t *cmd, wchar_t *workingDirectory,
{
goto cleanup_err;
}
// Create the completion port and attach it to the job
*hIOcpPort = createCompletionPort(*hJob);
if (!*hIOcpPort)
{
goto cleanup_err;
}
} else {
*hJob = NULL;
*hIOcpPort = NULL;
}
if (!CreateProcess(NULL, cmd, NULL, NULL, inherit, dwFlags, environment, workingDirectory, &sInfo, &pInfo))
......@@ -803,7 +772,6 @@ cleanup_err:
if (hStdErrorRead != INVALID_HANDLE_VALUE) CloseHandle(hStdErrorRead);
if (hStdErrorWrite != INVALID_HANDLE_VALUE) CloseHandle(hStdErrorWrite);
if (useJobObject && hJob && *hJob ) CloseHandle(*hJob);
if (useJobObject && hIOcpPort && *hIOcpPort) CloseHandle(*hIOcpPort);
maperrno();
return NULL;
......@@ -886,78 +854,49 @@ waitForProcess (ProcHandle handle, int *pret)
return -1;
}
// Returns true on success.
int
waitForJobCompletion ( HANDLE hJob, HANDLE ioPort, DWORD timeout, int *pExitCode, setterDef set, getterDef get )
waitForJobCompletion ( HANDLE hJob )
{
DWORD CompletionCode;
ULONG_PTR CompletionKey;
LPOVERLAPPED Overlapped;
*pExitCode = 0;
// We have to loop here. It's a blocking call, but
// we get notified on each completion event. So if it's
// not one we care for we should just block again.
// If all processes are finished before this call is made
// then the initial call will return false.
// List of events we can listen to:
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms684141(v=vs.85).aspx
while (GetQueuedCompletionStatus (ioPort, &CompletionCode,
&CompletionKey, &Overlapped, timeout)) {
// If event wasn't meant of us, keep listening.
if ((HANDLE)CompletionKey != hJob)
continue;
switch (CompletionCode)
{
case JOB_OBJECT_MSG_NEW_PROCESS:
{
// A new child process is born.
// Retrieve and save the process handle from the process id.
// We'll need it for later but we can't retrieve it after the
// process has exited.
DWORD pid = (DWORD)(uintptr_t)Overlapped;
HANDLE pHwnd = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, TRUE, pid);
set(pid, pHwnd);
}
break;
case JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS:
case JOB_OBJECT_MSG_EXIT_PROCESS:
{
// A child process has just exited.
// Read exit code, We assume the last process to exit
// is the process whose exit code we're interested in.
HANDLE pHwnd = get((DWORD)(uintptr_t)Overlapped);
if (GetExitCodeProcess(pHwnd, (DWORD *)pExitCode) == 0)
{
maperrno();
return 1;
}
// Check to see if the child has actually exited.
if (*(DWORD *)pExitCode == STILL_ACTIVE)
waitForProcess ((ProcHandle)pHwnd, pExitCode);
}
break;
case JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO:
// All processes in the tree are done.
return 0;
default:
break;
JOBOBJECT_BASIC_PROCESS_ID_LIST pid_list;
pid_list.NumberOfAssignedProcesses = 1;
while (true) {
// Find a process in the job...
bool success = QueryInformationJobObject(
hJob,
JobObjectBasicProcessIdList,
&pid_list,
sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST),
NULL);
if (pid_list.NumberOfProcessIdsInList == 0) {
// We're done
return true;
}
HANDLE pHwnd = OpenProcess(SYNCHRONIZE, TRUE, pid_list.ProcessIdList[0]);
if (pHwnd == NULL) {
switch (GetLastError()) {
case ERROR_INVALID_PARAMETER:
// Presumably the process terminated; try again.
continue;
default:
maperrno();
return false;
}
}
}
// Check to see if a timeout has occurred or that the
// all processes in the job were finished by the time we
// got to the loop.
if (Overlapped == NULL && (HANDLE)CompletionKey != hJob)
{
// Timeout occurred.
return -1;
}
// Wait for it to finish...
if (WaitForSingleObject(pHwnd, INFINITE) != WAIT_OBJECT_0) {
maperrno();
CloseHandle(pHwnd);
return false;
}
return 2;
// The process signalled, loop again to try the next process.
CloseHandle(pHwnd);
}
}
#endif /* Win32 */
......@@ -86,14 +86,13 @@ extern ProcHandle runInteractiveProcess( wchar_t *cmd,
int *pfdStdError,
int flags,
bool useJobObject,
HANDLE *hJob,
HANDLE *hIOcpPort );
HANDLE *hJob );
typedef void(*setterDef)(DWORD, HANDLE);
typedef HANDLE(*getterDef)(DWORD);
extern int terminateJob( ProcHandle handle );
extern int waitForJobCompletion( HANDLE hJob, HANDLE ioPort, DWORD timeout, int *pExitCode, setterDef set, getterDef get );
extern int waitForJobCompletion( HANDLE hJob );
#endif
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment