mirror of https://github.com/yandex/odyssey.git
machinarium: implement queue readers pool
This commit is contained in:
parent
c8a6f28664
commit
b64fdeb554
2
src/mm.c
2
src/mm.c
|
@ -15,6 +15,7 @@ machinarium_init(void)
|
|||
{
|
||||
mm_machinemgr_init(&machinarium.machine_mgr);
|
||||
mm_msgpool_init(&machinarium.msg_pool);
|
||||
mm_queuerdpool_init(&machinarium.queuerd_pool);
|
||||
mm_tls_init();
|
||||
return 0;
|
||||
}
|
||||
|
@ -23,6 +24,7 @@ MACHINE_API void
|
|||
machinarium_free(void)
|
||||
{
|
||||
mm_machinemgr_free(&machinarium.machine_mgr);
|
||||
mm_queuerdpool_free(&machinarium.queuerd_pool);
|
||||
mm_msgpool_free(&machinarium.msg_pool);
|
||||
mm_tls_free();
|
||||
}
|
||||
|
|
5
src/mm.h
5
src/mm.h
|
@ -10,8 +10,9 @@
|
|||
typedef struct mm_t mm_t;
|
||||
|
||||
struct mm_t {
|
||||
mm_machinemgr_t machine_mgr;
|
||||
mm_msgpool_t msg_pool;
|
||||
mm_machinemgr_t machine_mgr;
|
||||
mm_msgpool_t msg_pool;
|
||||
mm_queuerdpool_t queuerd_pool;
|
||||
};
|
||||
|
||||
extern mm_t machinarium;
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
|
||||
/*
|
||||
* machinarium.
|
||||
*
|
||||
* cooperative multitasking engine.
|
||||
*/
|
||||
|
||||
#include <machinarium.h>
|
||||
#include <machinarium_private.h>
|
||||
|
||||
void mm_queuerdpool_init(mm_queuerdpool_t *pool)
|
||||
{
|
||||
pthread_spin_init(&pool->lock, PTHREAD_PROCESS_PRIVATE);
|
||||
mm_list_init(&pool->list);
|
||||
pool->count = 0;
|
||||
}
|
||||
|
||||
void mm_queuerdpool_free(mm_queuerdpool_t *pool)
|
||||
{
|
||||
mm_list_t *i, *n;
|
||||
mm_list_foreach_safe(&pool->list, i, n) {
|
||||
mm_queuerd_t *reader;
|
||||
reader = mm_container_of(i, mm_queuerd_t, link);
|
||||
mm_queuerd_close(reader);
|
||||
free(reader);
|
||||
}
|
||||
pthread_spin_destroy(&pool->lock);
|
||||
}
|
||||
|
||||
mm_queuerd_t*
|
||||
mm_queuerdpool_pop(mm_queuerdpool_t *pool)
|
||||
{
|
||||
mm_queuerd_t *reader = NULL;
|
||||
pthread_spin_lock(&pool->lock);
|
||||
if (pool->count > 0) {
|
||||
mm_list_t *first = mm_list_pop(&pool->list);
|
||||
pool->count--;
|
||||
pthread_spin_unlock(&pool->lock);
|
||||
reader = mm_container_of(first, mm_queuerd_t, link);
|
||||
return reader;
|
||||
}
|
||||
pthread_spin_unlock(&pool->lock);
|
||||
|
||||
reader = malloc(sizeof(mm_queuerd_t));
|
||||
if (reader == NULL)
|
||||
return NULL;
|
||||
int rc;
|
||||
rc = mm_queuerd_open(reader);
|
||||
if (rc == -1) {
|
||||
free(reader);
|
||||
return NULL;
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
void mm_queuerdpool_push(mm_queuerdpool_t *pool, mm_queuerd_t *reader)
|
||||
{
|
||||
pthread_spin_lock(&pool->lock);
|
||||
mm_list_append(&pool->list, &reader->link);
|
||||
pool->count++;
|
||||
pthread_spin_unlock(&pool->lock);
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
#ifndef MM_QUEUE_RD_POOL_H_
|
||||
#define MM_QUEUE_RD_POOL_H_
|
||||
|
||||
/*
|
||||
* machinarium.
|
||||
*
|
||||
* cooperative multitasking engine.
|
||||
*/
|
||||
|
||||
typedef struct mm_queuerdpool_t mm_queuerdpool_t;
|
||||
|
||||
struct mm_queuerdpool_t {
|
||||
pthread_spinlock_t lock;
|
||||
mm_list_t list;
|
||||
int count;
|
||||
};
|
||||
|
||||
void mm_queuerdpool_init(mm_queuerdpool_t*);
|
||||
void mm_queuerdpool_free(mm_queuerdpool_t*);
|
||||
|
||||
mm_queuerd_t*
|
||||
mm_queuerdpool_pop(mm_queuerdpool_t*);
|
||||
|
||||
void mm_queuerdpool_push(mm_queuerdpool_t*, mm_queuerd_t*);
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue