Select.c 10.1 KB
Newer Older
1
/* -----------------------------------------------------------------------------
sof's avatar
sof committed
2
 * $Id: Select.c,v 1.24 2003/02/21 05:34:16 sof Exp $
3
 *
4
 * (c) The GHC Team 1995-2002
5 6 7 8 9 10
 *
 * Support for concurrent non-blocking I/O and thread waiting.
 *
 * ---------------------------------------------------------------------------*/

/* we're outside the realms of POSIX here... */
11
/* #include "PosixSource.h" */
12 13 14 15 16 17

#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Itimer.h"
18
#include "Signals.h"
19
#include "Capability.h"
20

rrt's avatar
rrt committed
21
# ifdef HAVE_SYS_TYPES_H
22 23 24 25 26 27 28
#  include <sys/types.h>
# endif

# ifdef HAVE_SYS_TIME_H
#  include <sys/time.h>
# endif

rrt's avatar
rrt committed
29
# ifdef mingw32_TARGET_OS
30
#  include <windows.h>
sof's avatar
sof committed
31
#  include "win32/AsyncIO.h"
rrt's avatar
rrt committed
32 33
# endif

34 35 36
#include <errno.h>
#include <string.h>

37 38 39
/* last timestamp */
nat timestamp = 0;

40 41 42 43 44 45 46 47 48
#ifdef RTS_SUPPORTS_THREADS
static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
static rtsBool workerWakeupPending = rtsFalse;
#ifndef mingw32_TARGET_OS
static int workerWakeupPipe[2];
static rtsBool workerWakeupInited = rtsFalse;
#endif
#endif

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/* There's a clever trick here to avoid problems when the time wraps
 * around.  Since our maximum delay is smaller than 31 bits of ticks
 * (it's actually 31 bits of microseconds), we can safely check
 * whether a timer has expired even if our timer will wrap around
 * before the target is reached, using the following formula:
 *
 *        (int)((uint)current_time - (uint)target_time) < 0
 *
 * if this is true, then our time has expired.
 * (idea due to Andy Gill).
 */
rtsBool
wakeUpSleepingThreads(nat ticks)
{
    StgTSO *tso;
    rtsBool flag = rtsFalse;

    while (sleeping_queue != END_TSO_QUEUE &&
	   (int)(ticks - sleeping_queue->block_info.target) > 0) {
	tso = sleeping_queue;
	sleeping_queue = tso->link;
	tso->why_blocked = NotBlocked;
	tso->link = END_TSO_QUEUE;
	IF_DEBUG(scheduler,belch("Waking up sleeping thread %d\n", tso->id));
	PUSH_ON_RUN_QUEUE(tso);
	flag = rtsTrue;
    }
    return flag;
}
78 79 80 81 82

/* Argument 'wait' says whether to wait for I/O to become available,
 * or whether to just check and return immediately.  If there are
 * other threads ready to run, we normally do the non-waiting variety,
 * otherwise we wait (see Schedule.c).
83 84
 *
 * SMP note: must be called with sched_mutex locked.
rrt's avatar
rrt committed
85 86 87 88 89 90
 *
 * Windows: select only works on sockets, so this doesn't really work,
 * though it makes things better than before. MsgWaitForMultipleObjects
 * should really be used, though it only seems to work for read handles,
 * not write handles.
 *
91 92 93 94 95 96 97
 */
