machinarium: switch machine_read() to new event loop impl

This commit is contained in:
Dmitry Simonenko 2017-04-12 16:22:41 +03:00
parent 18b834b6f8
commit 46ad1cc0df
7 changed files with 81 additions and 106 deletions

View File

@ -22,6 +22,9 @@ machine_create_io(machine_t obj)
/*mm_tlsio_init(&io->tls, io);*/
io->machine = machine;
/* read */
mm_buf_init(&io->read_ahead);
io->read_ahead_size = 16384;
#if 0
/* getaddrinfo */
@ -33,39 +36,6 @@ machine_create_io(machine_t obj)
io->gai_status = 0;
io->gai_timedout = 0;
io->gai_result = NULL;
/* connect */
memset(&io->connect, 0, sizeof(io->connect));
uv_timer_init(&machine->loop, &io->connect_timer);
io->connect.data = io;
io->connect_timer.data = io;
io->connect_timedout = 0;
io->connect_status = 0;
io->connected = 0;
io->connect_fiber = NULL;
/* accept */
io->accept_status = 0;
io->accept_fiber = NULL;
io->accepted = 0;
/* read */
mm_buf_init(&io->read_ahead);
uv_timer_init(&machine->loop, &io->read_timer);
io->read_ahead_size = 16384;
io->read_ahead_pos = 0;
io->read_ahead_pos_data = 0;
io->read_timer.data = io;
io->read_size = 0;
io->read_status = 0;
io->read_timedout = 0;
io->read_eof = 0;
io->read_fiber = NULL;
/* write */
memset(&io->write, 0, sizeof(io->write));
uv_timer_init(&machine->loop, &io->write_timer);
io->write.data = io;
io->write_timer.data = io;
io->write_timedout = 0;
io->write_fiber = NULL;
io->write_status = 0;
#endif
return io;
}

View File

