Select.c 7.37 KB
Newer Older
1 2
/* -----------------------------------------------------------------------------
 *
 * (c) The GHC Team 1995-2002
 *
 * Support for concurrent non-blocking I/O and thread waiting.
 *
 * ---------------------------------------------------------------------------*/

/* we're outside the realms of POSIX here... */
10
/* #include "PosixSource.h" */
11 12 13 14 15

#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
sof's avatar
sof committed
16
#include "Timer.h"
sof's avatar
sof committed
17
#include "Itimer.h"
18
#include "Signals.h"
19
#include "Capability.h"
20
#include "posix/Select.h"
21

rrt's avatar
rrt committed
22
# ifdef HAVE_SYS_TYPES_H
23 24 25 26 27 28 29
#  include <sys/types.h>
# endif

# ifdef HAVE_SYS_TIME_H
#  include <sys/time.h>
# endif

30 31 32
#include <errno.h>
#include <string.h>

33 34 35 36
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

37
#if !defined(THREADED_RTS)
38
/* last timestamp */
39
/* Tick count returned by the most recent getourtimeofday() call made at
 * the top of awaitEvent()'s polling loop.  It has external linkage, so it
 * is presumably read from other RTS files -- confirm before changing that.
 */
lnat timestamp = 0;
40

41 42 43 44
/* 
 * The threaded RTS uses an IO-manager thread in Haskell instead (see GHC.Conc) 
 */

45 46 47 48 49 50 51 52 53 54 55
/* There's a clever trick here to avoid problems when the time wraps
 * around.  Since our maximum delay is smaller than 31 bits of ticks
 * (it's actually 31 bits of microseconds), we can safely check
 * whether a timer has expired even if our timer will wrap around
 * before the target is reached, using the following formula:
 *
 *        (int)((uint)current_time - (uint)target_time) > 0
 *
 * if this is true, then the target time has passed and the timer has
 * expired.
 * (idea due to Andy Gill).
 */
static rtsBool
57
wakeUpSleepingThreads(lnat ticks)
58 59 60 61 62 63 64 65 66 67
{
    StgTSO *tso;
    rtsBool flag = rtsFalse;

    while (sleeping_queue != END_TSO_QUEUE &&
	   (int)(ticks - sleeping_queue->block_info.target) > 0) {
	tso = sleeping_queue;
	sleeping_queue = tso->link;
	tso->why_blocked = NotBlocked;
	tso->link = END_TSO_QUEUE;
68
	IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %d\n", tso->id));
69 70
	// MainCapability: this code is !THREADED_RTS
	pushOnRunQueue(&MainCapability,tso);
71 72 73 74
	flag = rtsTrue;
    }
    return flag;
}
75 76 77 78 79

/* Check for (and optionally wait for) threads that can be resumed: either
 * because their delay has expired or because the file descriptor they are
 * blocked on has become ready.  Resumed threads are pushed onto the run
 * queue of MainCapability.
 *
 * Argument 'wait' says whether to wait for I/O to become available,
 * or whether to just check and return immediately.  If there are
 * other threads ready to run, we normally do the non-waiting variety,
 * otherwise we wait (see Schedule.c).
 *
 * The function returns early, without scanning the blocked queue, when a
 * sleeping thread was woken, when a pending signal handler was started,
 * when the scheduler is being interrupted, or when the run queue became
 * non-empty while select() was being retried.
 *
 * If select() reports EBADF, every thread on the blocked queue is woken
 * (see the comment at that point).  Any other select() failure except
 * EINTR, a blocked thread with an fd outside [0, FD_SETSIZE), or a thread
 * on the blocked queue with an unexpected why_blocked, ends in barf().
 *
 * SMP note: must be called with sched_mutex locked.
 *
 * Windows: select only works on sockets, so this doesn't really work,
 * though it makes things better than before. MsgWaitForMultipleObjects
 * should really be used, though it only seems to work for read handles,
 * not write handles.
 *
 */
