From c8a6f2866446ffec5bd502eff13593084522a219 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Mon, 22 May 2017 16:31:37 +0300 Subject: [PATCH] machinarium: implement queue reader context --- src/mm_queue_rd.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++ src/mm_queue_rd.h | 24 ++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 src/mm_queue_rd.c create mode 100644 src/mm_queue_rd.h diff --git a/src/mm_queue_rd.c b/src/mm_queue_rd.c new file mode 100644 index 00000000..14cdb126 --- /dev/null +++ b/src/mm_queue_rd.c @@ -0,0 +1,58 @@ + +/* + * machinarium. + * + * cooperative multitasking engine. +*/ + +#include +#include + +static void +mm_queuerd_cb(mm_fd_t *handle) +{ + mm_queuerd_t *reader = handle->on_read_arg; + uint64_t id; + mm_socket_read(reader->fd.fd, &id, sizeof(id)); + mm_scheduler_wakeup(&mm_self->scheduler, reader->call.fiber); +} + +int mm_queuerd_open(mm_queuerd_t *reader) +{ + mm_list_init(&reader->link); + memset(&reader->call, 0, sizeof(reader->call)); + reader->fd.fd = eventfd(0, EFD_NONBLOCK); + if (reader->fd.fd == -1) + return -1; + int rc; + rc = mm_loop_add(&mm_self->loop, &reader->fd, 0); + if (rc == -1) + return -1; + rc = mm_loop_read(&mm_self->loop, &reader->fd, mm_queuerd_cb, + reader, MM_R); + if (rc == -1) { + mm_loop_delete(&mm_self->loop, &reader->fd); + return -1; + } + return 0; +} + +void mm_queuerd_close(mm_queuerd_t *reader) +{ + if (reader->fd.fd != -1) { + mm_loop_delete(&mm_self->loop, &reader->fd); + reader->fd.fd = -1; + } +} + +void mm_queuerd_notify(mm_queuerd_t *reader) +{ + uint64_t id = 1; + mm_socket_write(reader->fd.fd, &id, sizeof(id)); +} + +int mm_queuerd_wait(mm_queuerd_t *reader, int time_ms) +{ + mm_call(&reader->call, time_ms); + return reader->call.status; +} diff --git a/src/mm_queue_rd.h b/src/mm_queue_rd.h new file mode 100644 index 00000000..d637ba73 --- /dev/null +++ b/src/mm_queue_rd.h @@ -0,0 +1,24 @@ +#ifndef MM_QUEUE_RD_H_ +#define MM_QUEUE_RD_H_ + +/* + * machinarium. + * + * cooperative multitasking engine. +*/ + +typedef struct mm_queuerd_t mm_queuerd_t; + +struct mm_queuerd_t { + mm_call_t call; + int signaled; + mm_fd_t fd; + mm_list_t link; +}; + +int mm_queuerd_open(mm_queuerd_t*); +void mm_queuerd_close(mm_queuerd_t*); +void mm_queuerd_notify(mm_queuerd_t*); +int mm_queuerd_wait(mm_queuerd_t*, int); + +#endif