/* -----------------------------------------------------------------------------
 * $Id: Select.c,v 1.12 2000/04/03 15:24:21 rrt Exp $
 *
 * (c) The GHC Team 1995-1999
 *
 * Support for concurrent non-blocking I/O and thread waiting.
 *
 * ---------------------------------------------------------------------------*/

/* we're outside the realms of POSIX here... */
#define NON_POSIX_SOURCE

#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Itimer.h"
#include "Signals.h"

# if defined(HAVE_SYS_TYPES_H)
#  include <sys/types.h>
# endif

# ifdef HAVE_SYS_TIME_H
#  include <sys/time.h>
# endif

/* Tick counter consumed by awaitEvent(): on each pass awaitEvent() reads
 * it, resets it to zero, and scales the value by TICK_MILLISECS * 1000
 * to obtain the elapsed time in microseconds.
 * NOTE(review): presumably incremented by the interval-timer handler
 * (see Itimer.h) -- the writer is not visible in this file; confirm.
 */
nat ticks_since_select = 0;

/* awaitEvent: check the blocked queue for threads whose file descriptor
 * has become ready or whose delay has expired, and move those threads
 * onto the run queue.
 *
 * Argument 'wait' says whether to wait for I/O to become available,
 * or whether to just check and return immediately.  If there are
 * other threads ready to run, we normally do the non-waiting variety,
 * otherwise we wait (see Schedule.c).
 *
 * When 'wait' is true the function keeps polling until the run queue
 * is non-empty or 'interrupted' has been set.
 *
 * SMP note: must be called with sched_mutex locked.  The lock is
 * released around the select() call and re-acquired before the blocked
 * queue is rebuilt, so it is held again on return.
 */
void
awaitEvent(rtsBool wait)
{
#ifdef mingw32_TARGET_OS
/*
 * Win32 doesn't support select(). ToDo: use MsgWaitForMultipleObjects()
 * to achieve (similar) effect.
 *
 */
    return;
#else

    StgTSO *tso, *prev, *next;
    rtsBool ready;
    fd_set rfd,wfd;
    int numFound;
    nat min, delta;
    int maxfd;
    rtsBool select_succeeded;

    struct timeval tv;
#ifndef linux_TARGET_OS
    struct timeval tv_before,tv_after;
#endif

    IF_DEBUG(scheduler,belch("Checking for threads blocked on I/O...\n"));

    /* loop until we've woken up some threads.  This loop is needed
     * because the select timing isn't accurate, we sometimes sleep
     * for a while but not long enough to wake up a thread in
     * a threadDelay.
     */
    do {

      /* see how long it's been since we last checked the blocked queue.
       * ToDo: make this check atomic, so we don't lose any ticks.
       */
      delta = ticks_since_select;
      ticks_since_select = 0;
      delta = delta * TICK_MILLISECS * 1000;

      /* Upper bound on the select() timeout in microseconds: effectively
       * "forever" when waiting, zero (pure poll) otherwise.
       */
      min = wait == rtsTrue ? 0x7fffffff : 0;

      /* Per-iteration state: start from scratch on every pass so that
       * nothing from an earlier pass leaks into this one.  The fd sets
       * are only trusted once select() has actually returned
       * successfully during this pass.
       */
      maxfd = -1;
      select_succeeded = rtsFalse;

      /*
       * Collect all of the fd's that we're interested in, and capture
       * the minimum waiting time (in microseconds) for the delayed threads.
       */
      FD_ZERO(&rfd);
      FD_ZERO(&wfd);

      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
        next = tso->link;

        switch (tso->why_blocked) {
        case BlockedOnRead:
        case BlockedOnWrite:
          {
            int fd = tso->block_info.fd;

            /* FD_SET on a descriptor outside 0..FD_SETSIZE-1 is
             * undefined behaviour (it writes outside the fd_set), so
             * fail loudly instead of corrupting the stack.
             */
            if (fd < 0 || fd >= FD_SETSIZE) {
              barf("awaitEvent: file descriptor out of range for select()");
            }
            if (fd > maxfd) {
              maxfd = fd;
            }
            if (tso->why_blocked == BlockedOnRead) {
              FD_SET(fd, &rfd);
            } else {
              FD_SET(fd, &wfd);
            }
            continue;
          }

        case BlockedOnDelay:
          {
            int candidate; /* signed int is intentional */
#if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
            candidate = tso->block_info.delay;
#else
            candidate = tso->block_info.target - getourtimeofday();
            if (candidate < 0) {
              candidate = 0;
            }
#endif
            if ((nat)candidate < min) {
              min = candidate;
            }
            continue;
          }

        default:
          barf("AwaitEvent");
        }
      }

      /* Release the scheduler lock while we do the poll.
       * this means that someone might muck with the blocked_queue
       * while we do this, but it shouldn't matter:
       *
       *   - another task might poll for I/O and remove one
       *     or more threads from the blocked_queue.
       *   - more I/O threads may be added to blocked_queue.
       *   - more delayed threads may be added to blocked_queue. We'll
       *     just subtract delta from their delays after the poll.
       *
       * I believe none of these cases lead to trouble --SDM.
       */
      RELEASE_LOCK(&sched_mutex);

      /* Check for any interesting events */

      tv.tv_sec = min / 1000000;
      tv.tv_usec = min % 1000000;

#ifndef linux_TARGET_OS
      gettimeofday(&tv_before, (struct timezone *) NULL);
#endif

      /* Retry select() while it keeps being interrupted by signals.
       * Every exit from this loop leaves sched_mutex released.  If
       * 'interrupted' is already set, select() is skipped altogether
       * and select_succeeded stays false, so the (still fully set)
       * fd sets are never mistaken for a result.
       */
      while (!interrupted) {
        numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv);
        if (numFound >= 0) {
          select_succeeded = rtsTrue;
          break;
        }

        if (errno != EINTR) {
          /* fflush(stdout); */
          perror("select");
          barf("select failed");
        }
        ACQUIRE_LOCK(&sched_mutex);

        /* We got a signal; could be one of ours.  If so, we need
         * to start up the signal handler straight away, otherwise
         * we could block for a long time before the signal is
         * serviced.
         */
        if (signals_pending()) {
          RELEASE_LOCK(&sched_mutex);
          start_signal_handlers();
          /* Don't wake up any other threads that were waiting on I/O:
           * select_succeeded is still false at this point.
           */
          break;
        }

        /* If new runnable threads have arrived, stop waiting for
         * I/O and run them.
         */
        if (run_queue_hd != END_TSO_QUEUE) {
          RELEASE_LOCK(&sched_mutex);
          break;
        }

        RELEASE_LOCK(&sched_mutex);
      }

