mirror of https://github.com/yandex/odyssey.git
odyssey: unsubscribe from read events during pool attach wait
This commit is contained in:
parent
80484c3337
commit
5716feb831
28
sources/io.h
28
sources/io.h
|
@ -84,6 +84,30 @@ od_io_detach(od_io_t *io)
|
|||
return machine_io_detach(io->io);
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_io_read_start(od_io_t *io)
|
||||
{
|
||||
return machine_read_start(io->io, io->on_read);
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_io_read_stop(od_io_t *io)
|
||||
{
|
||||
return machine_read_stop(io->io);
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_io_write_start(od_io_t *io)
|
||||
{
|
||||
return machine_write_start(io->io, io->on_write);
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_io_write_stop(od_io_t *io)
|
||||
{
|
||||
return machine_write_stop(io->io);
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_io_read(od_io_t *io, char *dest, int size, uint32_t time_ms)
|
||||
{
|
||||
|
@ -127,7 +151,7 @@ od_io_read(od_io_t *io, char *dest, int size, uint32_t time_ms)
|
|||
int errno_ = machine_errno();
|
||||
if (errno_ == EAGAIN || errno_ == EWOULDBLOCK || errno_ == EINTR) {
|
||||
if (! read_started) {
|
||||
rc = machine_read_start(io->io, io->on_read);
|
||||
rc = od_io_read_start(io);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
read_started = 1;
|
||||
|
@ -144,7 +168,7 @@ od_io_read(od_io_t *io, char *dest, int size, uint32_t time_ms)
|
|||
}
|
||||
|
||||
if (read_started) {
|
||||
rc = machine_read_stop(io->io);
|
||||
rc = od_io_read_stop(io);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ od_relay_start(od_relay_t *relay,
|
|||
machine_cond_propagate(relay->src->on_write, base);
|
||||
|
||||
int rc;
|
||||
rc = machine_read_start(relay->src->io, relay->src->on_read);
|
||||
rc = od_io_read_start(relay->src);
|
||||
if (rc == -1)
|
||||
return relay->error_read;
|
||||
|
||||
|
@ -104,7 +104,7 @@ od_relay_detach(od_relay_t *relay)
|
|||
{
|
||||
if (! relay->dst)
|
||||
return;
|
||||
machine_write_stop(relay->dst->io);
|
||||
od_io_write_stop(relay->dst);
|
||||
relay->dst = NULL;
|
||||
}
|
||||
|
||||
|
@ -112,7 +112,7 @@ static inline int
|
|||
od_relay_stop(od_relay_t *relay)
|
||||
{
|
||||
od_relay_detach(relay);
|
||||
machine_read_stop(relay->src->io);
|
||||
od_io_read_stop(relay->src);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -235,14 +235,14 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size)
|
|||
relay->packet_full = NULL;
|
||||
relay->packet_full_pos = 0;
|
||||
return od_relay_on_packet_msg(relay, msg);
|
||||
} else {
|
||||
if (relay->packet_skip)
|
||||
return OD_OK;
|
||||
rc = machine_iov_add_pointer(relay->iov, data, to_parse);
|
||||
if (rc == -1)
|
||||
return OD_EOOM;
|
||||
}
|
||||
|
||||
if (relay->packet_skip)
|
||||
return OD_OK;
|
||||
rc = machine_iov_add_pointer(relay->iov, data, to_parse);
|
||||
if (rc == -1)
|
||||
return OD_EOOM;
|
||||
|
||||
return OD_OK;
|
||||
}
|
||||
|
||||
|
@ -359,17 +359,17 @@ od_relay_step(od_relay_t *relay)
|
|||
|
||||
if (! machine_iov_pending(relay->iov))
|
||||
{
|
||||
rc = machine_write_stop(relay->dst->io);
|
||||
rc = od_io_write_stop(relay->dst);
|
||||
if (rc == -1)
|
||||
return relay->error_write;
|
||||
|
||||
od_readahead_reuse(&relay->src->readahead);
|
||||
|
||||
rc = machine_read_start(relay->src->io, relay->src->on_read);
|
||||
rc = od_io_read_start(relay->src);
|
||||
if (rc == -1)
|
||||
return relay->error_read;
|
||||
} else {
|
||||
rc = machine_write_start(relay->dst->io, relay->dst->on_write);
|
||||
rc = od_io_write_start(relay->dst);
|
||||
if (rc == -1)
|
||||
return relay->error_write;
|
||||
}
|
||||
|
@ -395,7 +395,7 @@ od_relay_flush(od_relay_t *relay)
|
|||
if (! machine_iov_pending(relay->iov))
|
||||
return OD_OK;
|
||||
|
||||
rc = machine_write_start(relay->dst->io, relay->dst->on_write);
|
||||
rc = od_io_write_start(relay->dst);
|
||||
if (rc == -1)
|
||||
return relay->error_write;
|
||||
|
||||
|
@ -408,12 +408,12 @@ od_relay_flush(od_relay_t *relay)
|
|||
|
||||
rc = od_relay_write(relay);
|
||||
if (rc != OD_OK) {
|
||||
machine_write_stop(relay->dst->io);
|
||||
od_io_write_stop(relay->dst);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
rc = machine_write_stop(relay->dst->io);
|
||||
rc = od_io_write_stop(relay->dst);
|
||||
if (rc == -1)
|
||||
return relay->error_write;
|
||||
|
||||
|
|
|
@ -345,6 +345,12 @@ od_router_attach(od_router_t *router, od_config_t *config, od_client_t *client)
|
|||
if (od_server_pool_total(&route->server_pool) < route->rule->pool_size)
|
||||
break;
|
||||
|
||||
/*
|
||||
* unsubscribe from pending client read events during the time we wait
|
||||
* for an available server
|
||||
*/
|
||||
od_io_read_stop(&client->io);
|
||||
|
||||
od_route_unlock(route);
|
||||
|
||||
/* pool_size limit implementation.
|
||||
|
@ -394,6 +400,9 @@ attach:
|
|||
if (server->io.io && od_config_is_multi_workers(config))
|
||||
od_io_attach(&server->io);
|
||||
|
||||
/* maybe restore read events subscription */
|
||||
od_io_read_start(&client->io);
|
||||
|
||||
return OD_ROUTER_OK;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue