/* -----------------------------------------------------------------------------
 * $Id: Select.c,v 1.25 2003/02/22 04:51:57 sof Exp $
 *
 * (c) The GHC Team 1995-2002
 *
 * Support for concurrent non-blocking I/O and thread waiting.
 *
 * ---------------------------------------------------------------------------*/

/* we're outside the realms of POSIX here... */
/* #include "PosixSource.h" */

#include "Rts.h"
#ifndef mingw32_TARGET_OS
/* to the end */

#include "Schedule.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "Timer.h"
#include "Signals.h"
#include "Capability.h"

# ifdef HAVE_SYS_TYPES_H
#  include <sys/types.h>
# endif

# ifdef HAVE_SYS_TIME_H
#  include <sys/time.h>
# endif

#include <errno.h>
#include <string.h>

/* last timestamp */
nat timestamp = 0;

#ifdef RTS_SUPPORTS_THREADS
static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
static rtsBool workerWakeupPending = rtsFalse;
static int workerWakeupPipe[2];
static rtsBool workerWakeupInited = rtsFalse;
#endif

/* There's a clever trick here to avoid problems when the time wraps
 * around.  Since our maximum delay is smaller than 31 bits of ticks
 * (it's actually 31 bits of microseconds), we can safely check
 * whether a timer has expired even if our timer will wrap around
 * before the target is reached, using the following formula:
 *
 *        (int)((uint)current_time - (uint)target_time) > 0
 *
 * if this is true, then our time has expired (this is the test used in
 * wakeUpSleepingThreads below).
 * (idea due to Andy Gill).
 */
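/* A worked example of the wrap-around test above (illustrative values
 * only, assuming 32-bit nats): suppose the tick counter has wrapped, so
 * current_time = 0x00000005 while target_time = 0xfffffff0.  Then
 * (uint)(0x00000005 - 0xfffffff0) = 0x00000015, a small positive number,
 * so (int)(current_time - target_time) > 0 and the timer is treated as
 * expired, even though current_time < target_time numerically.  If the
 * target instead lies 10 ticks in the future, the difference is -10 as a
 * signed int, so the timer is correctly reported as not yet expired.
 */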
rtsBool
wakeUpSleepingThreads(nat ticks)
{
    StgTSO *tso;
    rtsBool flag = rtsFalse;

    while (sleeping_queue != END_TSO_QUEUE &&
	   (int)(ticks - sleeping_queue->block_info.target) > 0) {
	tso = sleeping_queue;
	sleeping_queue = tso->link;
	tso->why_blocked = NotBlocked;
	tso->link = END_TSO_QUEUE;
	IF_DEBUG(scheduler,belch("Waking up sleeping thread %d\n", tso->id));
	PUSH_ON_RUN_QUEUE(tso);
	flag = rtsTrue;
    }
    return flag;
}

/* Argument 'wait' says whether to wait for I/O to become available,
 * or whether to just check and return immediately.  If there are
 * other threads ready to run, we normally do the non-waiting variety,
 * otherwise we wait (see Schedule.c).
 *
 * SMP note: must be called with sched_mutex locked.
 *
 * Windows: select only works on sockets, so this doesn't really work,
 * though it makes things better than before. MsgWaitForMultipleObjects
 * should really be used, though it only seems to work for read handles,
 * not write handles.
 *
 */
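/* An illustrative call-site sketch (a reading of the comment above, not
 * necessarily the literal code in Schedule.c): while holding sched_mutex,
 * the scheduler does roughly
 *
 *     if (run_queue_hd == END_TSO_QUEUE)
 *         awaitEvent(rtsTrue);     // nothing runnable: block until I/O,
 *                                  // a timer expiry, or a signal
 *     else
 *         awaitEvent(rtsFalse);    // runnable work exists: just poll
 *
 * and still holds sched_mutex when awaitEvent returns.
 */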
void
awaitEvent(rtsBool wait)
{
    StgTSO *tso, *prev, *next;
    rtsBool ready;
    fd_set rfd,wfd;
    int numFound;
    int maxfd = -1;
    rtsBool select_succeeded = rtsTrue;
    rtsBool unblock_all = rtsFalse;
    struct timeval tv;
    lnat min, ticks;

    tv.tv_sec  = 0;
    tv.tv_usec = 0;
    
    IF_DEBUG(scheduler,
	     belch("scheduler: checking for threads blocked on I/O");
	     if (wait) {
		 belch(" (waiting)");
	     }
	     belch("\n");
	     );

    /* loop until we've woken up some threads.  This loop is needed
     * because the select timing isn't accurate, we sometimes sleep
     * for a while but not long enough to wake up a thread in
     * a threadDelay.
     */
    do {

      ticks = timestamp = getourtimeofday();
      if (wakeUpSleepingThreads(ticks)) { 
	  return;
      }

      if (!wait) {
	  min = 0;
      } else if (sleeping_queue != END_TSO_QUEUE) {
	  min = (sleeping_queue->block_info.target - ticks) 
	      * TICK_MILLISECS * 1000;
      } else {
	  min = 0x7ffffff;
      }
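
      /* A worked example of the timeout computation above (illustrative,
       * assuming TICK_MILLISECS is 20, i.e. a 50Hz RTS timer tick): if the
       * earliest sleeper's target is 5 ticks away, min = 5 * 20 * 1000 =
       * 100000 usec, so the select() below blocks for at most 0.1 seconds
       * before the sleeping queue is rechecked.  With nothing sleeping, the
       * 0x7ffffff cap (~134 seconds) just bounds the time spent in select().
       */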

      /* 
       * Collect all of the fd's that we're interested in
       */
      FD_ZERO(&rfd);
      FD_ZERO(&wfd);

      for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	next = tso->link;

	switch (tso->why_blocked) {
	case BlockedOnRead:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &rfd);
	    continue;
	  }

	case BlockedOnWrite:
	  { 
	    int fd = tso->block_info.fd;
	    maxfd = (fd > maxfd) ? fd : maxfd;
	    FD_SET(fd, &wfd);
	    continue;
	  }

	default:
	  barf("AwaitEvent");
	}
      }

#ifdef RTS_SUPPORTS_THREADS
      if(!workerWakeupInited) {
          pipe(workerWakeupPipe);
          workerWakeupInited = rtsTrue;
      }
      FD_SET(workerWakeupPipe[0], &rfd);
      maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
#endif
      
      /* Release the scheduler lock while we do the poll.
       * this means that someone might muck with the blocked_queue
       * while we do this, but it shouldn't matter:
       *
       *   - another task might poll for I/O and remove one
       *     or more threads from the blocked_queue.
       *   - more I/O threads may be added to blocked_queue.
       *   - more delayed threads may be added to blocked_queue. We'll
       *     just subtract delta from their delays after the poll.
       *
       * I believe none of these cases lead to trouble --SDM.
       */
      
#ifdef RTS_SUPPORTS_THREADS
      isWorkerBlockedInAwaitEvent = rtsTrue;
      workerWakeupPending = rtsFalse;
#endif
      RELEASE_LOCK(&sched_mutex);

      /* Check for any interesting events */
      
      tv.tv_sec  = min / 1000000;
      tv.tv_usec = min % 1000000;

      while ((numFound = select(maxfd+1, &rfd, &wfd, NULL, &tv)) < 0) {
	  if (errno != EINTR) {
	    /* Handle bad file descriptors by unblocking all the
	       waiting threads. Why? Because a thread might have been
	       a bit naughty and closed a file descriptor while another
	       was blocked waiting. This is less-than-good programming
	       practice, but having the RTS as a result fall over isn't
	       acceptable, so we simply unblock all the waiting threads
	       should we see a bad file descriptor & give the threads
	       a chance to clean up their act. 
	       
	       Note: assume here that threads becoming unblocked
	       will try to read/write the file descriptor before trying
	       to issue a threadWaitRead/threadWaitWrite again (==> an
	       IOError will result for the thread that's got the bad
	       file descriptor.) Hence, there's no danger of a bad
	       file descriptor being repeatedly select()'ed on, so
	       the RTS won't loop.
	    */
	    if ( errno == EBADF ) {
	      select_succeeded = rtsFalse; /* select() failed, so rfd/wfd don't indicate readiness */
	      unblock_all = rtsTrue;
	      break;
	    } else {
 	      fprintf(stderr,"%d\n", errno);
 	      fflush(stderr);
 	      perror("select");
	      barf("select failed");
	    }
	  }
	  ACQUIRE_LOCK(&sched_mutex);
#ifdef RTS_SUPPORTS_THREADS
          isWorkerBlockedInAwaitEvent = rtsFalse;
#endif

	  /* We got a signal; could be one of ours.  If so, we need
	   * to start up the signal handler straight away, otherwise
	   * we could block for a long time before the signal is
	   * serviced.
	   */
	  if (signals_pending()) {
	      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
	      startSignalHandlers();
	      ACQUIRE_LOCK(&sched_mutex);
	      return; /* still hold the lock */
	  }

	  /* we were interrupted, return to the scheduler immediately.
	   */
	  if (interrupted) {
	      return; /* still hold the lock */
	  }
	  
	  /* check for threads that need waking up 
	   */
	  wakeUpSleepingThreads(getourtimeofday());
	  
	  /* If new runnable threads have arrived, stop waiting for
	   * I/O and run them.
	   */
	  if (run_queue_hd != END_TSO_QUEUE) {
	      return; /* still hold the lock */
	  }
	  
#ifdef RTS_SUPPORTS_THREADS
	  /* If another worker thread wants to take over,
	   * return to the scheduler
	   */
	  if (needToYieldToReturningWorker()) {
	      return; /* still hold the lock */
	  }
#endif
	  
#ifdef RTS_SUPPORTS_THREADS
          isWorkerBlockedInAwaitEvent = rtsTrue;
#endif
	  RELEASE_LOCK(&sched_mutex);
      }

      ACQUIRE_LOCK(&sched_mutex);

      /* Step through the waiting queue, unblocking every thread that now has
       * a file descriptor in a ready state.
       */

      prev = NULL;
      if (select_succeeded || unblock_all) {
	  for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
	      next = tso->link;
	      switch (tso->why_blocked) {
	      case BlockedOnRead:
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &rfd);
		  break;
	      case BlockedOnWrite:
		  ready = unblock_all || FD_ISSET(tso->block_info.fd, &wfd);
		  break;
	      default:
		  barf("awaitEvent");
	      }
      
	      if (ready) {
		  IF_DEBUG(scheduler,belch("Waking up blocked thread %d\n", tso->id));
		  tso->why_blocked = NotBlocked;
		  tso->link = END_TSO_QUEUE;
		  PUSH_ON_RUN_QUEUE(tso);
	      } else {
		  if (prev == NULL)
		      blocked_queue_hd = tso;
		  else
		      prev->link = tso;
		  prev = tso;
	      }
	  }

	  if (prev == NULL)
	      blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
	  else {
	      prev->link = END_TSO_QUEUE;
	      blocked_queue_tl = prev;
	  }
      }
      
#if defined(RTS_SUPPORTS_THREADS)
      	// if we were woken up by wakeBlockedWorkerThread,
      	// read the dummy byte from the pipe
      if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd)) {
          unsigned char dummy;
          wait = rtsFalse;
          read(workerWakeupPipe[0],&dummy,1);
      }
#endif
    } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE);
}


#ifdef RTS_SUPPORTS_THREADS
/* wakeBlockedWorkerThread
 *
 * If a worker thread is currently blocked within awaitEvent,
 * wake it.
 * Must be called with sched_mutex held.
 */
void
wakeBlockedWorkerThread()
{
    if(isWorkerBlockedInAwaitEvent && !workerWakeupPending) {
    	unsigned char dummy = 42;	// Any value will do here
    	
			// write something so that select() wakes up
    	write(workerWakeupPipe[1],&dummy,1);
    	workerWakeupPending = rtsTrue;
    }
}
#endif
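
/* A usage sketch of the wakeup pipe (an illustration drawn from the code
 * above, not an additional API): a scheduler thread that has just made new
 * work available calls, while still holding sched_mutex,
 *
 *     wakeBlockedWorkerThread();
 *
 * which writes a single dummy byte to workerWakeupPipe[1].  The worker
 * blocked in awaitEvent() has workerWakeupPipe[0] in its read set, so
 * select() returns immediately; the worker reads the dummy byte back (see
 * the FD_ISSET(workerWakeupPipe[0], &rfd) case in awaitEvent), clears
 * 'wait', and drops back out to the scheduler to pick up the new work.
 */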

#endif /* !mingw32_TARGET_OS */