/* -----------------------------------------------------------------------------
 * $Id: Select.c,v 1.20 2002/07/09 20:44:24 sof Exp $
 *
 * (c) The GHC Team 1995-1999
 *
 * Support for concurrent non-blocking I/O and thread waiting.
 *
 * ---------------------------------------------------------------------------*/

/* we're outside the realms of POSIX here... */
/* #include "PosixSource.h" */

#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Itimer.h"
#include "Signals.h"

# ifdef HAVE_SYS_TYPES_H
#  include <sys/types.h>
# endif

# ifdef HAVE_SYS_TIME_H
#  include <sys/time.h>
# endif

# ifdef mingw32_TARGET_OS
#  include <windows.h>
# endif

/* last timestamp */
nat timestamp = 0;

/* There's a clever trick here to avoid problems when the time wraps
 * around.  Since our maximum delay is smaller than 31 bits of ticks
 * (it's actually 31 bits of microseconds), we can safely check
 * whether a timer has expired even if our timer will wrap around
 * before the target is reached, using the following formula:
 *
 *        (int)((uint)current_time - (uint)target_time) > 0
 *
 * if this is true, then our time has expired; a zero or negative
 * difference means the target has not yet been passed.
 * (idea due to Andy Gill).
 */
/* Wake the threads at the front of sleeping_queue whose target tick
 * lies strictly before 'ticks', moving each one onto the run queue.
 *
 * The comparison is done on the signed difference of the two tick
 * values rather than on the values themselves.
 *
 * Returns rtsTrue if at least one thread was woken, rtsFalse otherwise.
 */
rtsBool
wakeUpSleepingThreads(nat ticks)
{
    rtsBool woke_any = rtsFalse;

    for (;;) {
	StgTSO *sleeper = sleeping_queue;

	if (sleeper == END_TSO_QUEUE) {
	    break;
	}
	if ((int)(ticks - sleeper->block_info.target) <= 0) {
	    /* the head of the queue is not due yet; stop here */
	    break;
	}

	/* unlink the thread from the sleeping queue and make it runnable */
	sleeping_queue = sleeper->link;
	sleeper->why_blocked = NotBlocked;
	sleeper->link = END_TSO_QUEUE;
	IF_DEBUG(scheduler,belch("Waking up sleeping thread %d\n", sleeper->id));
	PUSH_ON_RUN_QUEUE(sleeper);
	woke_any = rtsTrue;
    }
    return woke_any;
}
64 65 66 67 68

/* Argument 'wait' says whether to wait for I/O to become available,
 * or whether to just check and return immediately.  If there are
 * other threads ready to run, we normally do the non-waiting variety,
 * otherwise we wait (see Schedule.c).
69 70
 *
 * SMP note: must be called with sched_mutex locked.
rrt's avatar
rrt committed
71 72 73 74 75 76
 *
 * Windows: select only works on sockets, so this doesn't really work,
 * though it makes things better than before. MsgWaitForMultipleObjects
 * should really be used, though it only seems to work for read handles,
 * not write handles.
 *
77 78 79 80 81 82 83
 */
void
awaitEvent(rtsBool wait)
{
    StgTSO *tso, *prev, *next;
    rtsBool ready;
    fd_set rfd,wfd;
rrt's avatar
rrt committed
84
#ifndef mingw32_TARGET_OS
85
    int numFound;
86
    int maxfd = -1;
rrt's avatar
rrt committed
87
#endif
88
    rtsBool select_succeeded = rtsTrue;
sof's avatar
sof committed
89 90
    rtsBool unblock_all = rtsFalse;
    static rtsBool prev_unblocked_all = rtsFalse;
91
    struct timeval tv;
92 93 94 95
    lnat min, ticks;

    tv.tv_sec  = 0;
    tv.tv_usec = 0;
sof's avatar
sof committed
96
    
97 98 99 100 101 102 103
    IF_DEBUG(scheduler,
	     belch("scheduler: checking for threads blocked on I/O");
	     if (wait) {
		 belch(" (waiting)");
	     }
	     belch("\n");
	     );
104

105 106 107 108
    /* loop until we've woken up some threads.  This loop is needed
     * because the select timing isn't accurate, we sometimes sleep
     * for a while but not long enough to wake up a thread in
     * a threadDelay.
109
     */
110
    do {
111

112 113 114 115
      ticks = timestamp = getourtimeofday();
      if (wakeUpSleepingThreads(ticks)) { 
	  return;
      }
116

117 118 119 120 121 122 123 124
      if (!wait) {
	  min = 0;
      } else if (sleeping_queue != END_TSO_QUEUE) {
	  min = (sleeping_queue->block_info.target - ticks) 
	      * TICK_MILLISECS * 1000;
      } else {
	  min = 0x7ffffff;
      }
125

rrt's avatar
rrt committed
126
#ifndef mingw32_TARGET_OS
127
      /* 
128
       * Collect all of the fd's that we're interested in
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
       */
      FD_ZERO(&rfd);
      FD_ZERO(&wfd);

      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	next = tso->link;

	switch (tso->why_blocked) {
	case BlockedOnRead:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &rfd);
	    continue;
	  }

	case BlockedOnWrite:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &wfd);
	    continue;
	  }

	default:
	  barf("AwaitEvent");
