Commit 9fd507e5 authored by Sergei Trofimovich's avatar Sergei Trofimovich Committed by Simon Marlow

Raise exceptions when blocked in bad FDs (fixes Trac #4934)

Before the patch any call to 'select()' with 'bad_fd' led to:
- unblocking of all threads
- hiding exception for 'threadWaitRead bad_fd'

The patch fixes both cases in this way:
after 'select()' failure we iterate over each blocked descriptor
and poll individually to see it's actual status, which is:
- READY (move to run queue)
- BLOCKED (leave in blocked queue)
- INVALID (send an IOErrror exception)
Signed-off-by: default avatarSergei Trofimovich <slyfox@gentoo.org>
parent 2f8b4c93
......@@ -12,9 +12,10 @@ module GHC.Event.Thread
, closeFdWith
, threadDelay
, registerDelay
, blockedOnBadFD -- used by RTS
) where
import Control.Exception (finally)
import Control.Exception (finally, SomeException, toException)
import Control.Monad (forM, forM_, sequence_, zipWithM, when)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.List (zipWith3)
......@@ -115,6 +116,9 @@ threadWait evt fd = mask_ $ do
then ioError $ errnoToIOError "threadWait" eBADF Nothing Nothing
else return ()
-- used at least by RTS in 'select()' IO manager backend
blockedOnBadFD :: SomeException
blockedOnBadFD = toException $ errnoToIOError "awaitEvent" eBADF Nothing Nothing
threadWaitSTM :: Event -> Fd -> IO (STM (), IO ())
threadWaitSTM evt fd = mask_ $ do
......
......@@ -42,6 +42,7 @@ PRELUDE_CLOSURE(base_GHCziIOziException_blockedIndefinitelyOnMVar_closure);
PRELUDE_CLOSURE(base_GHCziIOziException_blockedIndefinitelyOnSTM_closure);
PRELUDE_CLOSURE(base_ControlziExceptionziBase_nonTermination_closure);
PRELUDE_CLOSURE(base_ControlziExceptionziBase_nestedAtomically_closure);
PRELUDE_CLOSURE(base_GHCziEventziThread_blockedOnBadFD_closure);
PRELUDE_CLOSURE(base_GHCziConcziSync_runSparks_closure);
PRELUDE_CLOSURE(base_GHCziConcziIO_ensureIOManagerIsRunning_closure);
......@@ -104,6 +105,7 @@ PRELUDE_INFO(base_GHCziStable_StablePtr_con_info);
#define blockedIndefinitelyOnSTM_closure DLL_IMPORT_DATA_REF(base_GHCziIOziException_blockedIndefinitelyOnSTM_closure)
#define nonTermination_closure DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nonTermination_closure)
#define nestedAtomically_closure DLL_IMPORT_DATA_REF(base_ControlziExceptionziBase_nestedAtomically_closure)
#define blockedOnBadFD_closure DLL_IMPORT_DATA_REF(base_GHCziEventziThread_blockedOnBadFD_closure)
#define Czh_static_info DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_Czh_static_info)
#define Fzh_static_info DLL_IMPORT_DATA_REF(ghczmprim_GHCziTypes_Fzh_static_info)
......
......@@ -209,6 +209,7 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config)
getStablePtr((StgPtr)nonTermination_closure);
getStablePtr((StgPtr)blockedIndefinitelyOnSTM_closure);
getStablePtr((StgPtr)nestedAtomically_closure);
getStablePtr((StgPtr)blockedOnBadFD_closure);
getStablePtr((StgPtr)runSparks_closure);
getStablePtr((StgPtr)ensureIOManagerIsRunning_closure);
......
......@@ -99,6 +99,7 @@ ld-options:
, "-Wl,-u,_base_GHCziIOziException_blockedIndefinitelyOnMVar_closure"
, "-Wl,-u,_base_GHCziIOziException_blockedIndefinitelyOnSTM_closure"
, "-Wl,-u,_base_ControlziExceptionziBase_nestedAtomically_closure"
, "-Wl,-u,_base_GHCziEventziThread_blockedOnBadFD_closure"
, "-Wl,-u,_base_GHCziWeak_runFinalizzerBatch_closure"
, "-Wl,-u,_base_GHCziTopHandler_flushStdHandles_closure"
, "-Wl,-u,_base_GHCziTopHandler_runIO_closure"
......@@ -139,6 +140,7 @@ ld-options:
, "-Wl,-u,base_GHCziIOziException_blockedIndefinitelyOnMVar_closure"
, "-Wl,-u,base_GHCziIOziException_blockedIndefinitelyOnSTM_closure"
, "-Wl,-u,base_ControlziExceptionziBase_nestedAtomically_closure"
, "-Wl,-u,base_GHCziEventziThread_blockedOnBadFD_closure"
, "-Wl,-u,base_GHCziWeak_runFinalizzerBatch_closure"
, "-Wl,-u,base_GHCziTopHandler_flushStdHandles_closure"
, "-Wl,-u,base_GHCziTopHandler_runIO_closure"
......
......@@ -14,6 +14,8 @@
#include "Signals.h"
#include "Schedule.h"
#include "Prelude.h"
#include "RaiseAsync.h"
#include "RtsUtils.h"
#include "Itimer.h"
#include "Capability.h"
......@@ -120,6 +122,85 @@ fdOutOfRange (int fd)
stg_exit(EXIT_FAILURE);
}
/*
* State of individual file descriptor after a 'select()' poll.
*/
enum FdState {
RTS_FD_IS_READY = 0,
RTS_FD_IS_BLOCKING,
RTS_FD_IS_INVALID,
};
static enum FdState fdPollReadState (int fd)
{
int r;
fd_set rfd;
struct timeval now;
FD_ZERO(&rfd);
FD_SET(fd, &rfd);
/* only poll */
now.tv_sec = 0;
now.tv_usec = 0;
for (;;)
{
r = select(fd+1, &rfd, NULL, NULL, &now);
/* the descriptor is sane */
if (r != -1)
break;
switch (errno)
{
case EBADF: return RTS_FD_IS_INVALID;
case EINTR: continue;
default:
sysErrorBelch("select");
stg_exit(EXIT_FAILURE);
}
}
if (r == 0)
return RTS_FD_IS_BLOCKING;
else
return RTS_FD_IS_READY;
}
static enum FdState fdPollWriteState (int fd)
{
int r;
fd_set wfd;
struct timeval now;
FD_ZERO(&wfd);
FD_SET(fd, &wfd);
/* only poll */
now.tv_sec = 0;
now.tv_usec = 0;
for (;;)
{
r = select(fd+1, NULL, &wfd, NULL, &now);
/* the descriptor is sane */
if (r != -1)
break;
switch (errno)
{
case EBADF: return RTS_FD_IS_INVALID;
case EINTR: continue;
default:
sysErrorBelch("select");
stg_exit(EXIT_FAILURE);
}
}
if (r == 0)
return RTS_FD_IS_BLOCKING;
else
return RTS_FD_IS_READY;
}
/* Argument 'wait' says whether to wait for I/O to become available,
* or whether to just check and return immediately. If there are
* other threads ready to run, we normally do the non-waiting variety,
......@@ -137,12 +218,10 @@ void
awaitEvent(rtsBool wait)
{
StgTSO *tso, *prev, *next;
rtsBool ready;
fd_set rfd,wfd;
int numFound;
int maxfd = -1;
rtsBool select_succeeded = rtsTrue;
rtsBool unblock_all = rtsFalse;
rtsBool seen_bad_fd = rtsFalse;
struct timeval tv, *ptv;
LowResTime now;
......@@ -225,25 +304,8 @@ awaitEvent(rtsBool wait)
while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, ptv)) < 0) {
if (errno != EINTR) {
/* Handle bad file descriptors by unblocking all the
waiting threads. Why? Because a thread might have been
a bit naughty and closed a file descriptor while another
was blocked waiting. This is less-than-good programming
practice, but having the RTS as a result fall over isn't
acceptable, so we simply unblock all the waiting threads
should we see a bad file descriptor & give the threads
a chance to clean up their act.
Note: assume here that threads becoming unblocked
will try to read/write the file descriptor before trying
to issue a threadWaitRead/threadWaitWrite again (==> an
IOError will result for the thread that's got the bad
file descriptor.) Hence, there's no danger of a bad
file descriptor being repeatedly select()'ed on, so
the RTS won't loop.
*/
if ( errno == EBADF ) {
unblock_all = rtsTrue;
seen_bad_fd = rtsTrue;
break;
} else {
sysErrorBelch("select");
......@@ -286,33 +348,58 @@ awaitEvent(rtsBool wait)
*/
prev = NULL;
if (select_succeeded || unblock_all) {
for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
next = tso->_link;
{
for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
next = tso->_link;
int fd;
enum FdState fd_state = RTS_FD_IS_BLOCKING;
switch (tso->why_blocked) {
case BlockedOnRead:
ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
break;
case BlockedOnWrite:
ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
break;
default:
barf("awaitEvent");
}
if (ready) {
IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id));
tso->why_blocked = NotBlocked;
tso->_link = END_TSO_QUEUE;
pushOnRunQueue(&MainCapability,tso);
} else {
if (prev == NULL)
blocked_queue_hd = tso;
else
setTSOLink(&MainCapability, prev, tso);
prev = tso;
}
case BlockedOnRead:
fd = tso->block_info.fd;
if (seen_bad_fd) {
fd_state = fdPollReadState (fd);
} else if (FD_ISSET(fd, &rfd)) {
fd_state = RTS_FD_IS_READY;
}
break;
case BlockedOnWrite:
fd = tso->block_info.fd;
if (seen_bad_fd) {
fd_state = fdPollWriteState (fd);
} else if (FD_ISSET(fd, &wfd)) {
fd_state = RTS_FD_IS_READY;
}
break;
default:
barf("awaitEvent");
}
switch (fd_state) {
case RTS_FD_IS_INVALID:
/*
* Don't let RTS loop on such descriptors,
* pass an IOError to blocked threads (Trac #4934)
*/
IF_DEBUG(scheduler,debugBelch("Killing blocked thread %lu on bad fd=%i\n", (unsigned long)tso->id, fd));
throwToSingleThreaded(&MainCapability, tso, (StgClosure *)blockedOnBadFD_closure);
break;
case RTS_FD_IS_READY:
IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id));
tso->why_blocked = NotBlocked;
tso->_link = END_TSO_QUEUE;
pushOnRunQueue(&MainCapability,tso);
break;
case RTS_FD_IS_BLOCKING:
if (prev == NULL)
blocked_queue_hd = tso;
else
setTSOLink(&MainCapability, prev, tso);
prev = tso;
break;
}
}
if (prev == NULL)
......
......@@ -40,5 +40,4 @@ EXPORTS
base_ControlziExceptionziBase_nonTermination_closure
base_ControlziExceptionziBase_nestedAtomically_closure
base_GHCziEventziThread_blockedOnBadFD_closure
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment