From 0d24e802e303053e4c4f7f858157a462c44cee15 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Mon, 22 May 2017 17:20:44 +0300 Subject: [PATCH] machinarium: make rdpool per-machine specific --- src/machinarium_private.h | 5 +++-- src/mm.c | 2 -- src/mm.h | 5 ++--- src/mm_machine.c | 12 ++++++++++++ src/mm_machine.h | 1 + src/mm_queue.c | 4 ++-- src/mm_queue_rd.c | 11 ++++++----- src/mm_queue_rd_pool.c | 8 -------- src/mm_queue_rd_pool.h | 5 ++--- 9 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/machinarium_private.h b/src/machinarium_private.h index 85de9c32..df48ef33 100644 --- a/src/machinarium_private.h +++ b/src/machinarium_private.h @@ -57,14 +57,15 @@ #include "mm_thread.h" -#include "mm_machine.h" -#include "mm_machine_mgr.h" #include "mm_msg.h" #include "mm_msg_pool.h" #include "mm_channel.h" #include "mm_queue_rd.h" #include "mm_queue_rd_pool.h" #include "mm_queue.h" + +#include "mm_machine.h" +#include "mm_machine_mgr.h" #include "mm.h" #include "mm_tls.h" diff --git a/src/mm.c b/src/mm.c index c508d187..5ed6f00f 100644 --- a/src/mm.c +++ b/src/mm.c @@ -15,7 +15,6 @@ machinarium_init(void) { mm_machinemgr_init(&machinarium.machine_mgr); mm_msgpool_init(&machinarium.msg_pool); - mm_queuerdpool_init(&machinarium.queuerd_pool); mm_tls_init(); return 0; } @@ -24,7 +23,6 @@ MACHINE_API void machinarium_free(void) { mm_machinemgr_free(&machinarium.machine_mgr); - mm_queuerdpool_free(&machinarium.queuerd_pool); mm_msgpool_free(&machinarium.msg_pool); mm_tls_free(); } diff --git a/src/mm.h b/src/mm.h index 2426dfc7..b9b3441a 100644 --- a/src/mm.h +++ b/src/mm.h @@ -10,9 +10,8 @@ typedef struct mm_t mm_t; struct mm_t { - mm_machinemgr_t machine_mgr; - mm_msgpool_t msg_pool; - mm_queuerdpool_t queuerd_pool; + mm_machinemgr_t machine_mgr; + mm_msgpool_t msg_pool; }; extern mm_t machinarium; diff --git a/src/mm_machine.c b/src/mm_machine.c index 61e95571..57c08ceb 100644 --- a/src/mm_machine.c +++ b/src/mm_machine.c @@ -15,6 +15,15 @@ mm_idle_cb(mm_idle_t *handle) { (void)handle; mm_scheduler_run(&mm_self->scheduler); + + if (mm_scheduler_online(&mm_self->scheduler)) + return; + + /* machine shutdown */ + mm_queuerdpool_free(&mm_self->queuerd_pool); + + /* todo: check active timers and other allocated + * resources */ } static inline void @@ -73,9 +82,11 @@ machine_create(char *name, machine_function_t function, void *arg) } mm_list_init(&machine->link); mm_scheduler_init(&machine->scheduler, 2048 /* 16K */); + mm_queuerdpool_init(&machine->queuerd_pool); int rc; rc = mm_loop_init(&machine->loop); if (rc < 0) { + mm_queuerdpool_free(&machine->queuerd_pool); mm_scheduler_free(&machine->scheduler); free(machine); return -1; @@ -86,6 +97,7 @@ machine_create(char *name, machine_function_t function, void *arg) if (rc == -1) { mm_machinemgr_delete(&machinarium.machine_mgr, machine); mm_loop_shutdown(&machine->loop); + mm_queuerdpool_free(&machine->queuerd_pool); mm_scheduler_free(&machine->scheduler); free(machine); return -1; diff --git a/src/mm_machine.h b/src/mm_machine.h index d7d6e32e..533536e8 100644 --- a/src/mm_machine.h +++ b/src/mm_machine.h @@ -17,6 +17,7 @@ struct mm_machine_t { void *main_arg; mm_thread_t thread; mm_scheduler_t scheduler; + mm_queuerdpool_t queuerd_pool; mm_loop_t loop; mm_list_t link; }; diff --git a/src/mm_queue.c b/src/mm_queue.c index fae18697..cd2b584e 100644 --- a/src/mm_queue.c +++ b/src/mm_queue.c @@ -133,11 +133,11 @@ machine_queue_get(machine_queue_t obj, int time_ms) { mm_queue_t *queue = obj; mm_queuerd_t *reader; - reader = mm_queuerdpool_pop(&machinarium.queuerd_pool); + reader = mm_queuerdpool_pop(&mm_self->queuerd_pool); if (reader == NULL) return NULL; mm_msg_t *msg; msg = mm_queue_get(queue, reader, time_ms); - mm_queuerdpool_push(&machinarium.queuerd_pool, reader); + mm_queuerdpool_push(&mm_self->queuerd_pool, reader); return msg; } diff --git a/src/mm_queue_rd.c b/src/mm_queue_rd.c index 14cdb126..206473d7 100644 --- a/src/mm_queue_rd.c +++ b/src/mm_queue_rd.c @@ -29,7 +29,7 @@ int mm_queuerd_open(mm_queuerd_t *reader) if (rc == -1) return -1; rc = mm_loop_read(&mm_self->loop, &reader->fd, mm_queuerd_cb, - reader, MM_R); + reader, 1); if (rc == -1) { mm_loop_delete(&mm_self->loop, &reader->fd); return -1; @@ -39,10 +39,11 @@ int mm_queuerd_open(mm_queuerd_t *reader) void mm_queuerd_close(mm_queuerd_t *reader) { - if (reader->fd.fd != -1) { - mm_loop_delete(&mm_self->loop, &reader->fd); - reader->fd.fd = -1; - } + if (reader->fd.fd == -1) + return; + mm_loop_delete(&mm_self->loop, &reader->fd); + close(reader->fd.fd); + reader->fd.fd = -1; } void mm_queuerd_notify(mm_queuerd_t *reader) diff --git a/src/mm_queue_rd_pool.c b/src/mm_queue_rd_pool.c index 095a63c1..ffb7c9bc 100644 --- a/src/mm_queue_rd_pool.c +++ b/src/mm_queue_rd_pool.c @@ -10,7 +10,6 @@ void mm_queuerdpool_init(mm_queuerdpool_t *pool) { - pthread_spin_init(&pool->lock, PTHREAD_PROCESS_PRIVATE); mm_list_init(&pool->list); pool->count = 0; } @@ -24,23 +23,18 @@ void mm_queuerdpool_free(mm_queuerdpool_t *pool) mm_queuerd_close(reader); free(reader); } - pthread_spin_destroy(&pool->lock); } mm_queuerd_t* mm_queuerdpool_pop(mm_queuerdpool_t *pool) { mm_queuerd_t *reader = NULL; - pthread_spin_lock(&pool->lock); if (pool->count > 0) { mm_list_t *first = mm_list_pop(&pool->list); pool->count--; - pthread_spin_unlock(&pool->lock); reader = mm_container_of(first, mm_queuerd_t, link); return reader; } - pthread_spin_unlock(&pool->lock); - reader = malloc(sizeof(mm_queuerd_t)); if (reader == NULL) return NULL; @@ -55,8 +49,6 @@ mm_queuerdpool_pop(mm_queuerdpool_t *pool) void mm_queuerdpool_push(mm_queuerdpool_t *pool, mm_queuerd_t *reader) { - pthread_spin_lock(&pool->lock); mm_list_append(&pool->list, &reader->link); pool->count++; - pthread_spin_unlock(&pool->lock); } diff --git a/src/mm_queue_rd_pool.h b/src/mm_queue_rd_pool.h index 41eaeaf3..ab1ebdf5 100644 --- a/src/mm_queue_rd_pool.h +++ b/src/mm_queue_rd_pool.h @@ -10,9 +10,8 @@ typedef struct mm_queuerdpool_t mm_queuerdpool_t; struct mm_queuerdpool_t { - pthread_spinlock_t lock; - mm_list_t list; - int count; + mm_list_t list; + int count; }; void mm_queuerdpool_init(mm_queuerdpool_t*);