From 3d9edfd18fb018d84a8b0b95aa3e898aa995a962 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Tue, 8 Nov 2016 15:48:18 +0300 Subject: [PATCH] fluent: add io read support --- lib/fluent.h | 4 ++ lib/ft_ioread.c | 103 ++++++++++++++++++++++++++++++++++++++++++++++++ lib/makefile | 4 +- 3 files changed, 110 insertions(+), 1 deletion(-) diff --git a/lib/fluent.h b/lib/fluent.h index 5ae57408..e2479d55 100644 --- a/lib/fluent.h +++ b/lib/fluent.h @@ -41,5 +41,9 @@ FLUENT_API int ft_is_connected(ftio_t); FLUENT_API int ft_connect(ftio_t, char *addr, int port, uint64_t time_ms); FLUENT_API int ft_connect_is_timeout(ftio_t); FLUENT_API int ft_bind(ftio_t, char *addr, int port); +FLUENT_API int ft_accept(ftio_t, ftio_t *client); +FLUENT_API int ft_read(ftio_t, int size, uint64_t time_ms); +FLUENT_API int ft_read_is_timeout(ftio_t); +FLUENT_API char *ft_read_buf(ftio_t); #endif diff --git a/lib/ft_ioread.c b/lib/ft_ioread.c index e69de29b..848e2133 100644 --- a/lib/ft_ioread.c +++ b/lib/ft_ioread.c @@ -0,0 +1,103 @@ + +/* + * fluent. + * + * Cooperative multitasking engine. +*/ + +#include +#include + +static void +ft_io_read_timeout_cb(uv_timer_t *handle) +{ + ftio *io = handle->data; + uv_read_stop((uv_stream_t*)&io->handle); + io->read_timeout = 1; + io->read_status = -ETIMEDOUT; + ft_wakeup(io->f, io->read_fiber); +} + +static void +ft_io_read_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) +{ + ftio *io = handle->data; + int to_read = + io->read_size - ft_bufused(&io->read_buf); + if (to_read > suggested_size) + to_read = suggested_size; + int rc; + rc = ft_bufensure(&io->read_buf, to_read); + if (rc == -1) + return; + buf->base = io->read_buf.p; + buf->len = to_read; +} + +static void +ft_io_read_cb(uv_stream_t *handle, ssize_t size, const uv_buf_t *buf) +{ + ftio *io = handle->data; + assert(! io->read_timeout); + if (size >= 0) { + ft_bufadvance(&io->read_buf, size); + if (ft_bufused(&io->read_buf) < io->read_size) { + /* expect more data */ + return; + } + /* read complete */ + assert(ft_bufused(&io->read_buf) == io->read_size); + } else { + /* connection closed */ + if (size == UV_EOF) { + io->connected = 0; + } + io->read_status = size; + } + + ft_io_timer_stop(&io->read_timer); + uv_read_stop(handle); + ft_wakeup(io->f, io->read_fiber); +} + +FLUENT_API int +ft_read(ftio_t iop, int size, uint64_t time_ms) +{ + ftio *io = iop; + if (!io->connected || io->read_fiber) + return -1; + io->read_status = 0; + io->read_timeout = 0; + io->read_size = size; + io->read_fiber = ft_current(io->f); + ft_bufreset(&io->read_buf); + + ft_io_timer_start(&io->connect_timer, ft_io_read_timeout_cb, + time_ms); + int rc; + rc = uv_read_start((uv_stream_t*)&io->handle, + ft_io_read_alloc_cb, + ft_io_read_cb); + if (rc < 0) { + io->read_fiber = NULL; + return rc; + } + ft_scheduler_yield(&io->f->scheduler); + rc = io->read_status; + io->read_fiber = NULL; + return rc; +} + +FLUENT_API int +ft_read_is_timeout(ftio_t iop) +{ + ftio *io = iop; + return io->read_timeout; +} + +FLUENT_API char* +ft_read_buf(ftio_t iop) +{ + ftio *io = iop; + return io->read_buf.s; +} diff --git a/lib/makefile b/lib/makefile index aca049a5..39ce8bf9 100644 --- a/lib/makefile +++ b/lib/makefile @@ -10,7 +10,9 @@ OBJECTS = ft_context.o \ ft.o \ ft_io.o \ ft_ioconnect.o \ - ft_iobind.o + ft_iobind.o \ + ft_ioaccept.o \ + ft_ioread.o LIB = libfluent.a $(LIB): clean $(OBJECTS)