diff --git a/src/mm.c b/src/mm.c index 5ed6f00f..c508d187 100644 --- a/src/mm.c +++ b/src/mm.c @@ -15,6 +15,7 @@ machinarium_init(void) { mm_machinemgr_init(&machinarium.machine_mgr); mm_msgpool_init(&machinarium.msg_pool); + mm_queuerdpool_init(&machinarium.queuerd_pool); mm_tls_init(); return 0; } @@ -23,6 +24,7 @@ MACHINE_API void machinarium_free(void) { mm_machinemgr_free(&machinarium.machine_mgr); + mm_queuerdpool_free(&machinarium.queuerd_pool); mm_msgpool_free(&machinarium.msg_pool); mm_tls_free(); } diff --git a/src/mm.h b/src/mm.h index b9b3441a..2426dfc7 100644 --- a/src/mm.h +++ b/src/mm.h @@ -10,8 +10,9 @@ typedef struct mm_t mm_t; struct mm_t { - mm_machinemgr_t machine_mgr; - mm_msgpool_t msg_pool; + mm_machinemgr_t machine_mgr; + mm_msgpool_t msg_pool; + mm_queuerdpool_t queuerd_pool; }; extern mm_t machinarium; diff --git a/src/mm_queue_rd_pool.c b/src/mm_queue_rd_pool.c new file mode 100644 index 00000000..095a63c1 --- /dev/null +++ b/src/mm_queue_rd_pool.c @@ -0,0 +1,62 @@ + +/* + * machinarium. + * + * cooperative multitasking engine. +*/ + +#include +#include + +void mm_queuerdpool_init(mm_queuerdpool_t *pool) +{ + pthread_spin_init(&pool->lock, PTHREAD_PROCESS_PRIVATE); + mm_list_init(&pool->list); + pool->count = 0; +} + +void mm_queuerdpool_free(mm_queuerdpool_t *pool) +{ + mm_list_t *i, *n; + mm_list_foreach_safe(&pool->list, i, n) { + mm_queuerd_t *reader; + reader = mm_container_of(i, mm_queuerd_t, link); + mm_queuerd_close(reader); + free(reader); + } + pthread_spin_destroy(&pool->lock); +} + +mm_queuerd_t* +mm_queuerdpool_pop(mm_queuerdpool_t *pool) +{ + mm_queuerd_t *reader = NULL; + pthread_spin_lock(&pool->lock); + if (pool->count > 0) { + mm_list_t *first = mm_list_pop(&pool->list); + pool->count--; + pthread_spin_unlock(&pool->lock); + reader = mm_container_of(first, mm_queuerd_t, link); + return reader; + } + pthread_spin_unlock(&pool->lock); + + reader = malloc(sizeof(mm_queuerd_t)); + if (reader == NULL) + return NULL; + int rc; + rc = mm_queuerd_open(reader); + if (rc == -1) { + free(reader); + return NULL; + } + return reader; +} + +void mm_queuerdpool_push(mm_queuerdpool_t *pool, mm_queuerd_t *reader) +{ + pthread_spin_lock(&pool->lock); + mm_list_append(&pool->list, &reader->link); + pool->count++; + pthread_spin_unlock(&pool->lock); +} diff --git a/src/mm_queue_rd_pool.h b/src/mm_queue_rd_pool.h new file mode 100644 index 00000000..41eaeaf3 --- /dev/null +++ b/src/mm_queue_rd_pool.h @@ -0,0 +1,26 @@ +#ifndef MM_QUEUE_RD_POOL_H_ +#define MM_QUEUE_RD_POOL_H_ + +/* + * machinarium. + * + * cooperative multitasking engine. +*/ + +typedef struct mm_queuerdpool_t mm_queuerdpool_t; + +struct mm_queuerdpool_t { + pthread_spinlock_t lock; + mm_list_t list; + int count; +}; + +void mm_queuerdpool_init(mm_queuerdpool_t*); +void mm_queuerdpool_free(mm_queuerdpool_t*); + +mm_queuerd_t* +mm_queuerdpool_pop(mm_queuerdpool_t*); + +void mm_queuerdpool_push(mm_queuerdpool_t*, mm_queuerd_t*); + +#endif