Commit a3a8ce6e authored by Ben Gamari's avatar Ben Gamari 🐢

rts: Add simple resource pool

parent 36b21390
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 2014-2015
*
* A pool of lazily allocated things
*
* --------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "Pool.h"
/* used to mark an entry as needing to be freed when released */
#define FLAG_SHOULD_FREE (1 << 0)
typedef struct PoolEntry_ {
struct PoolEntry_ *next;
void *thing;
StgWord flags;
} PoolEntry;
struct Pool_ {
/* the maximum number of allocated resources in the pool */
nat max_size;
/* the number of allocated resources to keep in the pool when idle */
nat desired_size;
/* how many things are currently allocated? (sum of lengths of available and
* taken lists) */
nat current_size;
#ifdef THREADED_RTS
/* signaled when a thing is released */
Condition cond;
#endif
alloc_thing_fn alloc_fn;
free_thing_fn free_fn;
PoolEntry *available;
PoolEntry *taken;
#ifdef THREADED_RTS
/* protects entire data structure */
Mutex mutex;
#endif
};
Pool *poolInit(nat max_size, nat desired_size,
alloc_thing_fn alloc_fn, free_thing_fn free_fn) {
Pool *pool = stgMallocBytes(sizeof(Pool), "pool_init");
pool->max_size = max_size == 0 ? (nat) -1 : max_size;
pool->desired_size = desired_size;
pool->current_size = 0;
pool->alloc_fn = alloc_fn;
pool->free_fn = free_fn;
pool->available = NULL;
pool->taken = NULL;
#ifdef THREADED_RTS
initMutex(&pool->mutex);
initCondition(&pool->cond);
#endif
return pool;
}
int poolFree(Pool *pool) {
if (pool->taken != NULL)
return 1;
poolSetMaxSize(pool, 0);
#ifdef THREADED_RTS
closeCondition(&pool->cond);
closeMutex(&pool->mutex);
#endif
free(pool);
return 0;
}
/* free available entries such that current_size <= size */
static void free_available(Pool *pool, nat size) {
while (pool->current_size > size && pool->available != NULL) {
PoolEntry *ent = pool->available;
pool->free_fn(ent->thing);
pool->available = ent->next;
free(ent);
pool->current_size--;
}
}
void poolSetDesiredSize(Pool *pool, nat size) {
ACQUIRE_LOCK(&pool->mutex);
pool->desired_size = size;
free_available(pool, size);
RELEASE_LOCK(&pool->mutex);
}
void poolSetMaxSize(Pool *pool, nat size) {
ACQUIRE_LOCK(&pool->mutex);
if (size == 0)
size = (nat) -1;
pool->max_size = size;
if (pool->desired_size > pool->max_size) {
pool->desired_size = size;
free_available(pool, size);
}
RELEASE_LOCK(&pool->mutex);
}
nat poolGetMaxSize(Pool *pool) {
return pool->max_size;
}
nat poolGetDesiredSize(Pool *pool) {
return pool->desired_size;
}
void *poolTake(Pool *pool) {
PoolEntry *ent = NULL;
ACQUIRE_LOCK(&pool->mutex);
while (ent == NULL) {
if (pool->available != NULL) {
ent = pool->available;
pool->available = ent->next;
} else if (pool->current_size < pool->max_size) {
ent = stgMallocBytes(sizeof(PoolEntry), "pool_take");
ent->flags = 0;
ent->thing = pool->alloc_fn();
pool->current_size++;
} else {
#ifdef THREADED_RTS
waitCondition(&pool->cond, &pool->mutex);
#else
barf("Tried to take from an empty pool");
#endif
}
}
ent->next = pool->taken;
pool->taken = ent;
RELEASE_LOCK(&pool->mutex);
return ent->thing;
}
void poolRelease(Pool *pool, void *thing) {
ACQUIRE_LOCK(&pool->mutex);
PoolEntry **last = &pool->taken;
PoolEntry *ent = pool->taken;
while (ent != NULL) {
if (ent->thing == thing) {
*last = ent->next;
if (pool->current_size > pool->desired_size
|| ent->flags & FLAG_SHOULD_FREE) {
pool->free_fn(ent->thing);
free(ent);
} else {
ent->next = pool->available;
pool->available = ent;
#ifdef THREADED_RTS
signalCondition(&pool->cond);
#endif
}
RELEASE_LOCK(&pool->mutex);
return;
}
last = &ent->next;
ent = ent->next;
}
barf("pool_release: trying to release resource which doesn't belong to pool.");
}
void poolFlush(Pool *pool) {
ACQUIRE_LOCK(&pool->mutex);
free_available(pool, 0);
PoolEntry *ent = pool->taken;
while (ent != NULL) {
ent->flags |= FLAG_SHOULD_FREE;
ent = ent->next;
}
RELEASE_LOCK(&pool->mutex);
}
#include "Rts.h"
/*
* Resource pools
*
* This module provides an implementation of a simple thread-safe resource pool.
* A pool is a shared set of resources, the size of which is bounded by a
* maximum size (0 indicates unbounded). Consumers can request a resource from
* the pool with pool_take and, when finished can return it to the pool with
* pool_release. Resources will be lazily allocated with alloc_fn as necessary.
* If the pool is already at its maximum size when a request is made, pool_take
* will block until a resource is freed.
*
* The pool will free resources such that there are at most desired_size
* resources in the pool when all resources have been released.
*
* invariant: desired_size <= max_size
*
*/
typedef void *(*alloc_thing_fn)(void);
typedef void (*free_thing_fn)(void *);
typedef struct Pool_ Pool;
/* Create a pool of things. */
Pool *poolInit(nat max_size, nat desired_size,
alloc_thing_fn alloc_fn, free_thing_fn free_fn);
/* Free a pool. Returns 0 on success or 1 on failure due to things
* belonging to the pool currently being claimed. */
int poolFree(Pool *pool);
/* Set the maximum size of a pool (0 indicates unbounded). desired_size will be
* lowered if necessary. */
void poolSetMaxSize(Pool *pool, nat size);
/* Get the maximum size of a pool */
nat poolGetMaxSize(Pool *pool);
/* Set the desired size of a pool */
void poolSetDesiredSize(Pool *pool, nat size);
/* Get the desired size of a pool */
nat poolGetDesiredSize(Pool *pool);
/* Grab an available thing from a pool */
void *poolTake(Pool *pool);
/* Release a thing back to the pool from which it was taken */
void poolRelease(Pool *pool, void *thing);
/* Invalidate all currently allocated resources. Things which are currently
* taken will be freed upon release instead of being returned to the pool. */
void poolFlush(Pool *pool);
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment