machinarium: rework readahead support

This commit is contained in:
Dmitry Simonenko 2017-04-18 15:02:28 +03:00
parent 259fc79315
commit 7807f860ea
6 changed files with 241 additions and 45 deletions

View File

@ -23,10 +23,7 @@ machine_create_io(machine_t obj)
io->machine = machine;
/* read */
/*
mm_buf_init(&io->read_ahead);
io->read_ahead_size = 16384;
*/
mm_buf_init(&io->readahead_buf);
#if 0
/* getaddrinfo */
@ -46,7 +43,7 @@ MACHINE_API void
machine_free_io(machine_io_t obj)
{
mm_io_t *io = obj;
/*mm_buf_free(&io->read_ahead);*/
mm_buf_free(&io->readahead_buf);
free(io);
}
@ -108,22 +105,6 @@ machine_set_keepalive(machine_io_t obj, int enable, int delay)
return 0;
}
MACHINE_API int
machine_set_readahead(machine_io_t obj, int size)
{
mm_io_t *io = obj;
/*
if (mm_buf_size(&io->read_ahead) > 0) {
mm_io_set_errno(io, EINPROGRESS);
return -1;
}
io->read_ahead_size = size;
*/
(void)io;
(void)size;
return 0;
}
int mm_io_socket_set(mm_io_t *io, int fd)
{
io->fd = fd;

View File

@ -54,6 +54,10 @@ struct mm_io_t {
int read_eof;
int read_status;
mm_fiber_t *read_fiber;
mm_buf_t readahead_buf;
int readahead_pos;
int readahead_pos_read;
int readahead_size;
/*
mm_timer_t read_timer;

View File

@ -63,31 +63,16 @@ wakeup:
return 0;
}
int
mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms)
static int
mm_read_default(mm_io_t *io, uint64_t time_ms)
{
mm_t *machine = machine = io->machine;
mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler);
mm_io_set_errno(io, 0);
if (mm_fiber_is_cancelled(current)) {
mm_io_set_errno(io, ECANCELED);
return -1;
}
if (io->read_fiber) {
mm_io_set_errno(io, EINPROGRESS);
return -1;
}
if (! io->connected) {
mm_io_set_errno(io, ENOTCONN);
return -1;
}
io->read_status = 0;
io->read_timedout = 0;
io->read_eof = 0;
io->read_fiber = NULL;
io->read_size = size;
io->read_pos = 0;
io->read_buf = buf;
io->read_status = 0;
io->read_eof = 0;
io->read_fiber = NULL;
io->read_pos = 0;
io->handle.on_read = mm_read_cb;
io->handle.on_read_arg = io;
@ -126,7 +111,7 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms)
rc = io->read_status;
if (rc == 0) {
if (io->read_pos != size) {
if (io->read_pos != io->read_size) {
mm_io_set_errno(io, ECONNRESET);
return -1;
}
@ -136,6 +121,181 @@ mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms)
return -1;
}
static int
mm_readahead_read(mm_io_t *io, uint64_t time_ms);
int mm_readahead_stop(mm_io_t *io);
static int
mm_readahead_cb(mm_fd_t *handle)
{
mm_io_t *io = handle->on_read_arg;
mm_t *machine = machine = io->machine;
int left = io->readahead_size - io->readahead_pos;
int rc;
while (left > 0)
{
rc = mm_socket_read(io->fd, io->readahead_buf.start + io->readahead_pos, left);
if (rc == -1) {
if (errno == EAGAIN ||
errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
io->read_status = errno;
if (io->read_fiber)
mm_scheduler_wakeup(io->read_fiber);
return 0;
}
io->readahead_pos += rc;
left = io->readahead_size - io->readahead_pos;
assert(left >= 0);
if (rc == 0) {
/* eof */
mm_readahead_stop(io);
io->read_eof = 1;
io->read_status = 0;
break;
}
}
io->read_status = 0;
if (io->read_fiber) {
int ra_left = io->readahead_pos - io->readahead_pos_read;
if (io->read_eof || ra_left >= io->read_size)
mm_scheduler_wakeup(io->read_fiber);
}
return 0;
}
int
mm_readahead_start(mm_io_t *io)
{
mm_t *machine = machine = io->machine;
int rc;
rc = mm_loop_read(&machine->loop, &io->handle, mm_readahead_cb, io, 1);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
return 0;
}
int
mm_readahead_stop(mm_io_t *io)
{
mm_t *machine = machine = io->machine;
int rc;
rc = mm_loop_read(&machine->loop, &io->handle, NULL, NULL, 0);
if (rc == -1) {
mm_io_set_errno(io, errno);
return -1;
}
return 0;
}
static int
mm_readahead_read(mm_io_t *io, uint64_t time_ms)
{
mm_t *machine = machine = io->machine;
mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler);
if (io->read_size > io->readahead_size) {
mm_io_set_errno(io, EINVAL);
return -1;
}
/* try to use readahead first */
assert(io->readahead_pos >= io->readahead_pos_read);
int ra_left = io->readahead_pos - io->readahead_pos_read;
if (ra_left >= io->read_size) {
memcpy(io->read_buf, io->readahead_buf.start + io->readahead_pos_read,
io->read_size);
io->readahead_pos_read += io->read_size;
return 0;
}
if (io->read_status != 0) {
mm_io_set_errno(io, io->read_status);
return -1;
}
if (io->read_eof) {
mm_io_set_errno(io, ECONNRESET);
return -1;
}
/* copy first readahead chunk */
int copy_pos = 0;
if (ra_left > 0) {
memcpy(io->read_buf,
io->readahead_buf.start + io->readahead_pos_read,
ra_left);
io->readahead_pos_read += ra_left;
io->read_size -= ra_left;
copy_pos = ra_left;
}
/* reset readahead position */
assert(io->readahead_pos_read == io->readahead_pos);
io->readahead_pos = 0;
io->readahead_pos_read = 0;
/* wait for timedout, cancel or execution status */
mm_timer_start(&machine->loop.clock,
&io->read_timer,
mm_read_timer_cb, io, time_ms);
mm_call_begin(&current->call, mm_read_cancel_cb, io);
io->read_fiber = current;
mm_scheduler_yield(&machine->scheduler);
io->read_fiber = NULL;
mm_call_end(&current->call);
mm_timer_stop(&io->read_timer);
int rc;
rc = io->read_status;
if (rc == 0) {
ra_left = io->readahead_pos - io->readahead_pos_read;
if (ra_left < io->read_size) {
mm_io_set_errno(io, ECONNRESET);
return -1;
}
memcpy(io->read_buf + copy_pos,
io->readahead_buf.start + io->readahead_pos_read,
io->read_size);
io->readahead_pos_read += io->read_size;
return 0;
}
mm_io_set_errno(io, rc);
return -1;
}
int
mm_read(mm_io_t *io, char *buf, int size, uint64_t time_ms)
{
mm_t *machine = machine = io->machine;
mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler);
mm_io_set_errno(io, 0);
if (mm_fiber_is_cancelled(current)) {
mm_io_set_errno(io, ECANCELED);
return -1;
}
if (io->read_fiber) {
mm_io_set_errno(io, EINPROGRESS);
return -1;
}
if (! io->connected) {
mm_io_set_errno(io, ENOTCONN);
return -1;
}
io->read_size = size;
io->read_buf = buf;
io->read_timedout = 0;
if (io->readahead_size > 0)
return mm_readahead_read(io, time_ms);
return mm_read_default(io, time_ms);
}
MACHINE_API int
machine_read(machine_io_t obj, char *buf, int size, uint64_t time_ms)
{
@ -151,3 +311,42 @@ machine_read_timedout(machine_io_t obj)
mm_io_t *io = obj;
return io->read_timedout;
}
MACHINE_API int
machine_set_readahead(machine_io_t obj, int size)
{
mm_io_t *io = obj;
mm_fiber_t *current = mm_scheduler_current(&io->machine->scheduler);
mm_io_set_errno(io, 0);
if (mm_fiber_is_cancelled(current)) {
mm_io_set_errno(io, ECANCELED);
return -1;
}
if (io->read_fiber) {
mm_io_set_errno(io, EINPROGRESS);
return -1;
}
if (! io->connected) {
mm_io_set_errno(io, ENOTCONN);
return -1;
}
if (size <= 0) {
mm_io_set_errno(io, EINVAL);
return -1;
}
if (io->readahead_size > 0) {
mm_io_set_errno(io, EINPROGRESS);
return -1;
}
int rc;
rc = mm_buf_ensure(&io->readahead_buf, size);
if (rc == -1) {
mm_io_set_errno(io, ENOMEM);
return -1;
}
rc = mm_readahead_start(io);
if (rc == -1)
return -1;
io->readahead_size = size;
return 0;
}

View File

@ -9,4 +9,7 @@
int mm_read(mm_io_t*, char*, int, uint64_t);
int mm_readahead_start(mm_io_t*);
int mm_readahead_stop(mm_io_t*);
#endif

View File

@ -6,7 +6,6 @@ LFLAGS = $(LFLAGS_LIB)
TESTS = test_new \
test_create \
test_sleep \
test_sleep_2 \
test_wait \
test_condition \
test_condition_2 \

View File

@ -77,17 +77,27 @@ client(void *arg)
if (rc < 0) {
printf("client: connect failed\n");
machine_close(client);
machine_free_io(client);
return;
}
printf("client: connected\n");
rc = machine_set_readahead(client, 1024);
if (rc < 0) {
printf("client: %s\n", machine_error(client));
machine_close(client);
machine_free_io(client);
return;
}
/* read and fill readahead buffer */
char buf[16];
rc = machine_read(client, buf, 11, 0);
if (rc < 0) {
printf("client: read failed\n");
machine_close(client);
machine_free_io(client);
return;
}