machinarium: implement queue reader context

This commit is contained in:
Dmitry Simonenko 2017-05-22 16:31:37 +03:00
parent 9b675b36ab
commit c8a6f28664
2 changed files with 82 additions and 0 deletions

58
src/mm_queue_rd.c Normal file
View File

@ -0,0 +1,58 @@
/*
* machinarium.
*
* cooperative multitasking engine.
*/
#include <machinarium.h>
#include <machinarium_private.h>
static void
mm_queuerd_cb(mm_fd_t *handle)
{
mm_queuerd_t *reader = handle->on_read_arg;
uint64_t id;
mm_socket_read(reader->fd.fd, &id, sizeof(id));
mm_scheduler_wakeup(&mm_self->scheduler, reader->call.fiber);
}
int mm_queuerd_open(mm_queuerd_t *reader)
{
mm_list_init(&reader->link);
memset(&reader->call, 0, sizeof(reader->call));
reader->fd.fd = eventfd(0, EFD_NONBLOCK);
if (reader->fd.fd == -1)
return -1;
int rc;
rc = mm_loop_add(&mm_self->loop, &reader->fd, 0);
if (rc == -1)
return -1;
rc = mm_loop_read(&mm_self->loop, &reader->fd, mm_queuerd_cb,
reader, MM_R);
if (rc == -1) {
mm_loop_delete(&mm_self->loop, &reader->fd);
return -1;
}
return 0;
}
void mm_queuerd_close(mm_queuerd_t *reader)
{
if (reader->fd.fd != -1) {
mm_loop_delete(&mm_self->loop, &reader->fd);
reader->fd.fd = -1;
}
}
void mm_queuerd_notify(mm_queuerd_t *reader)
{
uint64_t id = 1;
mm_socket_write(reader->fd.fd, &id, sizeof(id));
}
int mm_queuerd_wait(mm_queuerd_t *reader, int time_ms)
{
mm_call(&reader->call, time_ms);
return reader->call.status;
}

24
src/mm_queue_rd.h Normal file
View File

@ -0,0 +1,24 @@
#ifndef MM_QUEUE_RD_H_
#define MM_QUEUE_RD_H_
/*
* machinarium.
*
* cooperative multitasking engine.
*/
typedef struct mm_queuerd_t mm_queuerd_t;
struct mm_queuerd_t {
mm_call_t call;
int signaled;
mm_fd_t fd;
mm_list_t link;
};
int mm_queuerd_open(mm_queuerd_t*);
void mm_queuerd_close(mm_queuerd_t*);
void mm_queuerd_notify(mm_queuerd_t*);
int mm_queuerd_wait(mm_queuerd_t*, int);
#endif