void
awaitEvent(rtsBool wait)
{
    StgTSO *tso, *prev, *next;
    rtsBool ready;
    fd_set rfd,wfd;
rrt's avatar
rrt committed
98
#ifndef mingw32_TARGET_OS
99
    int numFound;
100
    int maxfd = -1;
rrt's avatar
rrt committed
101
#endif
102
    rtsBool select_succeeded = rtsTrue;
sof's avatar
sof committed
103
    rtsBool unblock_all = rtsFalse;
104
    struct timeval tv;
105 106 107 108
    lnat min, ticks;

    tv.tv_sec  = 0;
    tv.tv_usec = 0;
sof's avatar
sof committed
109
    
110 111 112 113 114 115 116
    IF_DEBUG(scheduler,
	     belch("scheduler: checking for threads blocked on I/O");
	     if (wait) {
		 belch(" (waiting)");
	     }
	     belch("\n");
	     );
117

118 119 120 121
    /* loop until we've woken up some threads.  This loop is needed
     * because the select timing isn't accurate, we sometimes sleep
     * for a while but not long enough to wake up a thread in
     * a threadDelay.
122
     */
123
    do {
124

125 126 127 128
      ticks = timestamp = getourtimeofday();
      if (wakeUpSleepingThreads(ticks)) { 
	  return;
      }
129

130 131 132 133 134 135 136 137
      if (!wait) {
	  min = 0;
      } else if (sleeping_queue != END_TSO_QUEUE) {
	  min = (sleeping_queue->block_info.target - ticks) 
	      * TICK_MILLISECS * 1000;
      } else {
	  min = 0x7ffffff;
      }
138

rrt's avatar
rrt committed
139
#ifndef mingw32_TARGET_OS
140
      /* 
141
       * Collect all of the fd's that we're interested in
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
       */
      FD_ZERO(&rfd);
      FD_ZERO(&wfd);

      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	next = tso->link;

	switch (tso->why_blocked) {
	case BlockedOnRead:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &rfd);
	    continue;
	  }

	case BlockedOnWrite:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &wfd);
	    continue;
	  }

	default:
	  barf("AwaitEvent");
168 169
	}
      }
170

171 172 173 174 175 176 177 178 179
#ifdef RTS_SUPPORTS_THREADS
      if(!workerWakeupInited) {
          pipe(workerWakeupPipe);
          workerWakeupInited = rtsTrue;
      }
      FD_SET(workerWakeupPipe[0], &rfd);
      maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
#endif
      
180 181 182 183 184 185 186 187 188 189 190 191
      /* Release the scheduler lock while we do the poll.
       * this means that someone might muck with the blocked_queue
       * while we do this, but it shouldn't matter:
       *
       *   - another task might poll for I/O and remove one
       *     or more threads from the blocked_queue.
       *   - more I/O threads may be added to blocked_queue.
       *   - more delayed threads may be added to blocked_queue. We'll
       *     just subtract delta from their delays after the poll.
       *
       * I believe none of these cases lead to trouble --SDM.
       */
192 193 194 195 196
      
#ifdef RTS_SUPPORTS_THREADS
      isWorkerBlockedInAwaitEvent = rtsTrue;
      workerWakeupPending = rtsFalse;
#endif
197 198 199
      RELEASE_LOCK(&sched_mutex);

      /* Check for any interesting events */
200 201
      
      tv.tv_sec  = min / 1000000;
202
      tv.tv_usec = min % 1000000;
203

204 205
      while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
	  if (errno != EINTR) {
sof's avatar
sof committed
206 207 208 209 210 211 212 213 214
	    /* Handle bad file descriptors by unblocking all the
	       waiting threads. Why? Because a thread might have been
	       a bit naughty and closed a file descriptor while another
	       was blocked waiting. This is less-than-good programming
	       practice, but having the RTS as a result fall over isn't
	       acceptable, so we simply unblock all the waiting threads
	       should we see a bad file descriptor & give the threads
	       a chance to clean up their act. 
	       
sof's avatar
sof committed
215 216 217 218 219 220 221
	       Note: assume here that threads becoming unblocked
	       will try to read/write the file descriptor before trying
	       to issue a threadWaitRead/threadWaitWrite again (==> an
	       IOError will result for the thread that's got the bad
	       file descriptor.) Hence, there's no danger of a bad
	       file descriptor being repeatedly select()'ed on, so
	       the RTS won't loop.
sof's avatar
sof committed
222
	    */
sof's avatar
sof committed
223
	    if ( errno == EBADF ) {
sof's avatar
sof committed
224 225 226 227 228 229
	      unblock_all = rtsTrue;
	      break;
	    } else {
 	      fprintf(stderr,"%d\n", errno);
 	      fflush(stderr);
 	      perror("select");
230
	      barf("select failed");
sof's avatar
sof committed
231
	    }
232
	  }
rrt's avatar
rrt committed
233
#else /* on mingwin */
234 235 236 237
#ifdef RTS_SUPPORTS_THREADS
      isWorkerBlockedInAwaitEvent = rtsTrue;
#endif
      RELEASE_LOCK(&sched_mutex);
