diff --git a/src/mm_io.c b/src/mm_io.c index 0ca1af1c..ddae3f1f 100644 --- a/src/mm_io.c +++ b/src/mm_io.c @@ -23,10 +23,7 @@ machine_create_io(machine_t obj) io->machine = machine; /* read */ - /* - mm_buf_init(&io->read_ahead); - io->read_ahead_size = 16384; - */ + mm_buf_init(&io->readahead_buf); #if 0 /* getaddrinfo */ @@ -46,7 +43,7 @@ MACHINE_API void machine_free_io(machine_io_t obj) { mm_io_t *io = obj; - /*mm_buf_free(&io->read_ahead);*/ + mm_buf_free(&io->readahead_buf); free(io); } @@ -108,22 +105,6 @@ machine_set_keepalive(machine_io_t obj, int enable, int delay) return 0; } -MACHINE_API int -machine_set_readahead(machine_io_t obj, int size) -{ - mm_io_t *io = obj; - /* - if (mm_buf_size(&io->read_ahead) > 0) { - mm_io_set_errno(io, EINPROGRESS); - return -1; - } - io->read_ahead_size = size; - */ - (void)io; - (void)size; - return 0; -} - int mm_io_socket_set(mm_io_t *io, int fd) { io->fd = fd; diff --git a/src/mm_io.h b/src/mm_io.h index cb4240b1..0c671c42 100644 --- a/src/mm_io.h +++ b/src/mm_io.h @@ -54,6 +54,10 @@ struct mm_io_t { int read_eof; int read_status; mm_fiber_t *read_fiber; + mm_buf_t readahead_buf; + int readahead_pos; + int readahead_pos_read; + int readahead_size; /* mm_timer_t read_timer; diff --git a/src/mm_read.c b/src/mm_read.c index 6907da29..e78acdc6 100644 --- a/src/mm_read.c +++ b/src/mm_read.c @@ -63,31 +63,16 @@ wakeup: return 0; } -int -mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms) +static int +mm_read_default(mm_io_t *io, uint64_t time_ms) { mm_t *machine = machine = io->machine; mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler); - mm_io_set_errno(io, 0); - if (mm_fiber_is_cancelled(current)) { - mm_io_set_errno(io, ECANCELED); - return -1; - } - if (io->read_fiber) { - mm_io_set_errno(io, EINPROGRESS); - return -1; - } - if (! io->connected) { - mm_io_set_errno(io, ENOTCONN); - return -1; - } - io->read_status = 0; - io->read_timedout = 0; - io->read_eof = 0; - io->read_fiber = NULL; - io->read_size = size; - io->read_pos = 0; - io->read_buf = buf; + + io->read_status = 0; + io->read_eof = 0; + io->read_fiber = NULL; + io->read_pos = 0; io->handle.on_read = mm_read_cb; io->handle.on_read_arg = io; @@ -126,7 +111,7 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms) rc = io->read_status; if (rc == 0) { - if (io->read_pos != size) { + if (io->read_pos != io->read_size) { mm_io_set_errno(io, ECONNRESET); return -1; } @@ -136,6 +121,181 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms) return -1; } +static int +mm_readahead_read(mm_io_t *io, uint64_t time_ms); + +int mm_readahead_stop(mm_io_t *io); + +static int +mm_readahead_cb(mm_fd_t *handle) +{ + mm_io_t *io = handle->on_read_arg; + mm_t *machine = machine = io->machine; + + int left = io->readahead_size - io->readahead_pos; + int rc; + while (left > 0) + { + rc = mm_socket_read(io->fd, io->readahead_buf.start + io->readahead_pos, left); + if (rc == -1) { + if (errno == EAGAIN || + errno == EWOULDBLOCK) + break; + if (errno == EINTR) + continue; + io->read_status = errno; + if (io->read_fiber) + mm_scheduler_wakeup(io->read_fiber); + return 0; + } + io->readahead_pos += rc; + left = io->readahead_size - io->readahead_pos; + assert(left >= 0); + if (rc == 0) { + /* eof */ + mm_readahead_stop(io); + io->read_eof = 1; + io->read_status = 0; + break; + } + } + io->read_status = 0; + if (io->read_fiber) { + int ra_left = io->readahead_pos - io->readahead_pos_read; + if (io->read_eof || ra_left >= io->read_size) + mm_scheduler_wakeup(io->read_fiber); + } + return 0; +} + +int +mm_readahead_start(mm_io_t *io) +{ + mm_t *machine = machine = io->machine; + int rc; + rc = mm_loop_read(&machine->loop, &io->handle, mm_readahead_cb, io, 1); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + return 0; +} + +int +mm_readahead_stop(mm_io_t *io) +{ + mm_t *machine = machine = io->machine; + int rc; + rc = mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0); + if (rc == -1) { + mm_io_set_errno(io, errno); + return -1; + } + return 0; +} + +static int +mm_readahead_read(mm_io_t *io, uint64_t time_ms) +{ + mm_t *machine = machine = io->machine; + mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler); + + if (io->read_size > io->readahead_size) { + mm_io_set_errno(io, EINVAL); + return -1; + } + + /* try to use readahead first */ + assert(io->readahead_pos >= io->readahead_pos_read); + int ra_left = io->readahead_pos - io->readahead_pos_read; + if (ra_left >= io->read_size) { + memcpy(io->read_buf, io->readahead_buf.start + io->readahead_pos_read, + io->read_size); + io->readahead_pos_read += io->read_size; + return 0; + } + + if (io->read_status != 0) { + mm_io_set_errno(io, io->read_status); + return -1; + } + + if (io->read_eof) { + mm_io_set_errno(io, ECONNRESET); + return -1; + } + + /* copy first readahead chunk */ + int copy_pos = 0; + if (ra_left > 0) { + memcpy(io->read_buf, + io->readahead_buf.start + io->readahead_pos_read, + ra_left); + io->readahead_pos_read += ra_left; + io->read_size -= ra_left; + copy_pos = ra_left; + } + + /* reset readahead position */ + assert(io->readahead_pos_read == io->readahead_pos); + io->readahead_pos = 0; + io->readahead_pos_read = 0; + + /* wait for timedout, cancel or execution status */ + mm_timer_start(&machine->loop.clock, + &io->read_timer, + mm_read_timer_cb, io, time_ms); + mm_call_begin(¤t->call, mm_read_cancel_cb, io); + io->read_fiber = current; + mm_scheduler_yield(&machine->scheduler); + io->read_fiber = NULL; + mm_call_end(¤t->call); + mm_timer_stop(&io->read_timer); + + int rc; + rc = io->read_status; + if (rc == 0) { + ra_left = io->readahead_pos - io->readahead_pos_read; + if (ra_left < io->read_size) { + mm_io_set_errno(io, ECONNRESET); + return -1; + } + memcpy(io->read_buf + copy_pos, + io->readahead_buf.start + io->readahead_pos_read, + io->read_size); + io->readahead_pos_read += io->read_size; + return 0; + } + mm_io_set_errno(io, rc); + return -1; +} + +int +mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms) +{ + mm_t *machine = machine = io->machine; + mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler); + mm_io_set_errno(io, 0); + if (mm_fiber_is_cancelled(current)) { + mm_io_set_errno(io, ECANCELED); + return -1; + } + if (io->read_fiber) { + mm_io_set_errno(io, EINPROGRESS); + return -1; + } + if (! io->connected) { + mm_io_set_errno(io, ENOTCONN); + return -1; + } + io->read_size = size; + io->read_buf = buf; + io->read_timedout = 0; + if (io->readahead_size > 0) + return mm_readahead_read(io, time_ms); + return mm_read_default(io, time_ms); +} + MACHINE_API int machine_read(machine_io_t obj, char *buf, int size, uint64_t time_ms) { @@ -151,3 +311,42 @@ machine_read_timedout(machine_io_t obj) mm_io_t *io = obj; return io->read_timedout; } + +MACHINE_API int +machine_set_readahead(machine_io_t obj, int size) +{ + mm_io_t *io = obj; + mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler); + mm_io_set_errno(io, 0); + if (mm_fiber_is_cancelled(current)) { + mm_io_set_errno(io, ECANCELED); + return -1; + } + if (io->read_fiber) { + mm_io_set_errno(io, EINPROGRESS); + return -1; + } + if (! io->connected) { + mm_io_set_errno(io, ENOTCONN); + return -1; + } + if (size <= 0) { + mm_io_set_errno(io, EINVAL); + return -1; + } + if (io->readahead_size > 0) { + mm_io_set_errno(io, EINPROGRESS); + return -1; + } + int rc; + rc = mm_buf_ensure(&io->readahead_buf, size); + if (rc == -1) { + mm_io_set_errno(io, ENOMEM); + return -1; + } + rc = mm_readahead_start(io); + if (rc == -1) + return -1; + io->readahead_size = size; + return 0; +} diff --git a/src/mm_read.h b/src/mm_read.h index bde4f008..9217f0b2 100644 --- a/src/mm_read.h +++ b/src/mm_read.h @@ -9,4 +9,7 @@ int mm_read(mm_io_t*, char*, int, uint64_t); +int mm_readahead_start(mm_io_t*); +int mm_readahead_stop(mm_io_t*); + #endif diff --git a/tests/makefile b/tests/makefile index 97f5b50d..06ea2757 100644 --- a/tests/makefile +++ b/tests/makefile @@ -6,7 +6,6 @@ LFLAGS = $(LFLAGS_LIB) TESTS = test_new \ test_create \ test_sleep \ - test_sleep_2 \ test_wait \ test_condition \ test_condition_2 \ diff --git a/tests/test_client_server_ra.c b/tests/test_client_server_ra.c index 369bb4fc..5fa7321f 100644 --- a/tests/test_client_server_ra.c +++ b/tests/test_client_server_ra.c @@ -77,17 +77,27 @@ client(void *arg) if (rc < 0) { printf("client: connect failed\n"); machine_close(client); + machine_free_io(client); return; } printf("client: connected\n"); + rc = machine_set_readahead(client, 1024); + if (rc < 0) { + printf("client: %s\n", machine_error(client)); + machine_close(client); + machine_free_io(client); + return; + } + /* read and fill readahead buffer */ char buf[16]; rc = machine_read(client, buf, 11, 0); if (rc < 0) { printf("client: read failed\n"); machine_close(client); + machine_free_io(client); return; }