diff --git a/src/mm_io.c b/src/mm_io.c index a382f9b5..eb2d71e0 100644 --- a/src/mm_io.c +++ b/src/mm_io.c @@ -22,6 +22,9 @@ machine_create_io(machine_t obj) /*mm_tlsio_init(&io->tls, io);*/ io->machine = machine; + /* read */ + mm_buf_init(&io->read_ahead); + io->read_ahead_size = 16384; #if 0 /* getaddrinfo */ @@ -33,39 +36,6 @@ machine_create_io(machine_t obj) io->gai_status = 0; io->gai_timedout = 0; io->gai_result = NULL; - /* connect */ - memset(&io->connect, 0, sizeof(io->connect)); - uv_timer_init(&machine->loop, &io->connect_timer); - io->connect.data = io; - io->connect_timer.data = io; - io->connect_timedout = 0; - io->connect_status = 0; - io->connected = 0; - io->connect_fiber = NULL; - /* accept */ - io->accept_status = 0; - io->accept_fiber = NULL; - io->accepted = 0; - /* read */ - mm_buf_init(&io->read_ahead); - uv_timer_init(&machine->loop, &io->read_timer); - io->read_ahead_size = 16384; - io->read_ahead_pos = 0; - io->read_ahead_pos_data = 0; - io->read_timer.data = io; - io->read_size = 0; - io->read_status = 0; - io->read_timedout = 0; - io->read_eof = 0; - io->read_fiber = NULL; - /* write */ - memset(&io->write, 0, sizeof(io->write)); - uv_timer_init(&machine->loop, &io->write_timer); - io->write.data = io; - io->write_timer.data = io; - io->write_timedout = 0; - io->write_fiber = NULL; - io->write_status = 0; #endif return io; } diff --git a/src/mm_io.h b/src/mm_io.h index e452a3d8..6772aee5 100644 --- a/src/mm_io.h +++ b/src/mm_io.h @@ -45,9 +45,9 @@ struct mm_io_t { mm_fiber_t *accept_fiber; int accepted; #endif + /* read */ -#if 0 - uv_timer_t read_timer; + mm_timer_t read_timer; int read_ahead_size; mm_buf_t read_ahead; int read_ahead_pos; @@ -57,7 +57,7 @@ struct mm_io_t { int read_eof; int read_status; mm_fiber_t *read_fiber; -#endif + /* write */ mm_timer_t write_timer; int write_timedout; diff --git a/src/mm_read.c b/src/mm_read.c index 7b83edab..72af8c9c 100644 --- a/src/mm_read.c +++ b/src/mm_read.c @@ -9,11 +9,11 @@ #include static void -mm_read_timeout_cb(uv_timer_t *handle) +mm_read_timer_cb(mm_timer_t *handle) { - mm_io_t *io = handle->data; + mm_io_t *io = handle->arg; io->read_timedout = 1; - io->read_status = -ETIMEDOUT; + io->read_status = ETIMEDOUT; mm_scheduler_wakeup(io->read_fiber); } @@ -22,63 +22,69 @@ mm_read_cancel_cb(void *obj, void *arg) { (void)obj; mm_io_t *io = arg; - mm_io_timer_stop(&io->read_timer); io->read_timedout = 0; - io->read_status = -ECANCELED; + io->read_status = ECANCELED; mm_scheduler_wakeup(io->read_fiber); } -static void -mm_read_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) +static int +mm_read_cb(mm_fd_t *handle) { - (void)suggested_size; - mm_io_t *io = handle->data; - buf->base = io->read_ahead.pos; - buf->len = mm_buf_unused(&io->read_ahead); -} + mm_io_t *io = handle->on_read_arg; + mm_t *machine = machine = io->machine; + for (;;) + { + int size; + size = mm_socket_read(handle->fd, + io->read_ahead.pos, + mm_buf_unused(&io->read_ahead)); -static void -mm_read_cb(uv_stream_t *handle, ssize_t size, const uv_buf_t *buf) -{ - mm_io_t *io = handle->data; - (void)buf; + /* read error */ + if (size == -1) { + if (errno == EAGAIN || + errno == EWOULDBLOCK) + return 0; + if (errno == EINTR) + continue; + io->read_status = errno; + break; + } - if (size >= 0) { + /* connection closed */ + if (size == 0) { + io->connected = 0; + io->read_status = 0; + mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0); + break; + } + + /* handle incoming data */ mm_buf_advance(&io->read_ahead, size); assert(mm_buf_used(&io->read_ahead) <= io->read_ahead_size); + + /* readahead buffer now has atleast as minimum + * size requested by read operation */ + if (mm_buf_used(&io->read_ahead) >= io->read_size) { + io->read_ahead_pos = io->read_size; + break; + } + + /* stop reader when we reach readahead limit */ if (mm_buf_used(&io->read_ahead) == io->read_ahead_size) { - /* stop reader when we reach readahead limit */ - uv_read_stop(handle); + mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0); + break; } - } else { - /* connection closed */ - if (size == UV_EOF) { - io->connected = 0; - } - io->read_status = size; } - if (io->read_fiber) { - if (io->read_status != 0) { - mm_io_timer_stop(&io->read_timer); - if (! io->connected) - uv_read_stop(handle); - mm_scheduler_wakeup(io->read_fiber); - return; - } - /* readahead buffer now has atleast as minum - * size required by read operation */ - if (mm_buf_used(&io->read_ahead) >= io->read_size) { - mm_io_timer_stop(&io->read_timer); - io->read_ahead_pos = io->read_size; - mm_scheduler_wakeup(io->read_fiber); - } - } + if (io->read_fiber) + mm_scheduler_wakeup(io->read_fiber); + return 0; } int mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms) { + mm_t *machine = machine = io->machine; mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler); mm_io_set_errno(io, 0); if (mm_fiber_is_cancelled(current)) { @@ -134,30 +140,27 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms) mm_buf_reset(&io->read_ahead); } io->read_size = size; - io->read_fiber = current; io->read_ahead_pos_data = 0; io->read_ahead_pos = 0; - /* subscribe fiber for new data */ - mm_io_timer_start(&io->read_timer, mm_read_timeout_cb, - time_ms); - if (! uv_is_active((uv_handle_t*)&io->handle)) - { - rc = uv_read_start((uv_stream_t*)&io->handle, - mm_read_alloc_cb, - mm_read_cb); - if (rc < 0) { - mm_io_timer_stop(&io->read_timer); - io->read_fiber = NULL; - mm_io_set_errno_uv(io, rc); - return -1; - } + /* maybe subscribe for read event */ + rc = mm_loop_read(&machine->loop, &io->handle, mm_read_cb, io, 1); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; } - mm_call_begin(¤t->call, mm_read_cancel_cb, io); - mm_scheduler_yield(&io->machine->scheduler); - mm_call_end(¤t->call); + /* wait for timedout, cancel or execution status */ + mm_timer_start(&machine->loop.clock, + &io->read_timer, + mm_read_timer_cb, io, time_ms); + mm_call_begin(¤t->call, mm_read_cancel_cb, io); + io->read_fiber = current; + mm_scheduler_yield(&machine->scheduler); io->read_fiber = NULL; + mm_call_end(¤t->call); + mm_timer_stop(&io->read_timer); + if (mm_buf_used(&io->read_ahead) >= io->read_size) { if (buf) { memcpy(buf, io->read_ahead.start + io->read_ahead_pos_data, size); @@ -165,8 +168,8 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms) return 0; } rc = io->read_status; - assert(rc < 0); - mm_io_set_errno_uv(io, rc); + assert(rc != 0); + mm_io_set_errno(io, rc); return -1; } @@ -174,8 +177,10 @@ MACHINE_API int machine_read(machine_io_t obj, char *buf, int size, uint64_t time_ms) { mm_io_t *io = obj; + /* if (mm_tls_is_active(&io->tls)) return mm_tlsio_read(&io->tls, buf, size); + */ return mm_read(io, buf, size, time_ms); } diff --git a/src/mm_read.h b/src/mm_read.h index 4f6cccc0..bde4f008 100644 --- a/src/mm_read.h +++ b/src/mm_read.h @@ -9,11 +9,4 @@ int mm_read(mm_io_t*, char*, int, uint64_t); -static inline void -mm_io_read_stop(mm_io_t *io) -{ - if (uv_is_active((uv_handle_t*)&io->handle)) - uv_read_stop((uv_stream_t*)&io->handle); -} - #endif diff --git a/src/mm_socket.c b/src/mm_socket.c index 61b63d25..42c28e6c 100644 --- a/src/mm_socket.c +++ b/src/mm_socket.c @@ -141,3 +141,10 @@ int mm_socket_write(int fd, void *buf, int size) rc = write(fd, buf, size); return rc; } + +int mm_socket_read(int fd, void *buf, int size) +{ + int rc; + rc = read(fd, buf, size); + return rc; +} diff --git a/src/mm_socket.h b/src/mm_socket.h index 07501257..3e362e7b 100644 --- a/src/mm_socket.h +++ b/src/mm_socket.h @@ -17,5 +17,6 @@ int mm_socket_error(int); int mm_socket_connect(int, struct sockaddr*); int mm_socket_bind(int, struct sockaddr*); int mm_socket_write(int, void*, int); +int mm_socket_read(int, void*, int); #endif diff --git a/src/mm_write.c b/src/mm_write.c index 9c63e84a..700720cc 100644 --- a/src/mm_write.c +++ b/src/mm_write.c @@ -85,7 +85,6 @@ mm_write(mm_io_t *io, char *buf, int size, uint64_t time_ms) int rc; rc = mm_loop_write(&machine->loop, &io->handle, mm_write_cb, io, 1); if (rc == -1) { - io->write_fiber = NULL; mm_io_set_errno(io, errno); return -1; }