Commit 18340925 authored by sof's avatar sof
Browse files

[project @ 2003-07-03 15:14:56 by sof]

New primop (mingw only),

  asyncDoProc# :: Addr# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)

which lets a Haskell thread hand off a pointer to external code (1st arg) for
asynchronous execution by the RTS worker thread pool. Second arg is data passed
in to the asynchronous routine. The routine is _not_ permitted to re-enter
the RTS as part of its execution.
parent 5affd811
-----------------------------------------------------------------------
-- $Id: primops.txt.pp,v 1.27 2003/06/19 10:42:26 simonmar Exp $
-- $Id: primops.txt.pp,v 1.28 2003/07/03 15:14:56 sof Exp $
--
-- Primitive Operations
--
......@@ -1440,6 +1440,15 @@ primop AsyncWriteOp "asyncWrite#" GenPrimOp
needs_wrapper = True
has_side_effects = True
out_of_line = True
primop AsyncDoProcOp "asyncDoProc#" GenPrimOp
Addr# -> Addr# -> State# RealWorld-> (# State# RealWorld, Int#, Int# #)
{Asynchronously perform procedure (first arg), passing it 2nd arg.}
with
needs_wrapper = True
has_side_effects = True
out_of_line = True
#endif
------------------------------------------------------------------------
......
/* -----------------------------------------------------------------------------
* $Id: PrimOps.h,v 1.102 2003/06/19 10:42:24 simonmar Exp $
* $Id: PrimOps.h,v 1.103 2003/07/03 15:14:57 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -244,6 +244,7 @@ EXTFUN_RTS(delayzh_fast);
#ifdef mingw32_TARGET_OS
EXTFUN_RTS(asyncReadzh_fast);
EXTFUN_RTS(asyncWritezh_fast);
EXTFUN_RTS(asyncDoProczh_fast);
#endif
......
/* -----------------------------------------------------------------------------
* $Id: TSO.h,v 1.30 2003/02/21 05:34:15 sof Exp $
* $Id: TSO.h,v 1.31 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-1999
*
......@@ -139,6 +139,9 @@ typedef enum {
BlockedOnRead,
BlockedOnWrite,
BlockedOnDelay
#if defined(mingw32_TARGET_OS)
, BlockedOnDoProc
#endif
#if defined(PAR)
, BlockedOnGA // blocked on a remote closure represented by a Global Address
, BlockedOnGA_NoSend // same as above but without sending a Fetch message
......@@ -150,7 +153,7 @@ typedef enum {
#endif
} StgTSOBlockReason;
#ifdef mingw32_TARGET_OS
#if defined(mingw32_TARGET_OS)
/* results from an async I/O request + it's ID. */
typedef struct {
unsigned int reqID;
......@@ -163,7 +166,7 @@ typedef union {
StgClosure *closure;
struct StgTSO_ *tso;
int fd;
#ifdef mingw32_TARGET_OS
#if defined(mingw32_TARGET_OS)
StgAsyncIOResult* async_result;
#endif
unsigned int target;
......
/* -----------------------------------------------------------------------------
* $Id: PrimOps.hc,v 1.107 2003/04/15 14:37:12 simonmar Exp $
* $Id: PrimOps.hc,v 1.108 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-2002
*
......@@ -1676,7 +1676,7 @@ FN_(asyncReadzh_fast)
CurrentTSO->why_blocked = BlockedOnRead;
ACQUIRE_LOCK(&sched_mutex);
/* could probably allocate this on the heap instead */
ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncWritezh_fast");
ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncReadzh_fast");
reqID = RET_STGCALL5(W_,addIORequest,R1.i,FALSE,R2.i,R3.i,(char*)R4.p);
ares->reqID = reqID;
ares->len = 0;
......@@ -1709,4 +1709,26 @@ FN_(asyncWritezh_fast)
JMP_(stg_block_async);
FE_
}
FN_(asyncDoProczh_fast)
{
StgAsyncIOResult* ares;
unsigned int reqID;
FB_
/* args: R1.i = proc, R2.i = param */
ASSERT(CurrentTSO->why_blocked == NotBlocked);
CurrentTSO->why_blocked = BlockedOnDoProc;
ACQUIRE_LOCK(&sched_mutex);
/* could probably allocate this on the heap instead */
ares = (StgAsyncIOResult*)RET_STGCALL2(P_,stgMallocBytes,sizeof(StgAsyncIOResult), "asyncDoProczh_fast");
reqID = RET_STGCALL2(W_,addDoProcRequest,R1.p,R2.p);
ares->reqID = reqID;
ares->len = 0;
ares->errCode = 0;
CurrentTSO->block_info.async_result = ares;
APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
RELEASE_LOCK(&sched_mutex);
JMP_(stg_block_async);
FE_
}
#endif
/* -----------------------------------------------------------------------------
* $Id: Sanity.c,v 1.33 2003/04/22 16:25:12 simonmar Exp $
* $Id: Sanity.c,v 1.34 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-2001
*
......@@ -610,6 +610,9 @@ checkTSO(StgTSO *tso)
case BlockedOnRead:
case BlockedOnWrite:
case BlockedOnDelay:
#if defined(mingw32_TARGET_OS)
case BlockedOnDoProc:
#endif
/* isOnBQ(blocked_queue) */
break;
case BlockedOnException:
......
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.170 2003/06/19 10:35:37 simonmar Exp $
* $Id: Schedule.c,v 1.171 2003/07/03 15:14:58 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -3103,6 +3103,9 @@ unblockThread(StgTSO *tso)
case BlockedOnRead:
case BlockedOnWrite:
#if defined(mingw32_TARGET_OS)
case BlockedOnDoProc:
#endif
{
/* take TSO off blocked_queue */
StgBlockingQueueElement *prev = NULL;
......@@ -3230,6 +3233,9 @@ unblockThread(StgTSO *tso)
case BlockedOnRead:
case BlockedOnWrite:
#if defined(mingw32_TARGET_OS)
case BlockedOnDoProc:
#endif
{
StgTSO *prev = NULL;
for (t = blocked_queue_hd; t != END_TSO_QUEUE;
......@@ -3623,6 +3629,11 @@ printThreadBlockage(StgTSO *tso)
case BlockedOnWrite:
fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
break;
#if defined(mingw32_TARGET_OS)
case BlockedOnDoProc:
fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
break;
#endif
case BlockedOnDelay:
fprintf(stderr,"is blocked until %d", tso->block_info.target);
break;
......
......@@ -51,10 +51,9 @@ static int issued_reqs;
static void
onIOComplete(unsigned int reqID,
void* param STG_UNUSED,
int fd STG_UNUSED,
int len,
char* buf STG_UNUSED,
void* buf STG_UNUSED,
int errCode)
{
/* Deposit result of request in queue/table */
......@@ -96,21 +95,34 @@ addIORequest(int fd,
#if 0
fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len); fflush(stderr);
#endif
return AddIORequest(fd,forWriting,isSock,len,buf,0,onIOComplete);
return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
}
unsigned int
addDelayRequest(int msecs)
addDelayRequest(int msecs)
{
EnterCriticalSection(&queue_lock);
issued_reqs++;
LeaveCriticalSection(&queue_lock);
#if 0
fprintf(stderr, "addDelayReq: %d %d %d\n", msecs); fflush(stderr);
fprintf(stderr, "addDelayReq: %d\n", msecs); fflush(stderr);
#endif
return AddDelayRequest(msecs,0,onIOComplete);
return AddDelayRequest(msecs,onIOComplete);
}
unsigned int
addDoProcRequest(void* proc, void* param)
{
EnterCriticalSection(&queue_lock);
issued_reqs++;
LeaveCriticalSection(&queue_lock);
#if 0
fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
#endif
return AddProcRequest(proc,param,onIOComplete);
}
int
startupAsyncIO()
{
......@@ -186,6 +198,7 @@ start:
case BlockedOnDelay:
case BlockedOnRead:
case BlockedOnWrite:
case BlockedOnDoProc:
if (tso->block_info.async_result->reqID == rID) {
/* Found the thread blocked waiting on request; stodgily fill
* in its result block.
......
......@@ -13,6 +13,7 @@ addIORequest(int fd,
int len,
char* buf);
extern unsigned int addDelayRequest(int msecs);
extern unsigned int addDoProcRequest(void* proc, void* param);
extern int startupAsyncIO(void);
extern void shutdownAsyncIO(void);
......
......@@ -40,8 +40,9 @@ IOWorkerProc(PVOID param)
IOManagerState* iom = (IOManagerState*)param;
WorkQueue* pq = iom->workQueue;
WorkItem* work;
int len;
int len = 0, fd = 0;
DWORD errCode;
void* complData;
hWaits[0] = (HANDLE)iom->hExitEvent;
hWaits[1] = GetWorkQueueHandle(pq);
......@@ -74,39 +75,66 @@ IOWorkerProc(PVOID param)
if (FetchWork(pq,(void**)&work)) {
if ( work->workKind & WORKER_READ ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = recv(work->fd, work->buf, work->len, 0);
len = recv(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len,
0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = read(work->fd, work->buf, work->len);
len = read(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
complData = work->workData.ioData.buf;
fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_WRITE ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = send(work->fd, work->buf, work->len, 0);
len = send(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len,
0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = write(work->fd,work->buf, work->len);
len = write(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
complData = work->workData.ioData.buf;
fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_DELAY ) {
/* very approximate implementation of threadDelay */
Sleep(work->len);
len = work->len;
Sleep(work->workData.delayData.msecs);
len = work->workData.delayData.msecs;
complData = NULL;
fd = 0;
errCode = 0;
} else if ( work->workKind & WORKER_DO_PROC ) {
/* perform operation/proc on behalf of Haskell thread. */
if (work->workData.procData.proc) {
/* The procedure is assumed to encode result + success/failure
* via its param.
*/
work->workData.procData.proc(work->workData.procData.param);
errCode=0;
} else {
errCode=1;
}
complData = work->workData.procData.param;
} else {
fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
fflush(stderr);
continue;
}
work->onCompletion(work->requestID,
work->param,
work->fd,
fd,
len,
work->buf,
complData,
errCode);
/* Free the WorkItem */
free(work);
......@@ -181,21 +209,20 @@ AddIORequest ( int fd,
BOOL isSocket,
int len,
char* buffer,
void* data,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return 0;
/* Fill in the blanks */
wItem->fd = fd;
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
wItem->len = len;
wItem->buf = buffer;
wItem->param = data;
wItem->onCompletion = onCompletion;
wItem->requestID = ioMan->requestID++;
wItem->workData.ioData.fd = fd;
wItem->workData.ioData.len = len;
wItem->workData.ioData.buf = buffer;
wItem->onCompletion = onCompletion;
wItem->requestID = ioMan->requestID++;
EnterCriticalSection(&ioMan->manLock);
/* If there are no worker threads available, create one.
......@@ -224,18 +251,47 @@ AddIORequest ( int fd,
*/
BOOL
AddDelayRequest ( unsigned int msecs,
void* data,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return FALSE;
/* Fill in the blanks */
wItem->fd = 0;
wItem->workKind = WORKER_DELAY;
wItem->len = msecs;
wItem->buf = 0;
wItem->param = data;
wItem->workData.delayData.msecs = msecs;
wItem->onCompletion = onCompletion;
wItem->requestID = ioMan->requestID++;
EnterCriticalSection(&ioMan->manLock);
if ( ioMan->workersIdle == 0 ) {
ioMan->numWorkers++;
NewIOWorkerThread(ioMan);
}
LeaveCriticalSection(&ioMan->manLock);
if (SubmitWork(ioMan->workQueue,wItem)) {
return wItem->requestID;
} else {
return 0;
}
}
/*
* Function: AddDelayRequest()
*
*/
BOOL
AddProcRequest ( void* proc,
void* param,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
if (!ioMan || !wItem) return FALSE;
/* Fill in the blanks */
wItem->workKind = WORKER_DO_PROC;
wItem->workData.procData.proc = proc;
wItem->workData.procData.param = param;
wItem->onCompletion = onCompletion;
wItem->requestID = ioMan->requestID++;
......
......@@ -32,18 +32,31 @@ extern void* GetFiberData ( void );
*
*/
typedef void (*CompletionProc)(unsigned int requestID,
void* param,
int fd,
int len,
char* buf,
void* buf,
int errCode);
typedef void (*DoProcProc)(void *param);
typedef union workData {
struct {
int fd;
int len;
char *buf;
} ioData;
struct {
int msecs;
} delayData;
struct {
DoProcProc proc;
void* param;
} procData;
} WorkData;
typedef struct WorkItem {
unsigned int workKind;
int fd;
int len;
char* buf;
void* param;
WorkData workData;
unsigned int requestID;
CompletionProc onCompletion;
} WorkItem;
......@@ -54,10 +67,11 @@ extern CompletionProc onComplete;
* that instead of passing a tag describing the work to be performed,
* a function pointer is passed instead. Maybe later.
*/
#define WORKER_READ 1
#define WORKER_WRITE 2
#define WORKER_DELAY 4
#define WORKER_FOR_SOCKET 8
#define WORKER_READ 1
#define WORKER_WRITE 2
#define WORKER_DELAY 4
#define WORKER_FOR_SOCKET 8
#define WORKER_DO_PROC 16
/*
* Starting up and shutting down.
......@@ -71,7 +85,6 @@ extern void ShutdownIOManager ( void );
* will invoke upon completion.
*/
extern int AddDelayRequest ( unsigned int msecs,
void* data,
CompletionProc onCompletion);
extern int AddIORequest ( int fd,
......@@ -79,7 +92,10 @@ extern int AddIORequest ( int fd,
BOOL isSocket,
int len,
char* buffer,
void* data,
CompletionProc onCompletion);
extern int AddProcRequest ( void* proc,
void* data,
CompletionProc onCompletion);
#endif /* __IOMANAGER_H__ */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment