AsyncIO.c 9.96 KB
Newer Older
sof's avatar
sof committed
1
2
3
4
5
6
7
/* AsyncIO.c
 *
 * Integrating Win32 asynchronous I/O with the GHC RTS.
 *
 * (c) sof, 2002-2003.
 */
#include "Rts.h"
sof's avatar
sof committed
8
#include "RtsUtils.h"
sof's avatar
sof committed
9
10
11
#include <windows.h>
#include <stdio.h>
#include "Schedule.h"
12
#include "RtsFlags.h"
13
#include "Capability.h"
sof's avatar
sof committed
14
15
16
17
18
19
20
#include "win32/AsyncIO.h"
#include "win32/IOManager.h"

/*
 * Overview:
 *
 * Haskell code issues asynchronous I/O requests via the
 * async{Read,Write,DoOp}# primops. These cause addIORequest()
 * to be invoked, which forwards the request to the underlying
 * asynchronous I/O subsystem. Each request is tagged with a unique
 * ID.
 *
 * addIORequest() returns this ID, so that when the blocked CH
 * thread is added onto blocked_queue, its TSO is annotated with
 * it. Upon completion of an I/O request, the async I/O handling
 * code makes a back-call to the local onIOComplete() routine to
 * signal its completion. onIOComplete() adds the IO request ID (along
 * with its result data) to a queue of completed requests before returning.
 *
 * The queue of completed IO requests is read by the thread operating
 * the RTS scheduler. It de-queues the CH threads corresponding
 * to the request IDs, making them runnable again.
 *
 */

/* One finished request, as recorded by onIOComplete() in completedTable
 * until awaitRequests() hands the result to the blocked thread.
 */
typedef struct CompletedReq {
    unsigned int   reqID;    /* ID of the request that completed */
    int            len;      /* 'len' value passed to the completion callback */
    int            errCode;  /* 'errCode' value passed to the completion callback */
} CompletedReq;

#define MAX_REQUESTS 200

static CRITICAL_SECTION queue_lock;
static HANDLE           completed_req_event;
sof's avatar
sof committed
49
50
static HANDLE           abandon_req_wait;
static HANDLE           wait_handles[2];
sof's avatar
sof committed
51
52
static CompletedReq     completedTable[MAX_REQUESTS];
static int              completed_hw;
sof's avatar
sof committed
53
static HANDLE           completed_table_sema;
sof's avatar
sof committed
54
55
56
57
58
59
static int              issued_reqs;

/*
 * Completion callback handed to the IO manager by the add*Request()
 * functions below.
 *
 * Records (reqID, len, errCode) in completedTable so that awaitRequests()
 * can later pass the result on to the blocked Haskell thread. The 'fd'
 * and 'buf' arguments are not used.
 *
 * Blocks on completed_table_sema until a table slot is free. If the
 * semaphore cannot be acquired, or the table is unexpectedly full, the
 * result is dropped and a diagnostic is written to stderr.
 */
static void
onIOComplete(unsigned int reqID,
	     int   fd STG_UNUSED,
	     int   len,
	     void* buf STG_UNUSED,
	     int   errCode)
{
    DWORD dwRes;

    /* Deposit result of request in queue/table..when there's room. */
    dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
    if (dwRes != WAIT_OBJECT_0) {
	/* Not likely */
	fprintf(stderr, "onIOComplete: failed to grab table semaphore, dropping request 0x%x\n", reqID);
	fflush(stderr);
	return;
    }

    EnterCriticalSection(&queue_lock);
    if (completed_hw >= MAX_REQUESTS) {
	/* Shouldn't happen: the semaphore is meant to bound the number
	 * of outstanding table entries.
	 */
	fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%u); dropping.\n", reqID);
	fflush(stderr);
    } else {
#if 0
	fprintf(stderr, "onCompl: %u %d %d %d %d\n", 
		reqID, len, errCode, issued_reqs, completed_hw); 
	fflush(stderr);
#endif
	completedTable[completed_hw].reqID   = reqID;
	completedTable[completed_hw].len     = len;
	completedTable[completed_hw].errCode = errCode;
	completed_hw++;
	issued_reqs--;
	if (completed_hw == 1) {
	    /* The event is used to wake up the scheduler thread should it
	     * be blocked waiting for requests to complete. The event resets once
	     * that thread has cleared out the request queue/table.
	     */
	    SetEvent(completed_req_event);
	}
    }
    LeaveCriticalSection(&queue_lock);
}

unsigned int
addIORequest(int   fd,
	     int   forWriting,
	     int   isSock,
	     int   len,
	     char* buf)
{
sof's avatar
sof committed
109
110
111
    EnterCriticalSection(&queue_lock);
    issued_reqs++;
    LeaveCriticalSection(&queue_lock);
sof's avatar
sof committed
112
#if 0
sof's avatar
sof committed
113
    fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
sof's avatar
sof committed
114
#endif
sof's avatar
sof committed
115
    return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
sof's avatar
sof committed
116
117
}

