From 5716feb831849cdca2950939eb2d5feaebee27e0 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Sat, 25 May 2019 17:26:29 +0300 Subject: [PATCH] odyssey: unsubscribe from read events during pool attach wait --- sources/io.h | 28 ++++++++++++++++++++++++++-- sources/relay.h | 30 +++++++++++++++--------------- sources/router.c | 9 +++++++++ 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/sources/io.h b/sources/io.h index 1ed91256..a24598e4 100644 --- a/sources/io.h +++ b/sources/io.h @@ -84,6 +84,30 @@ od_io_detach(od_io_t *io) return machine_io_detach(io->io); } +static inline int +od_io_read_start(od_io_t *io) +{ + return machine_read_start(io->io, io->on_read); +} + +static inline int +od_io_read_stop(od_io_t *io) +{ + return machine_read_stop(io->io); +} + +static inline int +od_io_write_start(od_io_t *io) +{ + return machine_write_start(io->io, io->on_write); +} + +static inline int +od_io_write_stop(od_io_t *io) +{ + return machine_write_stop(io->io); +} + static inline int od_io_read(od_io_t *io, char *dest, int size, uint32_t time_ms) { @@ -127,7 +151,7 @@ od_io_read(od_io_t *io, char *dest, int size, uint32_t time_ms) int errno_ = machine_errno(); if (errno_ == EAGAIN || errno_ == EWOULDBLOCK || errno_ == EINTR) { if (! read_started) { - rc = machine_read_start(io->io, io->on_read); + rc = od_io_read_start(io); if (rc == -1) return -1; read_started = 1; @@ -144,7 +168,7 @@ od_io_read(od_io_t *io, char *dest, int size, uint32_t time_ms) } if (read_started) { - rc = machine_read_stop(io->io); + rc = od_io_read_stop(io); if (rc == -1) return -1; } diff --git a/sources/relay.h b/sources/relay.h index 0dbf6a01..f8642cd7 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -85,7 +85,7 @@ od_relay_start(od_relay_t *relay, machine_cond_propagate(relay->src->on_write, base); int rc; - rc = machine_read_start(relay->src->io, relay->src->on_read); + rc = od_io_read_start(relay->src); if (rc == -1) return relay->error_read; @@ -104,7 +104,7 @@ od_relay_detach(od_relay_t *relay) { if (! relay->dst) return; - machine_write_stop(relay->dst->io); + od_io_write_stop(relay->dst); relay->dst = NULL; } @@ -112,7 +112,7 @@ static inline int od_relay_stop(od_relay_t *relay) { od_relay_detach(relay); - machine_read_stop(relay->src->io); + od_io_read_stop(relay->src); return 0; } @@ -235,14 +235,14 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size) relay->packet_full = NULL; relay->packet_full_pos = 0; return od_relay_on_packet_msg(relay, msg); - } else { - if (relay->packet_skip) - return OD_OK; - rc = machine_iov_add_pointer(relay->iov, data, to_parse); - if (rc == -1) - return OD_EOOM; } + if (relay->packet_skip) + return OD_OK; + rc = machine_iov_add_pointer(relay->iov, data, to_parse); + if (rc == -1) + return OD_EOOM; + return OD_OK; } @@ -359,17 +359,17 @@ od_relay_step(od_relay_t *relay) if (! machine_iov_pending(relay->iov)) { - rc = machine_write_stop(relay->dst->io); + rc = od_io_write_stop(relay->dst); if (rc == -1) return relay->error_write; od_readahead_reuse(&relay->src->readahead); - rc = machine_read_start(relay->src->io, relay->src->on_read); + rc = od_io_read_start(relay->src); if (rc == -1) return relay->error_read; } else { - rc = machine_write_start(relay->dst->io, relay->dst->on_write); + rc = od_io_write_start(relay->dst); if (rc == -1) return relay->error_write; } @@ -395,7 +395,7 @@ od_relay_flush(od_relay_t *relay) if (! machine_iov_pending(relay->iov)) return OD_OK; - rc = machine_write_start(relay->dst->io, relay->dst->on_write); + rc = od_io_write_start(relay->dst); if (rc == -1) return relay->error_write; @@ -408,12 +408,12 @@ od_relay_flush(od_relay_t *relay) rc = od_relay_write(relay); if (rc != OD_OK) { - machine_write_stop(relay->dst->io); + od_io_write_stop(relay->dst); return rc; } } - rc = machine_write_stop(relay->dst->io); + rc = od_io_write_stop(relay->dst); if (rc == -1) return relay->error_write; diff --git a/sources/router.c b/sources/router.c index 040226b9..297f6a72 100644 --- a/sources/router.c +++ b/sources/router.c @@ -345,6 +345,12 @@ od_router_attach(od_router_t *router, od_config_t *config, od_client_t *client) if (od_server_pool_total(&route->server_pool) < route->rule->pool_size) break; + /* + * unsubscribe from pending client read events during the time we wait + * for an available server + */ + od_io_read_stop(&client->io); + od_route_unlock(route); /* pool_size limit implementation. @@ -394,6 +400,9 @@ attach: if (server->io.io && od_config_is_multi_workers(config)) od_io_attach(&server->io); + /* maybe restore read events subscription */ + od_io_read_start(&client->io); + return OD_ROUTER_OK; }