rrt's avatar
rrt committed
238
      while (1) {
sof's avatar
sof committed
239 240 241
	  if (!awaitRequests(wait)) {
	    Sleep(0); /* don't busy wait */
	  }
rrt's avatar
rrt committed
242
#endif /* mingw32_TARGET_OS */
243
	  ACQUIRE_LOCK(&sched_mutex);
244 245 246
#ifdef RTS_SUPPORTS_THREADS
          isWorkerBlockedInAwaitEvent = rtsFalse;
#endif
rrt's avatar
rrt committed
247 248

#ifndef mingw32_TARGET_OS
249 250 251 252 253 254 255
	  /* We got a signal; could be one of ours.  If so, we need
	   * to start up the signal handler straight away, otherwise
	   * we could block for a long time before the signal is
	   * serviced.
	   */
	  if (signals_pending()) {
	      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
256
	      startSignalHandlers();
257 258 259
	      ACQUIRE_LOCK(&sched_mutex);
	      return; /* still hold the lock */
	  }
rrt's avatar
rrt committed
260 261
#endif

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
	  /* we were interrupted, return to the scheduler immediately.
	   */
	  if (interrupted) {
	      return; /* still hold the lock */
	  }
	  
	  /* check for threads that need waking up 
	   */
	  wakeUpSleepingThreads(getourtimeofday());
	  
	  /* If new runnable threads have arrived, stop waiting for
	   * I/O and run them.
	   */
	  if (run_queue_hd != END_TSO_QUEUE) {
	      return; /* still hold the lock */
	  }
	  
279 280 281 282 283 284 285 286 287 288 289 290
#ifdef RTS_SUPPORTS_THREADS
	  /* If another worker thread wants to take over,
	   * return to the scheduler
	   */
	  if (needToYieldToReturningWorker()) {
	      return; /* still hold the lock */
	  }
#endif
	  
#ifdef RTS_SUPPORTS_THREADS
          isWorkerBlockedInAwaitEvent = rtsTrue;
#endif
291 292
	  RELEASE_LOCK(&sched_mutex);
      }
293

294
      ACQUIRE_LOCK(&sched_mutex);
295

296 297 298
      /* Step through the waiting queue, unblocking every thread that now has
       * a file descriptor in a ready state.
       */
299

300
      prev = NULL;
sof's avatar
sof committed
301
      if (select_succeeded || unblock_all) {
302 303 304 305
	  for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	      next = tso->link;
	      switch (tso->why_blocked) {
	      case BlockedOnRead:
sof's avatar
sof committed
306
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
307 308
		  break;
	      case BlockedOnWrite:
sof's avatar
sof committed
309
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
310 311 312 313
		  break;
	      default:
		  barf("awaitEvent");
	      }
314
      
315 316 317 318 319 320 321 322 323 324 325 326 327
	      if (ready) {
		  IF_DEBUG(scheduler,belch("Waking up blocked thread %d\n", tso->id));
		  tso->why_blocked = NotBlocked;
		  tso->link = END_TSO_QUEUE;
		  PUSH_ON_RUN_QUEUE(tso);
	      } else {
		  if (prev == NULL)
		      blocked_queue_hd = tso;
		  else
		      prev->link = tso;
		  prev = tso;
	      }
	  }
328

329 330 331 332 333 334
	  if (prev == NULL)
	      blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
	  else {
	      prev->link = END_TSO_QUEUE;
	      blocked_queue_tl = prev;
	  }
335
      }
336 337 338 339 340 341 342 343 344 345
      
#if defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_TARGET_OS)
      	// if we were woken up by wakeBlockedWorkerThread,
      	// read the dummy byte from the pipe
      if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
          unsigned char dummy;
          wait = rtsFalse;
          read(workerWakeupPipe[0],&dummy,1);
      }
#endif
346
    } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
347
}
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375


#ifdef RTS_SUPPORTS_THREADS
/* wakeBlockedWorkerThread
 *
 * If a worker thread is currently blocked within awaitEvent,
 * wake it.
 * Must be called with sched_mutex held.
 */

void
wakeBlockedWorkerThread()
{
#ifndef mingw32_TARGET_OS
    if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
    	unsigned char dummy = 42;	// Any value will do here
    	
			// write something so that select() wakes up
    	write(workerWakeupPipe[1],&dummy,1);
    	workerWakeupPending = rtsTrue;
    }
#else
	// The Win32 implementation currently uses a polling loop,
	// so there is no need to explicitly wake it
#endif
}

#endif