machinarium: make rdpool per-machine specific

This commit is contained in:
Dmitry Simonenko 2017-05-22 17:20:44 +03:00
parent 879443ab60
commit 0d24e802e3
9 changed files with 28 additions and 25 deletions

View File

@ -57,14 +57,15 @@
#include "mm_thread.h"
#include "mm_machine.h"
#include "mm_machine_mgr.h"
#include "mm_msg.h"
#include "mm_msg_pool.h"
#include "mm_channel.h"
#include "mm_queue_rd.h"
#include "mm_queue_rd_pool.h"
#include "mm_queue.h"
#include "mm_machine.h"
#include "mm_machine_mgr.h"
#include "mm.h"
#include "mm_tls.h"

View File

@ -15,7 +15,6 @@ machinarium_init(void)
{
mm_machinemgr_init(&machinarium.machine_mgr);
mm_msgpool_init(&machinarium.msg_pool);
mm_queuerdpool_init(&machinarium.queuerd_pool);
mm_tls_init();
return 0;
}
@ -24,7 +23,6 @@ MACHINE_API void
machinarium_free(void)
{
mm_machinemgr_free(&machinarium.machine_mgr);
mm_queuerdpool_free(&machinarium.queuerd_pool);
mm_msgpool_free(&machinarium.msg_pool);
mm_tls_free();
}

View File

@ -10,9 +10,8 @@
typedef struct mm_t mm_t;
struct mm_t {
mm_machinemgr_t machine_mgr;
mm_msgpool_t msg_pool;
mm_queuerdpool_t queuerd_pool;
mm_machinemgr_t machine_mgr;
mm_msgpool_t msg_pool;
};
extern mm_t machinarium;

View File

@ -15,6 +15,15 @@ mm_idle_cb(mm_idle_t *handle)
{
(void)handle;
mm_scheduler_run(&mm_self->scheduler);
if (mm_scheduler_online(&mm_self->scheduler))
return;
/* machine shutdown */
mm_queuerdpool_free(&mm_self->queuerd_pool);
/* todo: check active timers and other allocated
* resources */
}
static inline void
@ -73,9 +82,11 @@ machine_create(char *name, machine_function_t function, void *arg)
}
mm_list_init(&machine->link);
mm_scheduler_init(&machine->scheduler, 2048 /* 16K */);
mm_queuerdpool_init(&machine->queuerd_pool);
int rc;
rc = mm_loop_init(&machine->loop);
if (rc < 0) {
mm_queuerdpool_free(&machine->queuerd_pool);
mm_scheduler_free(&machine->scheduler);
free(machine);
return -1;
@ -86,6 +97,7 @@ machine_create(char *name, machine_function_t function, void *arg)
if (rc == -1) {
mm_machinemgr_delete(&machinarium.machine_mgr, machine);
mm_loop_shutdown(&machine->loop);
mm_queuerdpool_free(&machine->queuerd_pool);
mm_scheduler_free(&machine->scheduler);
free(machine);
return -1;

View File

@ -17,6 +17,7 @@ struct mm_machine_t {
void *main_arg;
mm_thread_t thread;
mm_scheduler_t scheduler;
mm_queuerdpool_t queuerd_pool;
mm_loop_t loop;
mm_list_t link;
};

View File

@ -133,11 +133,11 @@ machine_queue_get(machine_queue_t obj, int time_ms)
{
mm_queue_t *queue = obj;
mm_queuerd_t *reader;
reader = mm_queuerdpool_pop(&machinarium.queuerd_pool);
reader = mm_queuerdpool_pop(&mm_self->queuerd_pool);
if (reader == NULL)
return NULL;
mm_msg_t *msg;
msg = mm_queue_get(queue, reader, time_ms);
mm_queuerdpool_push(&machinarium.queuerd_pool, reader);
mm_queuerdpool_push(&mm_self->queuerd_pool, reader);
return msg;
}

View File

@ -29,7 +29,7 @@ int mm_queuerd_open(mm_queuerd_t *reader)
if (rc == -1)
return -1;
rc = mm_loop_read(&mm_self->loop, &reader->fd, mm_queuerd_cb,
reader, MM_R);
reader, 1);
if (rc == -1) {
mm_loop_delete(&mm_self->loop, &reader->fd);
return -1;
@ -39,10 +39,11 @@ int mm_queuerd_open(mm_queuerd_t *reader)
void mm_queuerd_close(mm_queuerd_t *reader)
{
if (reader->fd.fd != -1) {
mm_loop_delete(&mm_self->loop, &reader->fd);
reader->fd.fd = -1;
}
if (reader->fd.fd == -1)
return;
mm_loop_delete(&mm_self->loop, &reader->fd);
close(reader->fd.fd);
reader->fd.fd = -1;
}
void mm_queuerd_notify(mm_queuerd_t *reader)

View File

@ -10,7 +10,6 @@
void mm_queuerdpool_init(mm_queuerdpool_t *pool)
{
pthread_spin_init(&pool->lock, PTHREAD_PROCESS_PRIVATE);
mm_list_init(&pool->list);
pool->count = 0;
}
@ -24,23 +23,18 @@ void mm_queuerdpool_free(mm_queuerdpool_t *pool)
mm_queuerd_close(reader);
free(reader);
}
pthread_spin_destroy(&pool->lock);
}
mm_queuerd_t*
mm_queuerdpool_pop(mm_queuerdpool_t *pool)
{
mm_queuerd_t *reader = NULL;
pthread_spin_lock(&pool->lock);
if (pool->count > 0) {
mm_list_t *first = mm_list_pop(&pool->list);
pool->count--;
pthread_spin_unlock(&pool->lock);
reader = mm_container_of(first, mm_queuerd_t, link);
return reader;
}
pthread_spin_unlock(&pool->lock);
reader = malloc(sizeof(mm_queuerd_t));
if (reader == NULL)
return NULL;
@ -55,8 +49,6 @@ mm_queuerdpool_pop(mm_queuerdpool_t *pool)
void mm_queuerdpool_push(mm_queuerdpool_t *pool, mm_queuerd_t *reader)
{
pthread_spin_lock(&pool->lock);
mm_list_append(&pool->list, &reader->link);
pool->count++;
pthread_spin_unlock(&pool->lock);
}

View File

@ -10,9 +10,8 @@
typedef struct mm_queuerdpool_t mm_queuerdpool_t;
struct mm_queuerdpool_t {
pthread_spinlock_t lock;
mm_list_t list;
int count;
mm_list_t list;
int count;
};
void mm_queuerdpool_init(mm_queuerdpool_t*);