Commit fe41b2dc authored by sof's avatar sof
Browse files

[project @ 2003-09-12 16:26:05 by sof]

- Sleep()'s resolution is millisecs, not microsecs.
- adopt a more agressive policy for augmenting the thread pool
  to handle incoming requests (see code comments for details.)

  The previous policy ran the risk of starvation in rare (and hard
  to reproduce) cases, as spotted after having chased a bug
  for two days.

Merge to STABLE
parent 9fc59c6e
......@@ -16,12 +16,13 @@
* Internal state maintained by the IO manager.
*/
typedef struct IOManagerState {
CritSection manLock;
WorkQueue* workQueue;
int numWorkers;
int workersIdle;
HANDLE hExitEvent;
unsigned int requestID;
CritSection manLock;
WorkQueue* workQueue;
int queueSize;
int numWorkers;
int workersIdle;
HANDLE hExitEvent;
unsigned int requestID;
} IOManagerState;
/* ToDo: wrap up this state via a IOManager handle instead? */
......@@ -35,172 +36,181 @@ unsigned
WINAPI
IOWorkerProc(PVOID param)
{
HANDLE hWaits[2];
DWORD rc;
IOManagerState* iom = (IOManagerState*)param;
WorkQueue* pq = iom->workQueue;
WorkItem* work;
int len = 0, fd = 0;
DWORD errCode;
void* complData;
HANDLE hWaits[2];
DWORD rc;
IOManagerState* iom = (IOManagerState*)param;
WorkQueue* pq = iom->workQueue;
WorkItem* work;
int len = 0, fd = 0;
DWORD errCode;
void* complData;
hWaits[0] = (HANDLE)iom->hExitEvent;
hWaits[1] = GetWorkQueueHandle(pq);
hWaits[0] = (HANDLE)iom->hExitEvent;
hWaits[1] = GetWorkQueueHandle(pq);
while (1) {
/* The error code is communicated back on completion of request; reset. */
errCode = 0;
while (1) {
/* The error code is communicated back on completion of request; reset. */
errCode = 0;
EnterCriticalSection(&iom->manLock);
/* Signal that the worker is idle.
*
* 'workersIdle' is used when determining whether or not to
* increase the worker thread pool when adding a new request.
* (see addIORequest().)
*/
iom->workersIdle++;
LeaveCriticalSection(&iom->manLock);
EnterCriticalSection(&iom->manLock);
iom->workersIdle++;
LeaveCriticalSection(&iom->manLock);
rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
EnterCriticalSection(&iom->manLock);
iom->workersIdle--;
LeaveCriticalSection(&iom->manLock);
EnterCriticalSection(&iom->manLock);
/* Signal that the thread is 'non-idle' and about to consume
* a work item.
*/
iom->workersIdle--;
iom->queueSize--;
LeaveCriticalSection(&iom->manLock);
if ( WAIT_OBJECT_0 == rc ) {
/* shutdown */
#if 0
fprintf(stderr, "shutting down...\n"); fflush(stderr);
#endif
return 0;
} else if ( (WAIT_OBJECT_0 + 1) == rc ) {
/* work item available, fetch it. */
#if 0
fprintf(stderr, "work available...\n"); fflush(stderr);
#endif
if (FetchWork(pq,(void**)&work)) {
if ( work->workKind & WORKER_READ ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = recv(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len,
0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = read(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
complData = work->workData.ioData.buf;
fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_WRITE ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = send(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len,
0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = write(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
complData = work->workData.ioData.buf;
fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_DELAY ) {
/* very approximate implementation of threadDelay */
Sleep(work->workData.delayData.msecs);
len = work->workData.delayData.msecs;
complData = NULL;
fd = 0;
errCode = 0;
} else if ( work->workKind & WORKER_DO_PROC ) {
/* perform operation/proc on behalf of Haskell thread. */
if (work->workData.procData.proc) {
/* The procedure is assumed to encode result + success/failure
* via its param.
*/
errCode=work->workData.procData.proc(work->workData.procData.param);
if ( WAIT_OBJECT_0 == rc ) {
/* shutdown */
return 0;
} else if ( (WAIT_OBJECT_0 + 1) == rc ) {
/* work item available, fetch it. */
if (FetchWork(pq,(void**)&work)) {
if ( work->workKind & WORKER_READ ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = recv(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len,
0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = read(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
complData = work->workData.ioData.buf;
fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_WRITE ) {
if ( work->workKind & WORKER_FOR_SOCKET ) {
len = send(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len,
0);
if (len == SOCKET_ERROR) {
errCode = WSAGetLastError();
}
} else {
len = write(work->workData.ioData.fd,
work->workData.ioData.buf,
work->workData.ioData.len);
if (len == -1) { errCode = errno; }
}
complData = work->workData.ioData.buf;
fd = work->workData.ioData.fd;
} else if ( work->workKind & WORKER_DELAY ) {
/* Approximate implementation of threadDelay;
*
* Note: Sleep() is in milliseconds, not micros.
*/
Sleep(work->workData.delayData.msecs / 1000);
len = work->workData.delayData.msecs;
complData = NULL;
fd = 0;
errCode = 0;
} else if ( work->workKind & WORKER_DO_PROC ) {
/* perform operation/proc on behalf of Haskell thread. */
if (work->workData.procData.proc) {
/* The procedure is assumed to encode result + success/failure
* via its param.
*/
errCode=work->workData.procData.proc(work->workData.procData.param);
} else {
errCode=1;
}
complData = work->workData.procData.param;
} else {
fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
fflush(stderr);
continue;
}
work->onCompletion(work->requestID,
fd,
len,
complData,
errCode);
/* Free the WorkItem */
free(work);
} else {
errCode=1;
fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
return 1;
}
complData = work->workData.procData.param;
} else {
fprintf(stderr, "unknown work request type (%d) , ignoring.\n", work->workKind);
fflush(stderr);
continue;
fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
return 1;
}
work->onCompletion(work->requestID,
fd,
len,
complData,
errCode);
/* Free the WorkItem */
free(work);
} else {
fprintf(stderr, "unable to fetch work; fatal.\n"); fflush(stderr);
return 1;
}
} else {
fprintf(stderr, "waiting failed; fatal.\n"); fflush(stderr);
return 1;
}
}
return 0;
return 0;
}
static
BOOL
NewIOWorkerThread(IOManagerState* iom)
{
unsigned threadId;
return ( 0 != _beginthreadex(NULL,
0,
IOWorkerProc,
(LPVOID)iom,
0,
&threadId) );
unsigned threadId;
return ( 0 != _beginthreadex(NULL,
0,
IOWorkerProc,
(LPVOID)iom,
0,
&threadId) );
}
BOOL
StartIOManager(void)
{
HANDLE hExit;
WorkQueue* wq;
HANDLE hExit;
WorkQueue* wq;
wq = NewWorkQueue();
if ( !wq ) return FALSE;
wq = NewWorkQueue();
if ( !wq ) return FALSE;
ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
ioMan = (IOManagerState*)malloc(sizeof(IOManagerState));
if (!ioMan) {
FreeWorkQueue(wq);
return FALSE;
}
if (!ioMan) {
FreeWorkQueue(wq);
return FALSE;
}
/* A manual-reset event */
hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
if ( !hExit ) {
FreeWorkQueue(wq);
free(ioMan);
return FALSE;
}
/* A manual-reset event */
hExit = CreateEvent ( NULL, TRUE, FALSE, NULL );
if ( !hExit ) {
FreeWorkQueue(wq);
free(ioMan);
return FALSE;
}
ioMan->hExitEvent = hExit;
InitializeCriticalSection(&ioMan->manLock);
ioMan->workQueue = wq;
ioMan->numWorkers = 0;
ioMan->workersIdle = 0;
ioMan->requestID = 1;
ioMan->hExitEvent = hExit;
InitializeCriticalSection(&ioMan->manLock);
ioMan->workQueue = wq;
ioMan->numWorkers = 0;
ioMan->workersIdle = 0;
ioMan->queueSize = 0;
ioMan->requestID = 1;
return TRUE;
return TRUE;
}
/*
* Function: AddIORequest()
*
* Conduit to underlying WorkQueue's SubmitWork(); adds IO
* request to work queue, returning without blocking.
* request to work queue, deciding whether or not to augment
* the thread pool in the process.
*/
int
AddIORequest ( int fd,
......@@ -210,75 +220,110 @@ AddIORequest ( int fd,
char* buffer,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return 0;
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return 0;
/* Fill in the blanks */
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
wItem->workData.ioData.fd = fd;
wItem->workData.ioData.len = len;
wItem->workData.ioData.buf = buffer;
/* Fill in the blanks */
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
wItem->workData.ioData.fd = fd;
wItem->workData.ioData.len = len;
wItem->workData.ioData.buf = buffer;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
EnterCriticalSection(&ioMan->manLock);
/* If there are no worker threads available, create one.
*
* If this turns out to be too aggressive a policy, refine.
*/
EnterCriticalSection(&ioMan->manLock);
/* If there are no worker threads available, create one.
*
* If this turns out to be too aggressive a policy, refine.
*/
#if 0
fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fflush(stderr);
fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle);
fflush(stderr);
#endif
if ( ioMan->workersIdle == 0 ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
if (SubmitWork(ioMan->workQueue,wItem)) {
/* Note: the work item has potentially been consumed by a worker thread
* (and freed) at this point, so we cannot use wItem's requestID.
*/
return reqID;
} else {
return 0;
}
}
/* A new worker thread is created when there are fewer idle threads
* than non-consumed queue requests. This ensures that requests will
* be dealt with in a timely manner.
*
* [Long explanation of why the previous thread pool policy lead to
* trouble]
*
* Previously, the thread pool was augmented iff no idle worker threads
* were available. That strategy runs the risk of repeatedly adding to
* the request queue without expanding the thread pool to handle this
* sudden spike in queued requests.
* [How? Assume workersIdle is 1, and addIORequest() is called. No new
* thread is created and the, returning without blocking.
request is simply queued. If addIORequest()
* is called again _before the OS schedules a worker thread to pull the
* request off the queue_, workersIdle is still 1 and another request is
* simply added to the queue. Once the worker thread is run, only one
* request is de-queued, leaving the 2nd request in the queue]
*
* Assuming none of the queued requests take an inordinate amount of to
* complete, the request queue would eventually be drained. But if that's
* not the case, the later requests will end up languishing in the queue
* indefinitely. The non-timely handling of requests may cause CH applications
* to misbehave / hang; bad.
*
*/
ioMan->queueSize++;
if ( ioMan->workersIdle < ioMan->queueSize ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
if (SubmitWork(ioMan->workQueue,wItem)) {
/* Note: the work item has potentially been consumed by a worker thread
* (and freed) at this point, so we cannot use wItem's requestID.
*/
return reqID;
} else {
return 0;
}
}
/*
* Function: AddDelayRequest()
*
* Like AddIORequest(), but this time adding a delay request to
* the request queue.
*/
BOOL
AddDelayRequest ( unsigned int msecs,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return FALSE;
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return FALSE;
/* Fill in the blanks */
wItem->workKind = WORKER_DELAY;
wItem->workData.delayData.msecs = msecs;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
/* Fill in the blanks */
wItem->workKind = WORKER_DELAY;
wItem->workData.delayData.msecs = msecs;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
EnterCriticalSection(&ioMan->manLock);
EnterCriticalSection(&ioMan->manLock);
#if 0
fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle); fflush(stderr);
fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
fflush(stderr);
#endif
if ( ioMan->workersIdle == 0 ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
/* See AddIORequest() for comments regarding policy
* for augmenting the worker thread pool.
*/
ioMan->queueSize++;
if ( ioMan->workersIdle < ioMan->queueSize ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
if (SubmitWork(ioMan->workQueue,wItem)) {
/* See AddIORequest() comment */
......@@ -289,43 +334,49 @@ AddDelayRequest ( unsigned int msecs,
}
/*
* Function: AddDelayRequest()
* Function: AddProcRequest()
*
* Add an asynchronous procedure request.
*/
BOOL
AddProcRequest ( void* proc,
void* param,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return FALSE;
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return FALSE;
/* Fill in the blanks */
wItem->workKind = WORKER_DO_PROC;
wItem->workData.procData.proc = proc;
wItem->workData.procData.param = param;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
/* Fill in the blanks */
wItem->workKind = WORKER_DO_PROC;
wItem->workData.procData.proc = proc;
wItem->workData.procData.param = param;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
EnterCriticalSection(&ioMan->manLock);
EnterCriticalSection(&ioMan->manLock);
#if 0
fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle); fflush(stderr);
fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
fflush(stderr);
#endif
if ( ioMan->workersIdle == 0 ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
/* See AddIORequest() for comments regarding policy
* for augmenting the worker thread pool.
*/
ioMan->queueSize++;
if ( ioMan->workersIdle < ioMan->queueSize ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
if (SubmitWork(ioMan->workQueue,wItem)) {
/* See AddIORequest() comment */
return reqID;
} else {
return 0;
}
if (SubmitWork(ioMan->workQueue,wItem)) {
/* See AddIORequest() comment */
return reqID;
} else {
return 0;
}
}
void ShutdownIOManager()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment