/* -----------------------------------------------------------------------------
 * $Id: Select.c,v 1.21 2002/07/17 09:21:51 simonmar Exp $
 *
 * (c) The GHC Team 1995-2002
 *
 * Support for concurrent non-blocking I/O and thread waiting.
 *
 * ---------------------------------------------------------------------------*/

/* we're outside the realms of POSIX here... */
/* #include "PosixSource.h" */

#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Itimer.h"
#include "Signals.h"

# ifdef HAVE_SYS_TYPES_H
#  include <sys/types.h>
# endif

# ifdef HAVE_SYS_TIME_H
#  include <sys/time.h>
# endif

# ifdef mingw32_TARGET_OS
#  include <windows.h>
# endif

#include <errno.h>
#include <string.h>

/* Last timestamp: awaitEvent() stores the value returned by
 * getourtimeofday() here each time round its polling loop.
 */
nat timestamp = 0;

/* Wake the threads at the front of sleeping_queue whose delay has expired.
 *
 * There's a clever trick here to avoid problems when the time wraps
 * around (idea due to Andy Gill).  Since our maximum delay is smaller
 * than 31 bits of ticks (it's actually 31 bits of microseconds), we can
 * safely check whether a timer has expired even if our timer will wrap
 * around before the target is reached.  Subtract the target from the
 * current time and look at the difference as a signed int:
 *
 *        (int)(current_time - target_time) > 0
 *
 * If this is true, the target lies in the past and the thread is woken.
 *
 * Only the head of the queue is examined on each step; the scan stops at
 * the first thread that has not yet expired (or when the queue is empty).
 *
 * Returns rtsTrue if at least one thread was pushed onto the run queue,
 * rtsFalse otherwise.
 */
rtsBool
wakeUpSleepingThreads(nat ticks)
{
    rtsBool woke_any = rtsFalse;

    for (;;) {
        StgTSO *sleeper = sleeping_queue;

        if (sleeper == END_TSO_QUEUE) {
            break;
        }
        /* Head of the queue has not expired yet: nothing more to do. */
        if ((int)(ticks - sleeper->block_info.target) <= 0) {
            break;
        }

        /* Unlink the thread from the sleeping queue and make it runnable. */
        sleeping_queue = sleeper->link;
        sleeper->why_blocked = NotBlocked;
        sleeper->link = END_TSO_QUEUE;
        IF_DEBUG(scheduler,belch("Waking up sleeping thread %d\n", sleeper->id));
        PUSH_ON_RUN_QUEUE(sleeper);
        woke_any = rtsTrue;
    }

    return woke_any;
}
67 68 69 70 71

/* Argument 'wait' says whether to wait for I/O to become available,
 * or whether to just check and return immediately.  If there are
 * other threads ready to run, we normally do the non-waiting variety,
 * otherwise we wait (see Schedule.c).
72 73
 *
 * SMP note: must be called with sched_mutex locked.
rrt's avatar
rrt committed
74 75 76 77 78 79
 *
 * Windows: select only works on sockets, so this doesn't really work,
 * though it makes things better than before. MsgWaitForMultipleObjects
 * should really be used, though it only seems to work for read handles,
 * not write handles.
 *
80 81 82 83 84 85 86
 */
void
awaitEvent(rtsBool wait)
{
    StgTSO *tso, *prev, *next;
    rtsBool ready;
    fd_set rfd,wfd;
rrt's avatar
rrt committed
87
#ifndef mingw32_TARGET_OS
88
    int numFound;
89
    int maxfd = -1;
rrt's avatar
rrt committed
90
#endif
91
    rtsBool select_succeeded = rtsTrue;
sof's avatar
sof committed
92 93
    rtsBool unblock_all = rtsFalse;
    static rtsBool prev_unblocked_all = rtsFalse;
94
    struct timeval tv;
95 96 97 98
    lnat min, ticks;

    tv.tv_sec  = 0;
    tv.tv_usec = 0;
sof's avatar
sof committed
99
    
100 101 102 103 104 105 106
    IF_DEBUG(scheduler,
	     belch("scheduler: checking for threads blocked on I/O");
	     if (wait) {
		 belch(" (waiting)");
	     }
	     belch("\n");
	     );
107

108 109 110 111
    /* loop until we've woken up some threads.  This loop is needed
     * because the select timing isn't accurate, we sometimes sleep
     * for a while but not long enough to wake up a thread in
     * a threadDelay.
112
     */
113
    do {
114

115 116 117 118
      ticks = timestamp = getourtimeofday();
      if (wakeUpSleepingThreads(ticks)) { 
	  return;
      }
119

120 121 122 123 124 125 126 127
      if (!wait) {
	  min = 0;
      } else if (sleeping_queue != END_TSO_QUEUE) {
	  min = (sleeping_queue->block_info.target - ticks) 
	      * TICK_MILLISECS * 1000;
      } else {
	  min = 0x7ffffff;
      }
128

rrt's avatar
rrt committed
129
#ifndef mingw32_TARGET_OS
130
      /* 
131
       * Collect all of the fd's that we're interested in
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
       */
      FD_ZERO(&rfd);
      FD_ZERO(&wfd);

      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	next = tso->link;

	switch (tso->why_blocked) {
	case BlockedOnRead:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &rfd);
	    continue;
	  }

	case BlockedOnWrite:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &wfd);
	    continue;
	  }

	default:
	  barf("AwaitEvent");
158 159
	}
      }
160

161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
      /* Release the scheduler lock while we do the poll.
       * this means that someone might muck with the blocked_queue
       * while we do this, but it shouldn't matter:
       *
       *   - another task might poll for I/O and remove one
       *     or more threads from the blocked_queue.
       *   - more I/O threads may be added to blocked_queue.
       *   - more delayed threads may be added to blocked_queue. We'll
       *     just subtract delta from their delays after the poll.
       *
       * I believe none of these cases lead to trouble --SDM.
       */
      RELEASE_LOCK(&sched_mutex);

      /* Check for any interesting events */
176 177
      
      tv.tv_sec  = min / 1000000;
178
      tv.tv_usec = min % 1000000;
179

180 181
      while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
	  if (errno != EINTR) {
sof's avatar
sof committed
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
	    /* Handle bad file descriptors by unblocking all the
	       waiting threads. Why? Because a thread might have been
	       a bit naughty and closed a file descriptor while another
	       was blocked waiting. This is less-than-good programming
	       practice, but having the RTS as a result fall over isn't
	       acceptable, so we simply unblock all the waiting threads
	       should we see a bad file descriptor & give the threads
	       a chance to clean up their act. 
	       
	       To avoid getting stuck in a loop, repeated EBADF failures
	       are 'handled' through barfing.
	    */
	    if ( errno == EBADF && !prev_unblocked_all) {
	      unblock_all = rtsTrue;
	      prev_unblocked_all = rtsTrue;
	      break;
	    } else {
 	      fprintf(stderr,"%d\n", errno);
 	      fflush(stderr);
 	      perror("select");
202
	      barf("select failed");
sof's avatar
sof committed
203
	    }
204
	  }
rrt's avatar
rrt committed
205 206 207 208
#else /* on mingwin */
      while (1) {
	  Sleep(0); /* don't busy wait */
#endif /* mingw32_TARGET_OS */
209
	  ACQUIRE_LOCK(&sched_mutex);
rrt's avatar
rrt committed
210

sof's avatar
sof committed
211 212
	  prev_unblocked_all = rtsFalse;

rrt's avatar
rrt committed
213
#ifndef mingw32_TARGET_OS
214 215 216 217 218 219 220
	  /* We got a signal; could be one of ours.  If so, we need
	   * to start up the signal handler straight away, otherwise
	   * we could block for a long time before the signal is
	   * serviced.
	   */
	  if (signals_pending()) {
	      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
221
	      startSignalHandlers();
222 223 224
	      ACQUIRE_LOCK(&sched_mutex);
	      return; /* still hold the lock */
	  }
rrt's avatar
rrt committed
225 226
#endif

227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
	  /* we were interrupted, return to the scheduler immediately.
	   */
	  if (interrupted) {
	      return; /* still hold the lock */
	  }
	  
	  /* check for threads that need waking up 
	   */
	  wakeUpSleepingThreads(getourtimeofday());
	  
	  /* If new runnable threads have arrived, stop waiting for
	   * I/O and run them.
	   */
	  if (run_queue_hd != END_TSO_QUEUE) {
	      return; /* still hold the lock */
	  }
	  
	  RELEASE_LOCK(&sched_mutex);
      }
246

247
      ACQUIRE_LOCK(&sched_mutex);
248

249 250 251
      /* Step through the waiting queue, unblocking every thread that now has
       * a file descriptor in a ready state.
       */
252

253
      prev = NULL;
sof's avatar
sof committed
254
      if (select_succeeded || unblock_all) {
255 256 257 258
	  for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	      next = tso->link;
	      switch (tso->why_blocked) {
	      case BlockedOnRead:
sof's avatar
sof committed
259
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
260 261
		  break;
	      case BlockedOnWrite:
sof's avatar
sof committed
262
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
263 264 265 266
		  break;
	      default:
		  barf("awaitEvent");
	      }
267
      
268 269 270 271 272 273 274 275 276 277 278 279 280
	      if (ready) {
		  IF_DEBUG(scheduler,belch("Waking up blocked thread %d\n", tso->id));
		  tso->why_blocked = NotBlocked;
		  tso->link = END_TSO_QUEUE;
		  PUSH_ON_RUN_QUEUE(tso);
	      } else {
		  if (prev == NULL)
		      blocked_queue_hd = tso;
		  else
		      prev->link = tso;
		  prev = tso;
	      }
	  }
281

282 283 284 285 286 287
	  if (prev == NULL)
	      blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
	  else {
	      prev->link = END_TSO_QUEUE;
	      blocked_queue_tl = prev;
	  }
288
      }
289

290
    } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
291
}