@ -45,9 +45,9 @@ struct mm_io_t {
mm_fiber_t *accept_fiber;
int accepted;
#endif
/* read */
#if 0
uv_timer_t read_timer;
mm_timer_t read_timer;
int read_ahead_size;
mm_buf_t read_ahead;
int read_ahead_pos;
@ -57,7 +57,7 @@ struct mm_io_t {
int read_eof;
int read_status;
mm_fiber_t *read_fiber;
#endif
/* write */
mm_timer_t write_timer;
int write_timedout;

View File

@ -9,11 +9,11 @@
#include <machinarium.h>
static void
mm_read_timeout_cb(uv_timer_t *handle)
mm_read_timer_cb(mm_timer_t *handle)
{
mm_io_t *io = handle->data;
mm_io_t *io = handle->arg;
io->read_timedout = 1;
io->read_status = -ETIMEDOUT;
io->read_status = ETIMEDOUT;
mm_scheduler_wakeup(io->read_fiber);
}
@ -22,63 +22,69 @@ mm_read_cancel_cb(void *obj, void *arg)
{
(void)obj;
mm_io_t *io = arg;
mm_io_timer_stop(&io->read_timer);
io->read_timedout = 0;
io->read_status = -ECANCELED;
io->read_status = ECANCELED;
mm_scheduler_wakeup(io->read_fiber);
}
static void
mm_read_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
static int
mm_read_cb(mm_fd_t *handle)
{
(void)suggested_size;
mm_io_t *io = handle->data;
buf->base = io->read_ahead.pos;
buf->len = mm_buf_unused(&io->read_ahead);
}
mm_io_t *io = handle->on_read_arg;
mm_t *machine = machine = io->machine;
for (;;)
{
int size;
size = mm_socket_read(handle->fd,
io->read_ahead.pos,
mm_buf_unused(&io->read_ahead));
static void
mm_read_cb(uv_stream_t *handle, ssize_t size, const uv_buf_t *buf)
{
mm_io_t *io = handle->data;
(void)buf;
/* read error */
if (size == -1) {
if (errno == EAGAIN ||
errno == EWOULDBLOCK)
return 0;
if (errno == EINTR)
continue;
io->read_status = errno;
break;
}
if (size >= 0) {
/* connection closed */
if (size == 0) {
io->connected = 0;
io->read_status = 0;
mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0);
break;
}
/* handle incoming data */
mm_buf_advance(&io->read_ahead, size);
assert(mm_buf_used(&io->read_ahead) <= io->read_ahead_size);
/* readahead buffer now has atleast as minimum
* size requested by read operation */
if (mm_buf_used(&io->read_ahead) >= io->read_size) {
io->read_ahead_pos = io->read_size;
break;
}
/* stop reader when we reach readahead limit */
if (mm_buf_used(&io->read_ahead) == io->read_ahead_size) {
/* stop reader when we reach readahead limit */
uv_read_stop(handle);
mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0);
break;
}
} else {
/* connection closed */
if (size == UV_EOF) {
io->connected = 0;
}
io->read_status = size;
}
if (io->read_fiber) {
if (io->read_status != 0) {
mm_io_timer_stop(&io->read_timer);
if (! io->connected)
uv_read_stop(handle);
mm_scheduler_wakeup(io->read_fiber);
return;
}
/* readahead buffer now has atleast as minum
* size required by read operation */
if (mm_buf_used(&io->read_ahead) >= io->read_size) {
mm_io_timer_stop(&io->read_timer);
io->read_ahead_pos = io->read_size;
mm_scheduler_wakeup(io->read_fiber);
}
}
if (io->read_fiber)
mm_scheduler_wakeup(io->read_fiber);
return 0;
}
int
mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms)
{
mm_t *machine = machine = io->machine;
mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler);
mm_io_set_errno(io, 0);
if (mm_fiber_is_cancelled(current)) {
@ -134,30 +140,27 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms)
mm_buf_reset(&io->read_ahead);
}
io->read_size = size;
io->read_fiber = current;
io->read_ahead_pos_data = 0;
io->read_ahead_pos = 0;
/* subscribe fiber for new data */
mm_io_timer_start(&io->read_timer, mm_read_timeout_cb,
time_ms);
if (! uv_is_active((uv_handle_t*)&io->handle))
{
rc = uv_read_start((uv_stream_t*)&io->handle,
mm_read_alloc_cb,
mm_read_cb);
if (rc < 0) {
mm_io_timer_stop(&io->read_timer);
io->read_fiber = NULL;
mm_io_set_errno_uv(io, rc);
return -1;
}
/* maybe subscribe for read event */
rc = mm_loop_read(&machine->loop, &io->handle, mm_read_cb, io, 1);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
mm_call_begin(&current->call, mm_read_cancel_cb, io);
mm_scheduler_yield(&io->machine->scheduler);
mm_call_end(&current->call);
/* wait for timedout, cancel or execution status */
mm_timer_start(&machine->loop.clock,
&io->read_timer,
mm_read_timer_cb, io, time_ms);
mm_call_begin(&current->call, mm_read_cancel_cb, io);
io->read_fiber = current;
mm_scheduler_yield(&machine->scheduler);
io->read_fiber = NULL;
mm_call_end(&current->call);
mm_timer_stop(&io->read_timer);
if (mm_buf_used(&io->read_ahead) >= io->read_size) {
if (buf) {
memcpy(buf, io->read_ahead.start + io->read_ahead_pos_data, size);
@ -165,8 +168,8 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms)
return 0;
}
rc = io->read_status;
assert(rc < 0);
mm_io_set_errno_uv(io, rc);
assert(rc != 0);
mm_io_set_errno(io, rc);
return -1;
}
@ -174,8 +177,10 @@ MACHINE_API int
machine_read(machine_io_t obj, char *buf, int size, uint64_t time_ms)
{
mm_io_t *io = obj;
/*
if (mm_tls_is_active(&io->tls))
return mm_tlsio_read(&io->tls, buf, size);
*/
return mm_read(io, buf, size, time_ms);
}

View File

@ -9,11 +9,4 @@
int mm_read(mm_io_t*, char*, int, uint64_t);
static inline void
mm_io_read_stop(mm_io_t *io)
{
if (uv_is_active((uv_handle_t*)&io->handle))
uv_read_stop((uv_stream_t*)&io->handle);
}
#endif

View File

@ -141,3 +141,10 @@ int mm_socket_write(int fd, void *buf, int size)
rc = write(fd, buf, size);
return rc;
}
int mm_socket_read(int fd, void *buf, int size)
{
int rc;
rc = read(fd, buf, size);
return rc;
}

View File

@ -17,5 +17,6 @@ int mm_socket_error(int);
int mm_socket_connect(int, struct sockaddr*);
int mm_socket_bind(int, struct sockaddr*);
int mm_socket_write(int, void*, int);
int mm_socket_read(int, void*, int);
#endif

View File

@ -85,7 +85,6 @@ mm_write(mm_io_t *io, char *buf, int size, uint64_t time_ms)
int rc;
rc = mm_loop_write(&machine->loop, &io->handle, mm_write_cb, io, 1);
if (rc == -1) {
io->write_fiber = NULL;
mm_io_set_errno(io, errno);
return -1;
}