/* * machinarium. * * Cooperative multitasking engine. */ #include #include static void mm_io_read_timeout_cb(uv_timer_t *handle) { mmio *io = handle->data; uv_read_stop((uv_stream_t*)&io->handle); io->read_timeout = 1; io->read_status = -ETIMEDOUT; mm_wakeup(io->f, io->read_fiber); } static void mm_io_read_cancel_cb(mmfiber *fiber, void *arg) { mmio *io = arg; mm_io_timer_stop(io, &io->read_timer); uv_read_stop((uv_stream_t*)&io->handle); io->read_timeout = 0; io->read_status = -ECANCELED; mm_wakeup(io->f, io->read_fiber); } static void mm_io_read_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { mmio *io = handle->data; int rc; rc = mm_bufensure(&io->read_ahead, io->read_ahead_size); if (rc == -1) return; buf->base = io->read_ahead.p; buf->len = io->read_ahead_size; } static void mm_io_read_cb(uv_stream_t *handle, ssize_t size, const uv_buf_t *buf) { mmio *io = handle->data; assert(! io->read_timeout); if (size >= 0) { mm_bufadvance(&io->read_ahead, size); if (mm_bufused(&io->read_ahead) < io->read_size) { /* expect more data */ return; } /* readahead buffer now has atleast as minum * size required by read operation */ assert(mm_bufused(&io->read_ahead) >= io->read_size); } else { /* connection closed */ if (size == UV_EOF) { io->connected = 0; } io->read_status = size; } mm_io_timer_stop(io, &io->read_timer); uv_read_stop(handle); mm_wakeup(io->f, io->read_fiber); } MM_API int mm_read(mm_io_t iop, int size, uint64_t time_ms) { mmio *io = iop; mmfiber *current = mm_current(io->f); if (mm_fiber_is_cancel(current)) return -ECANCELED; if (io->read_fiber) return -1; if (size > io->read_ahead_size) return -EINVAL; io->read_status = 0; io->read_timeout = 0; io->read_size = 0; io->read_fiber = NULL; /* readhead */ int ra_left = mm_bufused(&io->read_ahead) - io->read_ahead_pos; if (ra_left >= size) { io->read_ahead_pos_data = io->read_ahead_pos; io->read_ahead_pos += size; return 0; } /* ra_left < size */ if (ra_left > 0) { memmove(io->read_ahead.s, io->read_ahead.s + io->read_ahead_pos, ra_left); mm_bufreset(&io->read_ahead); mm_bufadvance(&io->read_ahead, ra_left); } else { mm_bufreset(&io->read_ahead); } io->read_size = size - ra_left; io->read_fiber = current; io->read_ahead_pos_data = 0; io->read_ahead_pos = ra_left + io->read_size; mm_io_timer_start(io, &io->read_timer, mm_io_read_timeout_cb, time_ms); int rc; rc = uv_read_start((uv_stream_t*)&io->handle, mm_io_read_alloc_cb, mm_io_read_cb); if (rc < 0) { mm_io_timer_stop(io, &io->read_timer); io->read_fiber = NULL; return rc; } mm_fiber_op_begin(io->read_fiber, mm_io_read_cancel_cb, io); mm_scheduler_yield(&io->f->scheduler); mm_fiber_op_end(io->read_fiber); rc = io->read_status; io->read_fiber = NULL; if (rc < 0) return rc; return 0; } MM_API int mm_read_is_timeout(mm_io_t iop) { mmio *io = iop; return io->read_timeout; } MM_API char* mm_read_buf(mm_io_t iop) { mmio *io = iop; return io->read_ahead.s + io->read_ahead_pos_data; }