Commit ce42f19f authored by sof's avatar sof
Browse files

[project @ 2003-09-15 20:39:38 by sof]

factor out code that handles depositing of work items on the
  thread pool's request queue.
- when it looks as if a new worker thread needs to be created, give
  up our quantum first in the hope that this might at the last minute
  turn up more idle worker threads.
- add comment re: trimming pool size.

Merged to STABLE eventually; I may continue tinkering with this code
some more over the next day or two.
parent 02670da5
...@@ -61,7 +61,13 @@ IOWorkerProc(PVOID param) ...@@ -61,7 +61,13 @@ IOWorkerProc(PVOID param)
*/ */
iom->workersIdle++; iom->workersIdle++;
LeaveCriticalSection(&iom->manLock); LeaveCriticalSection(&iom->manLock);
/*
* A possible future refinement is to make long-term idle threads
* wake up and decide to shut down should the number of idle threads
* be above some threshold.
*
*/
rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE ); rc = WaitForMultipleObjects( 2, hWaits, FALSE, INFINITE );
EnterCriticalSection(&iom->manLock); EnterCriticalSection(&iom->manLock);
...@@ -206,41 +212,22 @@ StartIOManager(void) ...@@ -206,41 +212,22 @@ StartIOManager(void)
} }
/* /*
* Function: AddIORequest() * Function: depositWorkItem()
*
* Local function which deposits a WorkItem onto a work queue,
* deciding in the process whether or not the thread pool needs
* to be augmented with another thread to handle the new request.
* *
* Conduit to underlying WorkQueue's SubmitWork(); adds IO
* request to work queue, deciding whether or not to augment
* the thread pool in the process.
*/ */
static
int int
AddIORequest ( int fd, depositWorkItem( unsigned int reqID,
BOOL forWriting, WorkItem* wItem )
BOOL isSocket,
int len,
char* buffer,
CompletionProc onCompletion)
{ {
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return 0;
/* Fill in the blanks */
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
wItem->workData.ioData.fd = fd;
wItem->workData.ioData.len = len;
wItem->workData.ioData.buf = buffer;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
EnterCriticalSection(&ioMan->manLock); EnterCriticalSection(&ioMan->manLock);
/* If there are no worker threads available, create one.
*
* If this turns out to be too aggressive a policy, refine.
*/
#if 0 #if 0
fprintf(stderr, "AddIORequest: %d\n", ioMan->workersIdle); fprintf(stderr, "depositWorkItem: %d/%d\n", ioMan->workersIdle, ioMan->numWorkers);
fflush(stderr); fflush(stderr);
#endif #endif
/* A new worker thread is created when there are fewer idle threads /* A new worker thread is created when there are fewer idle threads
...@@ -255,8 +242,7 @@ AddIORequest ( int fd, ...@@ -255,8 +242,7 @@ AddIORequest ( int fd,
* the request queue without expanding the thread pool to handle this * the request queue without expanding the thread pool to handle this
* sudden spike in queued requests. * sudden spike in queued requests.
* [How? Assume workersIdle is 1, and addIORequest() is called. No new * [How? Assume workersIdle is 1, and addIORequest() is called. No new
* thread is created and the, returning without blocking. * thread is created and the request is simply queued. If addIORequest()
request is simply queued. If addIORequest()
* is called again _before the OS schedules a worker thread to pull the * is called again _before the OS schedules a worker thread to pull the
* request off the queue_, workersIdle is still 1 and another request is * request off the queue_, workersIdle is still 1 and another request is
* simply added to the queue. Once the worker thread is run, only one * simply added to the queue. Once the worker thread is run, only one
...@@ -270,14 +256,24 @@ AddIORequest ( int fd, ...@@ -270,14 +256,24 @@ AddIORequest ( int fd,
* *
*/ */
ioMan->queueSize++; ioMan->queueSize++;
if ( ioMan->workersIdle < ioMan->queueSize ) { if ( (ioMan->workersIdle < ioMan->queueSize) ) {
ioMan->numWorkers++; /* see if giving up our quantum ferrets out some idle threads.
*/
LeaveCriticalSection(&ioMan->manLock); LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan); Sleep(0);
EnterCriticalSection(&ioMan->manLock);
if ( (ioMan->workersIdle < ioMan->queueSize) ) {
/* No, go ahead and create another. */
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
} else { } else {
LeaveCriticalSection(&ioMan->manLock); LeaveCriticalSection(&ioMan->manLock);
} }
if (SubmitWork(ioMan->workQueue,wItem)) { if (SubmitWork(ioMan->workQueue,wItem)) {
/* Note: the work item has potentially been consumed by a worker thread /* Note: the work item has potentially been consumed by a worker thread
* (and freed) at this point, so we cannot use wItem's requestID. * (and freed) at this point, so we cannot use wItem's requestID.
...@@ -286,6 +282,38 @@ AddIORequest ( int fd, ...@@ -286,6 +282,38 @@ AddIORequest ( int fd,
} else { } else {
return 0; return 0;
} }
}
/*
* Function: AddIORequest()
*
* Conduit to underlying WorkQueue's SubmitWork(); adds IO
* request to work queue, deciding whether or not to augment
* the thread pool in the process.
*/
int
AddIORequest ( int fd,
BOOL forWriting,
BOOL isSocket,
int len,
char* buffer,
CompletionProc onCompletion)
{
WorkItem* wItem = (WorkItem*)malloc(sizeof(WorkItem));
unsigned int reqID = ioMan->requestID++;
if (!ioMan || !wItem) return 0;
/* Fill in the blanks */
wItem->workKind = ( isSocket ? WORKER_FOR_SOCKET : 0 ) |
( forWriting ? WORKER_WRITE : WORKER_READ );
wItem->workData.ioData.fd = fd;
wItem->workData.ioData.len = len;
wItem->workData.ioData.buf = buffer;
wItem->onCompletion = onCompletion;
wItem->requestID = reqID;
return depositWorkItem(reqID, wItem);
} }
/* /*
...@@ -308,29 +336,7 @@ AddDelayRequest ( unsigned int msecs, ...@@ -308,29 +336,7 @@ AddDelayRequest ( unsigned int msecs,
wItem->onCompletion = onCompletion; wItem->onCompletion = onCompletion;
wItem->requestID = reqID; wItem->requestID = reqID;
EnterCriticalSection(&ioMan->manLock); return depositWorkItem(reqID, wItem);
#if 0
fprintf(stderr, "AddDelayRequest: %d\n", ioMan->workersIdle);
fflush(stderr);
#endif
/* See AddIORequest() for comments regarding policy
* for augmenting the worker thread pool.
*/
ioMan->queueSize++;
if ( ioMan->workersIdle < ioMan->queueSize ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
if (SubmitWork(ioMan->workQueue,wItem)) {
/* See AddIORequest() comment */
return reqID;
} else {
return 0;
}
} }
/* /*
...@@ -354,29 +360,7 @@ AddProcRequest ( void* proc, ...@@ -354,29 +360,7 @@ AddProcRequest ( void* proc,
wItem->onCompletion = onCompletion; wItem->onCompletion = onCompletion;
wItem->requestID = reqID; wItem->requestID = reqID;
EnterCriticalSection(&ioMan->manLock); return depositWorkItem(reqID, wItem);
#if 0
fprintf(stderr, "AddProcRequest: %d\n", ioMan->workersIdle);
fflush(stderr);
#endif
/* See AddIORequest() for comments regarding policy
* for augmenting the worker thread pool.
*/
ioMan->queueSize++;
if ( ioMan->workersIdle < ioMan->queueSize ) {
ioMan->numWorkers++;
LeaveCriticalSection(&ioMan->manLock);
NewIOWorkerThread(ioMan);
} else {
LeaveCriticalSection(&ioMan->manLock);
}
if (SubmitWork(ioMan->workQueue,wItem)) {
/* See AddIORequest() comment */
return reqID;
} else {
return 0;
}
} }
void ShutdownIOManager() void ShutdownIOManager()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment