Commit 5b4f5a6a authored by sof's avatar sof

[project @ 2003-02-21 05:34:12 by sof]

Asynchronous / non-blocking I/O for Win32 platforms.

This commit introduces a Concurrent Haskell friendly view of I/O on
Win32 platforms. Through the use of a pool of worker Win32 threads, CH
threads may issue asynchronous I/O requests without blocking the
progress of other CH threads. The issuing CH thread is blocked until
the request has been serviced though.

GHC.Conc exports the primops that take care of issuing the
asynchronous I/O requests, which the IO implementation now takes
advantage of. By default, all Handles are non-blocking/asynchronous,
but should performance become an issue, having a per-Handle flag for
turning off non-blocking could easily be imagined&introduced.

[Incidentally, this thread pool-based implementation could easily be
extended to also allow Haskell code to delegate the execution of
arbitrary pieces of (potentially blocking) external code to another OS
thread. Given how relatively gnarly the locking story has turned out
to be with the 'threaded' RTS, that may not be such a bad idea.]
parent e4341897
-----------------------------------------------------------------------
-- $Id: primops.txt.pp,v 1.24 2003/02/04 12:40:00 simonpj Exp $
-- $Id: primops.txt.pp,v 1.25 2003/02/21 05:34:14 sof Exp $
--
-- Primitive Operations
--
......@@ -1415,6 +1415,24 @@ primop WaitWriteOp "waitWrite#" GenPrimOp
has_side_effects = True
out_of_line = True
#ifdef mingw32_TARGET_OS
primop AsyncReadOp "asyncRead#" GenPrimOp
Int# -> Int# -> Int# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
{Asynchronously read bytes from specified file descriptor.}
with
needs_wrapper = True
has_side_effects = True
out_of_line = True
primop AsyncWriteOp "asyncWrite#" GenPrimOp
Int# -> Int# -> Int# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
{Asynchronously write bytes from specified file descriptor.}
with
needs_wrapper = True
has_side_effects = True
out_of_line = True
#endif
------------------------------------------------------------------------
section "Concurrency primitives"
{(In a non-concurrent implementation, ThreadId\# can be as singleton
......
/* -----------------------------------------------------------------------------
* $Id: PrimOps.h,v 1.99 2002/10/22 11:01:18 simonmar Exp $
* $Id: PrimOps.h,v 1.100 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -241,6 +241,10 @@ EXTFUN_RTS(tryPutMVarzh_fast);
EXTFUN_RTS(waitReadzh_fast);
EXTFUN_RTS(waitWritezh_fast);
EXTFUN_RTS(delayzh_fast);
#ifdef mingw32_TARGET_OS
EXTFUN_RTS(asyncReadzh_fast);
EXTFUN_RTS(asyncWritezh_fast);
#endif
/* -----------------------------------------------------------------------------
......
/* -----------------------------------------------------------------------------
* $Id: StgMiscClosures.h,v 1.45 2003/01/07 09:29:24 simonmar Exp $
* $Id: StgMiscClosures.h,v 1.46 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
......@@ -282,4 +282,6 @@ EF_(stg_block_noregs);
EF_(stg_block_1);
EF_(stg_block_takemvar);
EF_(stg_block_putmvar);
#ifdef mingw32_TARGET_OS
EF_(stg_block_async);
#endif
/* -----------------------------------------------------------------------------
* $Id: TSO.h,v 1.29 2003/01/25 15:54:48 wolfgang Exp $
* $Id: TSO.h,v 1.30 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-1999
*
......@@ -150,10 +150,22 @@ typedef enum {
#endif
} StgTSOBlockReason;
#ifdef mingw32_TARGET_OS
/* results from an async I/O request + it's ID. */
typedef struct {
unsigned int reqID;
int len;
int errCode;
} StgAsyncIOResult;
#endif
typedef union {
StgClosure *closure;
struct StgTSO_ *tso;
int fd;
#ifdef mingw32_TARGET_OS
StgAsyncIOResult* async_result;
#endif
unsigned int target;
} StgTSOBlockInfo;
......
/* -----------------------------------------------------------------------------
* $Id: HeapStackCheck.hc,v 1.27 2002/12/11 15:36:42 simonmar Exp $
* $Id: HeapStackCheck.hc,v 1.28 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
......@@ -1020,3 +1020,36 @@ FN_(stg_block_putmvar)
BLOCK_GENERIC;
FE_
}
#ifdef mingw32_TARGET_OS
INFO_TABLE_RET( stg_block_async_info, stg_block_async_ret,
MK_SMALL_BITMAP(0/*framesize*/, 0/*bitmap*/),
0/*SRT*/, 0/*SRT_OFF*/, 0/*SRT_LEN*/,
RET_SMALL,, IF_, 0, 0);
IF_(stg_block_async_ret)
{
StgAsyncIOResult* ares;
int len,errC;
FB_
ares = CurrentTSO->block_info.async_result;
len = ares->len;
errC = ares->errCode;
CurrentTSO->block_info.async_result = NULL;
STGCALL1(free,ares);
R1.w = len;
*Sp = (W_)errC;
JMP_(ENTRY_CODE(Sp[1]));
FE_
}
FN_(stg_block_async)
{
FB_
Sp -= 1;
Sp[0] = (W_)&stg_block_async_info;
BLOCK_GENERIC;
FE_
}
#endif
/* -----------------------------------------------------------------------------
* $Id: Linker.c,v 1.114 2003/02/10 23:35:03 wolfgang Exp $
* $Id: Linker.c,v 1.115 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 2000, 2001
*
......@@ -221,6 +221,8 @@ typedef struct _RtsSymbolVal {
/* These are statically linked from the mingw libraries into the ghc
executable, so we have to employ this hack. */
#define RTS_MINGW_ONLY_SYMBOLS \
SymX(asyncReadzh_fast) \
SymX(asyncWritezh_fast) \
SymX(memset) \
SymX(inet_ntoa) \
SymX(inet_addr) \
......
......@@ -36,6 +36,10 @@ NON_HS_PACKAGE = YES
# grab sources from these subdirectories
ALL_DIRS = hooks parallel
ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
ALL_DIRS += win32
endif
ifneq "$(DLLized)" "YES"
EXCLUDED_SRCS += RtsDllMain.c
else
......@@ -82,6 +86,10 @@ STANDARD_OPTS += -I../includes -I. -Iparallel
# COMPILING_RTS is only used when building Win32 DLL support.
STANDARD_OPTS += -DCOMPILING_RTS
ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32"
STANDARD_OPTS += -Iwin32
endif
# HC_OPTS is included in both .c and .hc compilations, whereas CC_OPTS is
# only included in .c compilations. HC_OPTS included the WAY_* opts, which
# must be included in both types of compilations.
......
/* -----------------------------------------------------------------------------
* $Id: PrimOps.hc,v 1.103 2002/12/11 15:36:45 simonmar Exp $
* $Id: PrimOps.hc,v 1.104 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
......@@ -28,6 +28,11 @@
#include <stdlib.h>
#ifdef mingw32_TARGET_OS
#include <windows.h>
#include "win32/AsyncIO.h"
#endif
/* ** temporary **
classes CCallable and CReturnable don't really exist, but the
......@@ -1629,3 +1634,48 @@ FN_(delayzh_fast)
FE_
}
#ifdef mingw32_TARGET_OS
FN_(asyncReadzh_fast)
{
StgAsyncIOResult* ares;
unsigned int reqID;
FB_
/* args: R1.i = fd, R2.i = isSock, R3.i = len, R4.p = buf */
ASSERT(CurrentTSO->why_blocked == NotBlocked);
CurrentTSO->why_blocked = BlockedOnRead;
ACQUIRE_LOCK(&sched_mutex);
/* could probably allocate this on the heap instead */
ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
reqID = RET_STGCALL5(W_,addIORequest,R1.i,FALSE,R2.i,R3.i,(char*)R4.p);
ares->reqID = reqID;
ares->len = 0;
ares->errCode = 0;
CurrentTSO->block_info.async_result = ares;
APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
RELEASE_LOCK(&sched_mutex);
JMP_(stg_block_async);
FE_
}
FN_(asyncWritezh_fast)
{
StgAsyncIOResult* ares;
unsigned int reqID;
FB_
/* args: R1.i */
/* args: R1.i = fd, R2.i = isSock, R3.i = len, R4.p = buf */
ASSERT(CurrentTSO->why_blocked == NotBlocked);
CurrentTSO->why_blocked = BlockedOnWrite;
ACQUIRE_LOCK(&sched_mutex);
ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
reqID = RET_STGCALL5(W_,addIORequest,R1.i,TRUE,R2.i,R3.i,(char*)R4.p);
ares->reqID = reqID;
ares->len = 0;
ares->errCode = 0;
CurrentTSO->block_info.async_result = ares;
APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
RELEASE_LOCK(&sched_mutex);
JMP_(stg_block_async);
FE_
}
#endif
/* -----------------------------------------------------------------------------
* $Id: RtsStartup.c,v 1.70 2003/01/30 10:19:07 simonmar Exp $
* $Id: RtsStartup.c,v 1.71 2003/02/21 05:34:15 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
......@@ -50,6 +50,10 @@
# include "LLC.h"
#endif
#if defined(mingw32_TARGET_OS)
#include "win32/AsyncIO.h"
#endif
#include <stdlib.h>
// Flag Structure
......@@ -153,6 +157,10 @@ hs_init(int *argc, char **argv[])
initDefaultHandlers();
#endif
#if defined(mingw32_TARGET_OS)
startupAsyncIO();
#endif
#ifdef RTS_GTK_FRONTPANEL
if (RtsFlags.GcFlags.frontpanel) {
initFrontPanel();
......@@ -343,6 +351,10 @@ hs_exit(void)
#if defined(TICKY_TICKY)
if (RtsFlags.TickyFlags.showTickyStats) PrintTickyInfo();
#endif
#if defined(mingw32_TARGET_OS)
shutdownAsyncIO();
#endif
}
// Compatibility interfaces
......
/* -----------------------------------------------------------------------------
* $Id: Select.c,v 1.23 2003/01/25 15:54:50 wolfgang Exp $
* $Id: Select.c,v 1.24 2003/02/21 05:34:16 sof Exp $
*
* (c) The GHC Team 1995-2002
*
......@@ -28,6 +28,7 @@
# ifdef mingw32_TARGET_OS
# include <windows.h>
# include "win32/AsyncIO.h"
# endif
#include <errno.h>
......@@ -235,7 +236,9 @@ awaitEvent(rtsBool wait)
#endif
RELEASE_LOCK(&sched_mutex);
while (1) {
Sleep(0); /* don't busy wait */
if (!awaitRequests(wait)) {
Sleep(0); /* don't busy wait */
}
#endif /* mingw32_TARGET_OS */
ACQUIRE_LOCK(&sched_mutex);
#ifdef RTS_SUPPORTS_THREADS
......
/* AsyncIO.c
*
* Integrating Win32 asynchronous I/O with the GHC RTS.
*
* (c) sof, 2002-2003.
*/
#include "Rts.h"
#include <windows.h>
#include <stdio.h>
#include "Schedule.h"
#include "win32/AsyncIO.h"
#include "win32/IOManager.h"
/*
* Overview:
*
* Haskell code issue asynchronous I/O requests via the
* asyncRead# and asyncWrite# primops. These cause addIORequest()
* to be invoked, which forwards the request to the underlying
* asynchronous I/O subsystem. Each request is tagged with a unique
* ID.
*
* addIORequest() returns this ID, so that when the blocked CH
* thread is added onto blocked_queue, its TSO is annotated with
* it. Upon completion of an I/O request, the async I/O handling
* code makes a back-call to signal its completion; the local
* onIOComplete() routine. It adds the IO request ID (along with
* its result data) to a queue of completed requests before returning.
*
* The queue of completed IO request is read by the thread operating
* the RTS scheduler. It de-queues the CH threads corresponding
* to the request IDs, making them runnable again.
*
*/
typedef struct CompletedReq {
unsigned int reqID;
int len;
int errCode;
} CompletedReq;
#define MAX_REQUESTS 200
static CRITICAL_SECTION queue_lock;
static HANDLE completed_req_event;
static CompletedReq completedTable[MAX_REQUESTS];
static int completed_hw;
static int issued_reqs;
static void
onIOComplete(unsigned int reqID,
void* param STG_UNUSED,
int fd STG_UNUSED,
int len,
char* buf STG_UNUSED,
int errCode)
{
/* Deposit result of request in queue/table */
EnterCriticalSection(&queue_lock);
if (completed_hw == MAX_REQUESTS) {
/* Not likely */
fprintf(stderr, "Request table overflow (%d); dropping.\n", reqID);
fflush(stderr);
} else {
#if 0
fprintf(stderr, "onCompl: %d %d %d %d %d\n", reqID, len, errCode, issued_reqs, completed_hw); fflush(stderr);
#endif
completedTable[completed_hw].reqID = reqID;
completedTable[completed_hw].len = len;
completedTable[completed_hw].errCode = errCode;
completed_hw++;
issued_reqs--;
if (completed_hw == 1) {
/* The event is used to wake up the scheduler thread should it
* be blocked waiting for requests to complete. It reset once
* that thread has cleared out the request queue/table.
*/
SetEvent(completed_req_event);
}
}
LeaveCriticalSection(&queue_lock);
}
unsigned int
addIORequest(int fd,
int forWriting,
int isSock,
int len,
char* buf)
{
EnterCriticalSection(&queue_lock);
issued_reqs++;
LeaveCriticalSection(&queue_lock);
#if 0
fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
#endif
return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
}
int
startupAsyncIO()
{
if (!StartIOManager()) {
return 0;
}
InitializeCriticalSection(&queue_lock);
completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
completed_hw = 0;
return 1;
}
void
shutdownAsyncIO()
{
CloseHandle(completed_req_event);
ShutdownIOManager();
}
int
awaitRequests(rtsBool wait)
{
start:
#if 0
fprintf(stderr, "awaitRequests: %d %d %d\n", issued_reqs, completed_hw, wait); fflush(stderr);
#endif
EnterCriticalSection(&queue_lock);
/* Nothing immediately available & we won't wait */
if ((!wait && completed_hw == 0) ||
(issued_reqs == 0 && completed_hw == 0)) {
LeaveCriticalSection(&queue_lock);
return 0;
}
if (completed_hw == 0) {
/* empty table, drop lock and wait */
LeaveCriticalSection(&queue_lock);
if (wait) {
WaitForSingleObject( completed_req_event, INFINITE );
} else {
return 0; /* cannot happen */
}
goto start;
} else {
int i;
StgTSO *tso, *prev;
for (i=0; i < completed_hw; i++) {
unsigned int rID = completedTable[i].reqID;
prev = NULL;
for(tso = blocked_queue_hd ; tso != END_TSO_QUEUE; tso = tso->link) {
switch(tso->why_blocked) {
case BlockedOnRead:
case BlockedOnWrite:
if (tso->block_info.async_result->reqID == rID) {
/* Found the thread blocked waiting on request; stodgily fill
* in its result block.
*/
tso->block_info.async_result->len = completedTable[i].len;
tso->block_info.async_result->errCode = completedTable[i].errCode;
/* Drop the matched TSO from blocked_queue */
if (prev) {
prev->link = tso->link;
} else {
blocked_queue_hd = tso->link;
}
if (blocked_queue_tl == tso) {
blocked_queue_tl = prev;
}
/* Terminates the run queue + this inner for-loop. */
tso->link = END_TSO_QUEUE;
tso->why_blocked = NotBlocked;
PUSH_ON_RUN_QUEUE(tso);
break;
}
break;
default:
break;
}
prev = tso;
}
}
completed_hw = 0;
ResetEvent(completed_req_event);
LeaveCriticalSection(&queue_lock);
return 1;
}
}
/* AsyncIO.h
*
* Integrating Win32 asynchronous I/O with the GHC RTS.
*
* (c) sof, 2002-2003.
*/
#ifndef __ASYNCHIO_H__
#define __ASYNCHIO_H__
extern unsigned int
addIORequest(int fd,
int forWriting,
int isSock,
int len,
char* buf);
extern int startupAsyncIO(void);
extern void shutdownAsyncIO(void);
extern int awaitRequests(rtsBool wait);
#endif /* __ASYNCHIO_H__ */
/* IOManager.c
*
* Non-blocking / asynchronous I/O for Win32.
*
* (c) sof, 2002-2003.
*/
#include "IOManager.h"
#include "WorkQueue.h"
#include <stdio.h>
#include <stdlib.h>
#include <io.h>
#include <winsock.h>
#include <process.h>
/*
* Internal state maintained by the IO manager.
*/
typedef struct IOManagerState {
CritSection manLock;
WorkQueue* workQueue;
int numWorkers;
int workersIdle;
HANDLE hExitEvent;
unsigned int requestID;
} IOManagerState;
/* ToDo: wrap up this state via a IOManager handle instead? */
static IOManagerState* ioMan;
/*
* The routine executed by each worker thread.
*/
static
unsigned
WINAPI
IOWorkerProc(PVOID param)
{
HANDLE hWaits[2];
DWORD rc;
IOManagerState* iom = (IOManagerState*)param;
WorkQueue* pq = iom->workQueue;
WorkItem* work;
int len;
DWORD errCode;
hWaits[0] = (HANDLE)iom->hExitEvent;
hWaits[1] = GetWorkQueueHandle(pq);
while (1) {
/* The error code is communicated back on completion of request; reset. */
errCode = 0;
EnterCriticalSection(&iom->manLock);
iom->workersIdle++;
LeaveCriticalSection(&iom->manLock);
rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
EnterCriticalSection(&iom->manLock);
iom->workersIdle--;
LeaveCriticalSection(&iom->manLock);
if ( WAIT_OBJECT_0 == rc ) {
/* shutdown */
#if 0
fprintf(stderr, "shutting down...\n"); fflush(stderr);
#endif
return 0;
} else if ( (WAIT_OBJECT_0 + 1) == rc ) {
/* work item available, fetch it. */
#if 0
fprintf(stderr, "work available...\n"); fflush(stderr);
#endif
if (FetchWork(pq,(void**)&work)) {
if ( work->workKind & WORKER_READ ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = recv(work->fd, work->buf, work->len, 0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = read(work->fd, work->buf, work->len);
if (len == -1) { errCode = errno; }
}
} else if ( work->workKind & WORKER_WRITE ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = send(work->fd, work->buf, work->len, 0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = write(work->fd,work->buf, work->len);
if (len == -1) { errCode = errno; }
}
} else if ( work->workKind & WORKER_DELAY ) {
/* very approximate implementation of threadDelay */
Sleep(work->len);
len = work->len;
errCode = 0;
} else {
fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
fflush(stderr);
continue;
}
work->onCompletion(work->requestID,
work->param,
work->fd,
len,
work->buf,
errCode);
/* Free the WorkItem */
free(work);
} else {
fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
return 1;
}
} else {
fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
return 1;
}
}
return 0;
}
static
BOOL
NewIOWorkerThread(IOManagerState* iom)
{
return ( 0 != _beginthreadex(NULL,
0,
IOWorkerProc,
(LPVOID)iom,
0,
NULL) );
//CreateThread( NULL, 0, IOWorkerProc, (LPVOID)iom, 0, NULL));
}
BOOL
StartIOManager(void)
{
HANDLE hExit;
WorkQueue* wq;
wq = NewWorkQueue();
if ( !wq ) return FALSE;
ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
if (!ioMan) {
FreeWorkQueue(wq);
return FALSE;
}
/* A manual-reset event */
hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
if ( !hExit ) {
FreeWorkQueue(wq);
free(ioMan);
return FALSE;
}
ioMan->hExitEvent = hExit;
InitializeCriticalSection(&ioMan->manLock);
ioMan->workQueue = wq;
ioMan->numWorkers = 0;
ioMan->workersIdle = 0;
ioMan->requestID = 1;
return TRUE;
}
/*
* Function: AddIORequest()
*
* Conduit to underlying WorkQueue's SubmitWork(); adds IO
* request to work queue, returning without blocking.
*/
int
AddIORequest ( int fd,
BOOL forWriting,
BOOL isSocket,
int len,
char* buffer,
void* data,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return 0;
/* Fill in the blanks */
wItem->fd = fd;
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
wItem->len = len;
wItem->buf = buffer;
wItem->param = data;
wItem->onCompletion = onCompletion;
wItem->requestID = ioMan->requestID++;
EnterCriticalSection(&ioMan->manLock);
/* If there are no worker threads available, create one.
*
* If this turns out to be too aggressive a policy, refine.
*/
#if 0
fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
#endif
if ( ioMan->workersIdle == 0 ) {
ioMan->numWorkers++;
NewIOWorkerThread(ioMan);
}
LeaveCriticalSection(&ioMan->manLock);
if (SubmitWork(ioMan->workQueue,wItem)) {
return wItem->requestID;
} else {
return 0;
}
}
/*
* Function: AddDelayRequest()