sof's avatar
sof committed
118
unsigned int
sof's avatar
sof committed
119
addDelayRequest(int msecs)
sof's avatar
sof committed
120
{
sof's avatar
sof committed
121
122
123
    EnterCriticalSection(&queue_lock);
    issued_reqs++;
    LeaveCriticalSection(&queue_lock);
sof's avatar
sof committed
124
#if 0
sof's avatar
sof committed
125
    fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
sof's avatar
sof committed
126
#endif
sof's avatar
sof committed
127
    return AddDelayRequest(msecs,onIOComplete);
sof's avatar
sof committed
128
129
}

sof's avatar
sof committed
130
131
132
unsigned int
addDoProcRequest(void* proc, void* param)
{
sof's avatar
sof committed
133
134
135
    EnterCriticalSection(&queue_lock);
    issued_reqs++;
    LeaveCriticalSection(&queue_lock);
sof's avatar
sof committed
136
#if 0
sof's avatar
sof committed
137
    fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
sof's avatar
sof committed
138
#endif
sof's avatar
sof committed
139
    return AddProcRequest(proc,param,onIOComplete);
sof's avatar
sof committed
140
141
142
}


sof's avatar
sof committed
143
144
145
int
startupAsyncIO()
{
sof's avatar
sof committed
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
    if (!StartIOManager()) {
	return 0;
    }
    InitializeCriticalSection(&queue_lock);
    /* Create a pair of events:
     *
     *    - completed_req_event  -- signals the deposit of request result; manual reset.
     *    - abandon_req_wait     -- external OS thread tells current RTS/Scheduler
     *                              thread to abandon wait for IO request completion.
     *                              Auto reset.
     */
    completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
    abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
    wait_handles[0] = completed_req_event;
    wait_handles[1] = abandon_req_wait;
    completed_hw = 0;
sof's avatar
sof committed
162
163
    if ( !(completed_table_sema = CreateSemaphore (NULL, MAX_REQUESTS, MAX_REQUESTS, NULL)) ) {
	DWORD rc = GetLastError();
164
	fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n", (int)rc);
sof's avatar
sof committed
165
166
167
168
169
170
	fflush(stderr);
    }

    return ( completed_req_event  != INVALID_HANDLE_VALUE &&
	     abandon_req_wait     != INVALID_HANDLE_VALUE &&
	     completed_table_sema != NULL );
sof's avatar
sof committed
171
172
173
174
175
}

void
shutdownAsyncIO()
{
sof's avatar
sof committed
176
177
    CloseHandle(completed_req_event);
    ShutdownIOManager();
sof's avatar
sof committed
178
179
}

sof's avatar
sof committed
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/*
 * Function: awaitRequests(wait)
 *
 * Check for the completion of external IO work requests. Worker
 * threads signal completion of IO requests by depositing them
 * in a table (completedTable). awaitRequests() matches up 
 * requests in that table with threads on the blocked_queue, 
 * making the threads whose IO requests have completed runnable
 * again.
 * 
 * awaitRequests() is called by the scheduler periodically _or_ if
 * it is out of work, and needs to wait for the completion of IO
 * requests to make further progress. In the latter scenario, 
 * awaitRequests() will simply block waiting for worker threads 
 * to complete if the 'completedTable' is empty.
 */
sof's avatar
sof committed
196
197
198
/*
 * Returns 1 if at least one completed request was processed, and 0 if
 * nothing was processed (nothing available and 'wait' is false, the
 * scheduler is no longer running, the wait was abandoned, or the wait
 * failed). In the threaded RTS this is a no-op that returns 0.
 */