155 156
	}
      }
157

158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
      /* Release the scheduler lock while we do the poll.
       * this means that someone might muck with the blocked_queue
       * while we do this, but it shouldn't matter:
       *
       *   - another task might poll for I/O and remove one
       *     or more threads from the blocked_queue.
       *   - more I/O threads may be added to blocked_queue.
       *   - more delayed threads may be added to blocked_queue. We'll
       *     just subtract delta from their delays after the poll.
       *
       * I believe none of these cases lead to trouble --SDM.
       */
      RELEASE_LOCK(&sched_mutex);

      /* Check for any interesting events */
173 174
      
      tv.tv_sec  = min / 1000000;
175
      tv.tv_usec = min % 1000000;
176

177 178
      while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
	  if (errno != EINTR) {
sof's avatar
sof committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
	    /* Handle bad file descriptors by unblocking all the
	       waiting threads. Why? Because a thread might have been
	       a bit naughty and closed a file descriptor while another
	       was blocked waiting. This is less-than-good programming
	       practice, but having the RTS as a result fall over isn't
	       acceptable, so we simply unblock all the waiting threads
	       should we see a bad file descriptor & give the threads
	       a chance to clean up their act. 
	       
	       To avoid getting stuck in a loop, repeated EBADF failures
	       are 'handled' through barfing.
	    */
	    if ( errno == EBADF && !prev_unblocked_all) {
	      unblock_all = rtsTrue;
	      prev_unblocked_all = rtsTrue;
	      break;
	    } else {
 	      fprintf(stderr,"%d\n", errno);
 	      fflush(stderr);
 	      perror("select");
199
	      barf("select failed");
sof's avatar
sof committed
200
	    }
201
	  }
rrt's avatar
rrt committed
202 203 204 205
#else /* on mingwin */
      while (1) {
	  Sleep(0); /* don't busy wait */
#endif /* mingw32_TARGET_OS */
206
	  ACQUIRE_LOCK(&sched_mutex);
rrt's avatar
rrt committed
207

sof's avatar
sof committed
208 209
	  prev_unblocked_all = rtsFalse;

rrt's avatar
rrt committed
210
#ifndef mingw32_TARGET_OS
211 212 213 214 215 216 217
	  /* We got a signal; could be one of ours.  If so, we need
	   * to start up the signal handler straight away, otherwise
	   * we could block for a long time before the signal is
	   * serviced.
	   */
	  if (signals_pending()) {
	      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
218
	      startSignalHandlers();
219 220 221
	      ACQUIRE_LOCK(&sched_mutex);
	      return; /* still hold the lock */
	  }
rrt's avatar
rrt committed
222 223
#endif

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
	  /* we were interrupted, return to the scheduler immediately.
	   */
	  if (interrupted) {
	      return; /* still hold the lock */
	  }
	  
	  /* check for threads that need waking up 
	   */
	  wakeUpSleepingThreads(getourtimeofday());
	  
	  /* If new runnable threads have arrived, stop waiting for
	   * I/O and run them.
	   */
	  if (run_queue_hd != END_TSO_QUEUE) {
	      return; /* still hold the lock */
	  }
	  
	  RELEASE_LOCK(&sched_mutex);
      }
243

244
      ACQUIRE_LOCK(&sched_mutex);
245

246 247 248
      /* Step through the waiting queue, unblocking every thread that now has
       * a file descriptor in a ready state.
       */
249

250
      prev = NULL;
sof's avatar
sof committed
251
      if (select_succeeded || unblock_all) {
252 253 254 255
	  for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	      next = tso->link;
	      switch (tso->why_blocked) {
	      case BlockedOnRead:
sof's avatar
sof committed
256
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
257 258
		  break;
	      case BlockedOnWrite:
sof's avatar
sof committed
259
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
260 261 262 263
		  break;
	      default:
		  barf("awaitEvent");
	      }
264
      
265 266 267 268 269 270 271 272 273 274 275 276 277
	      if (ready) {
		  IF_DEBUG(scheduler,belch("Waking up blocked thread %d\n", tso->id));
		  tso->why_blocked = NotBlocked;
		  tso->link = END_TSO_QUEUE;
		  PUSH_ON_RUN_QUEUE(tso);
	      } else {
		  if (prev == NULL)
		      blocked_queue_hd = tso;
		  else
		      prev->link = tso;
		  prev = tso;
	      }
	  }
278

279 280 281 282 283 284
	  if (prev == NULL)
	      blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
	  else {
	      prev->link = END_TSO_QUEUE;
	      blocked_queue_tl = prev;
	  }
285
      }
286

287
    } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
288
}