void
awaitEvent(rtsBool wait)
{
    StgTSO *tso, *prev, *next;
    rtsBool ready;
    fd_set rfd,wfd;
    int maxfd = -1;
    rtsBool unblock_all = rtsFalse;
    struct timeval tv;
    lnat min, ticks;

    IF_DEBUG(scheduler,
             debugBelch("scheduler: checking for threads blocked on I/O");
             if (wait) {
                 debugBelch(" (waiting)");
             }
             debugBelch("\n");
             );

    /* loop until we've woken up some threads.  This loop is needed
     * because the select timing isn't accurate, we sometimes sleep
     * for a while but not long enough to wake up a thread in
     * a threadDelay.
     */
    do {

      ticks = timestamp = getourtimeofday();
      if (wakeUpSleepingThreads(ticks)) {
          return;
      }

      /* Work out the select() timeout, in microseconds. */
      if (!wait) {
          min = 0;                      /* poll only */
      } else if (sleeping_queue != END_TSO_QUEUE) {
          /* sleep no longer than the earliest pending delay */
          min = (sleeping_queue->block_info.target - ticks)
              * TICK_MILLISECS * 1000;
      } else {
          /* No sleeper: use a long timeout (about 134 seconds); the
           * enclosing loop simply waits again if it expires.
           * NOTE(review): possibly meant to be 0x7fffffff -- confirm.
           */
          min = 0x7ffffff;
      }

      /*
       * Collect all of the fd's that we're interested in
       */
      FD_ZERO(&rfd);
      FD_ZERO(&wfd);

      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
        next = tso->link;

        switch (tso->why_blocked) {
        case BlockedOnRead:
          {
            int fd = tso->block_info.fd;
            /* FD_SET is only defined for 0 <= fd < FD_SETSIZE */
            if (fd < 0 || fd >= FD_SETSIZE) {
                barf("awaitEvent: descriptor out of range");
            }
            maxfd = (fd > maxfd) ? fd : maxfd;
            FD_SET(fd, &rfd);
            continue;
          }

        case BlockedOnWrite:
          {
            int fd = tso->block_info.fd;
            /* FD_SET is only defined for 0 <= fd < FD_SETSIZE */
            if (fd < 0 || fd >= FD_SETSIZE) {
                barf("awaitEvent: descriptor out of range");
            }
            maxfd = (fd > maxfd) ? fd : maxfd;
            FD_SET(fd, &wfd);
            continue;
          }

        default:
          barf("AwaitEvent");
        }
      }

      /* Check for any interesting events */

      for (;;) {
          /* Reload the timeout before every call: after a failed
           * select() (e.g. EINTR) the contents of tv are not reliable
           * on every platform.  The fd sets are left untouched by a
           * failed call, so they can be reused as they are.
           */
          tv.tv_sec  = min / 1000000;
          tv.tv_usec = min % 1000000;

          if (select(maxfd+1, &rfd, &wfd, NULL, &tv) >= 0) {
              break;
          }

          if (errno != EINTR) {
            /* Handle bad file descriptors by unblocking all the
               waiting threads. Why? Because a thread might have been
               a bit naughty and closed a file descriptor while another
               was blocked waiting. This is less-than-good programming
               practice, but having the RTS as a result fall over isn't
               acceptable, so we simply unblock all the waiting threads
               should we see a bad file descriptor & give the threads
               a chance to clean up their act.

               Note: assume here that threads becoming unblocked
               will try to read/write the file descriptor before trying
               to issue a threadWaitRead/threadWaitWrite again (==> an
               IOError will result for the thread that's got the bad
               file descriptor.) Hence, there's no danger of a bad
               file descriptor being repeatedly select()'ed on, so
               the RTS won't loop.
            */
            if ( errno == EBADF ) {
              unblock_all = rtsTrue;
              break;
            } else {
              perror("select");
              barf("select failed");
            }
          }

          /* We got a signal; could be one of ours.  If so, we need
           * to start up the signal handler straight away, otherwise
           * we could block for a long time before the signal is
           * serviced.
           */
#if defined(RTS_USER_SIGNALS)
          if (signals_pending()) {
              startSignalHandlers(&MainCapability);
              return; /* still hold the lock */
          }
#endif

          /* we were interrupted, return to the scheduler immediately.
           */
          if (sched_state >= SCHED_INTERRUPTING) {
              return; /* still hold the lock */
          }

          /* check for threads that need waking up
           */
          wakeUpSleepingThreads(getourtimeofday());

          /* If new runnable threads have arrived, stop waiting for
           * I/O and run them.
           */
          if (!emptyRunQueue(&MainCapability)) {
              return; /* still hold the lock */
          }
      }

      /* Step through the waiting queue, unblocking every thread that now has
       * a file descriptor in a ready state.  Threads that stay blocked are
       * relinked, in their original order, through 'prev'.
       */

      prev = NULL;
      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
          next = tso->link;
          switch (tso->why_blocked) {
          case BlockedOnRead:
              /* unblock_all short-circuits: after EBADF the fd sets
               * do not describe readiness.
               */
              ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
              break;
          case BlockedOnWrite:
              ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
              break;
          default:
              barf("awaitEvent");
          }

          if (ready) {
              IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %d\n", tso->id));
              tso->why_blocked = NotBlocked;
              tso->link = END_TSO_QUEUE;
              pushOnRunQueue(&MainCapability,tso);
          } else {
              if (prev == NULL)
                  blocked_queue_hd = tso;
              else
                  prev->link = tso;
              prev = tso;
          }
      }

      /* Terminate the rebuilt blocked queue and fix up its tail. */
      if (prev == NULL)
          blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
      else {
          prev->link = END_TSO_QUEUE;
          blocked_queue_tl = prev;
      }

    } while (wait && sched_state == SCHED_RUNNING
             && emptyRunQueue(&MainCapability));
}
278

279
#endif /* THREADED_RTS */