int
awaitRequests(rtsBool wait)
{
#ifndef THREADED_RTS
  // none of this is actually used in the threaded RTS
    int i;
    StgTSO *tso, *prev;
    DWORD dwRes;

    /* Loop until there is something in completedTable, or we decide to
     * give up. On leaving the loop via 'break', queue_lock is held.
     */
    for (;;) {
#if 0
	fprintf(stderr, "awaitRequests(): %d %d %d\n", issued_reqs, completed_hw, wait);
	fflush(stderr);
#endif
	EnterCriticalSection(&queue_lock);
	if (completed_hw != 0) {
	    break;
	}
	/* empty table, drop lock */
	LeaveCriticalSection(&queue_lock);

	/* Nothing immediately available & we won't wait.
	 *
	 * Historical note: an extra exit condition,
	 *     (issued_reqs == 0 && completed_hw == 0)
	 * is disabled here, with the remark: "If we just return when
	 * wait==rtsFalse, we'll go into a busy wait loop, so I disabled
	 * this condition --SDM 18/12/2003".
	 */
	if ( !wait || sched_state != SCHED_RUNNING ) {
	    return 0;
	}

	dwRes = WaitForMultipleObjects(2, wait_handles, FALSE, INFINITE);
	switch (dwRes) {
	case WAIT_OBJECT_0:
	    /* a request was completed; go round again to pick it up */
	    break;
	case WAIT_OBJECT_0 + 1:
	case WAIT_TIMEOUT:
	    /* timeout (unlikely) or told to abandon waiting */
	    return 0;
	case WAIT_FAILED: {
	    DWORD dw = GetLastError();
	    fprintf(stderr, "awaitRequests: wait failed -- error code: %lu\n", dw); fflush(stderr);
	    return 0;
	}
	default:
	    fprintf(stderr, "awaitRequests: unexpected wait return code %lu\n", dwRes); fflush(stderr);
	    return 0;
	}
    }

    for (i=0; i < completed_hw; i++) {
	/* For each of the completed requests, match up their Ids
	 * with those of the threads on the blocked_queue. If the
	 * thread that made the IO request has been subsequently
	 * killed (and removed from blocked_queue), no match will
	 * be found for that request Id. 
	 *
	 * i.e., killing a Haskell thread doesn't attempt to cancel
	 * the IO request it is blocked on.
	 */
	unsigned int rID = completedTable[i].reqID;
	int matched = 0;

	prev = NULL;
	for (tso = blocked_queue_hd;
	     !matched && tso != END_TSO_QUEUE;
	     prev = tso, tso = tso->link) {

	    switch(tso->why_blocked) {
	    case BlockedOnRead:
	    case BlockedOnWrite:
	    case BlockedOnDoProc:
		if (tso->block_info.async_result->reqID == rID) {
		    /* Found the thread blocked waiting on request; stodgily fill 
		     * in its result block. 
		     */
		    tso->block_info.async_result->len = completedTable[i].len;
		    tso->block_info.async_result->errCode = completedTable[i].errCode;

		    /* Drop the matched TSO from blocked_queue */
		    if (prev) {
			prev->link = tso->link;
		    } else {
			blocked_queue_hd = tso->link;
		    }
		    if (blocked_queue_tl == tso) {
			blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
		    }

		    tso->link = END_TSO_QUEUE;
		    tso->why_blocked = NotBlocked;
		    pushOnRunQueue(&MainCapability, tso);

		    /* Stop scanning explicitly rather than relying on
		     * tso->link still being END_TSO_QUEUE here.
		     * NOTE(review): pushOnRunQueue() presumably relinks
		     * tso->link, in which case the scan would otherwise
		     * wander into the run queue -- confirm.
		     */
		    matched = 1;
		}
		break;
	    default:
		if (tso->why_blocked != NotBlocked) {
		    barf("awaitRequests: odd thread state");
		}
		break;
	    }
	}

	/* Signal that there's completed table slots available */
	if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
	    DWORD dw = GetLastError();
	    fprintf(stderr, "awaitRequests: failed to signal semaphore (error code=0x%x)\n", (unsigned int)dw);
	    fflush(stderr);
	}
    }
    completed_hw = 0;
    ResetEvent(completed_req_event);
    LeaveCriticalSection(&queue_lock);
    return 1;
#else /* THREADED_RTS */
    /* Not used in the threaded RTS; still return a defined value. */
    (void)wait;
    return 0;
#endif /* !THREADED_RTS */
}
sof's avatar
sof committed
314

sof's avatar
sof committed
315
316
317
318
319
320
/*
 * Function: abandonRequestWait()
 *
 * Wake up a thread that's blocked waiting for new IO requests
 * to complete (via awaitRequests().)
 */
sof's avatar
sof committed
321
void
322
abandonRequestWait( void )
sof's avatar
sof committed
323
{
sof's avatar
sof committed
324
325
326
    /* the event is auto-reset, but in case there's no thread
     * already waiting on the event, we want to return it to
     * a non-signalled state.
327
328
329
330
331
332
333
334
335
     *
     * Careful!  There is no synchronisation between
     * abandonRequestWait and awaitRequest, which means that
     * abandonRequestWait might be called just before a thread
     * goes into a wait, and we miss the abandon signal.  So we
     * must SetEvent() here rather than PulseEvent() to ensure
     * that the event isn't lost.  We can re-optimise by resetting
     * the event somewhere safe if we know the event has been
     * properly serviced (see resetAbandon() below).  --SDM 18/12/2003
sof's avatar
sof committed
336
     */
337
338
339
340
341
342
343
    SetEvent(abandon_req_wait);
}

/*
 * Function: resetAbandonRequestWait()
 *
 * Return the abandon_req_wait event to the non-signalled state,
 * discarding any abandon signal that has not yet been consumed
 * by a waiter.
 */
void
resetAbandonRequestWait( void )
{
    ResetEvent(abandon_req_wait);
}
345