#ifdef linux_TARGET_OS
      /* on Linux, tv is set to indicate the amount of time not
       * slept, so we don't need to gettimeofday() to find out.
       */
      delta += min - (tv.tv_sec * 1000000 + tv.tv_usec);
#else
      gettimeofday(&tv_after, (struct timezone *) NULL);
      delta += (tv_after.tv_sec - tv_before.tv_sec) * 1000000 +
        tv_after.tv_usec - tv_before.tv_usec;
#endif

#if 0
      if (delta != 0) { fprintf(stderr,"waited: %d %d %d\n", min, delta,
                                interrupted); }
#endif

      ACQUIRE_LOCK(&sched_mutex);

      /* Step through the waiting queue, unblocking every thread that now has
       * a file descriptor in a ready state.
       *
       * For the delayed threads, decrement the number of microsecs
       * we've been blocked for. Unblock the threads that have thusly expired.
       *
       * The blocked queue is rebuilt in place: 'prev' is the last thread
       * kept so far (NULL while none has been kept).
       */

      prev = NULL;
      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
        next = tso->link;
        switch (tso->why_blocked) {
        case BlockedOnRead:
          ready = select_succeeded && FD_ISSET(tso->block_info.fd, &rfd);
          break;

        case BlockedOnWrite:
          ready = select_succeeded && FD_ISSET(tso->block_info.fd, &wfd);
          break;

        case BlockedOnDelay:
          {
#if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
            if (tso->block_info.delay > delta) {
              tso->block_info.delay -= delta;
              ready = rtsFalse;
            } else {
              tso->block_info.delay = 0;
              ready = rtsTrue;
            }
#else
            int candidate; /* signed int is intentional */
            candidate = tso->block_info.target - getourtimeofday();
            if (candidate < 0) {
              candidate = 0;
            }
            if ((nat)candidate > delta) {
              ready = rtsFalse;
            } else {
              ready = rtsTrue;
            }
#endif
            break;
          }

        default:
          barf("awaitEvent");
        }

        if (ready) {
          IF_DEBUG(scheduler,belch("Waking up thread %d\n", tso->id));
          tso->why_blocked = NotBlocked;
          tso->link = END_TSO_QUEUE;
          PUSH_ON_RUN_QUEUE(tso);
        } else {
          if (prev == NULL)
            blocked_queue_hd = tso;
          else
            prev->link = tso;
          prev = tso;
        }
      }

      if (prev == NULL)
        blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
      else {
        prev->link = END_TSO_QUEUE;
        blocked_queue_tl = prev;
      }

      /* Stop as soon as there is something to run, or as soon as we
       * have been interrupted: in the latter case select() is no longer
       * being called, so looping would only busy-wait.
       * NOTE(review): assumes the caller checks 'interrupted' itself
       * after we return -- confirm against Schedule.c.
       */
    } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
#endif
}