From 0504a34b52237ff205a1cf9bcf6c9780d1a79b61 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Fri, 7 Dec 2018 15:12:06 +0300 Subject: [PATCH] odyssey: rework pool wait logic and refactor client notification --- sources/atomic.h | 12 +++++++ sources/client.h | 27 +++++++++++--- sources/frontend.c | 13 +++++-- sources/route.h | 84 ++++++++++++++++++++++++++++++-------------- sources/route_pool.h | 8 ++--- sources/router.c | 58 +++++++++++++++++++++--------- sources/rules.c | 5 ++- sources/rules.h | 1 - sources/stat.h | 2 +- 9 files changed, 153 insertions(+), 57 deletions(-) diff --git a/sources/atomic.h b/sources/atomic.h index 2044c9a0..1f15b09d 100644 --- a/sources/atomic.h +++ b/sources/atomic.h @@ -40,6 +40,18 @@ od_atomic_u32_sub(od_atomic_u32_t *atomic, uint32_t value) __sync_sub_and_fetch(atomic, value); } +static inline void +od_atomic_u32_or(od_atomic_u32_t *atomic, uint32_t value) +{ + __sync_or_and_fetch(atomic, value); +} + +static inline void +od_atomic_u32_xor(od_atomic_u32_t *atomic, uint32_t value) +{ + __sync_xor_and_fetch(atomic, value); +} + static inline uint32_t od_atomic_u64_of(od_atomic_u64_t *atomic) { diff --git a/sources/client.h b/sources/client.h index 41fbbb9d..0533cde6 100644 --- a/sources/client.h +++ b/sources/client.h @@ -20,13 +20,13 @@ typedef enum typedef enum { - OD_CLIENT_OP_NONE, - OD_CLIENT_OP_KILL + OD_CLIENT_OP_NONE = 0, + OD_CLIENT_OP_KILL = 1 } od_clientop_t; struct od_client_ctl { - volatile od_clientop_t op; + od_atomic_u32_t op; }; struct od_client @@ -112,11 +112,28 @@ od_client_notify_read(od_client_t *client) machine_msg_free(msg); } +static inline uint32_t +od_client_ctl_of(od_client_t *client) +{ + return od_atomic_u32_of(&client->ctl.op); +} + +static inline void +od_client_ctl_set(od_client_t *client, uint32_t op) +{ + od_atomic_u32_or(&client->ctl.op, op); +} + +static inline void +od_client_ctl_unset(od_client_t *client, uint32_t op) +{ + od_atomic_u32_xor(&client->ctl.op, op); +} + static inline void od_client_kill(od_client_t *client) { - /* TODO */ - client->ctl.op = OD_CLIENT_OP_KILL; + od_client_ctl_set(client, OD_CLIENT_OP_KILL); od_client_notify(client); } diff --git a/sources/frontend.c b/sources/frontend.c index a57c5168..ab14c757 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -164,7 +164,12 @@ od_frontend_attach(od_client_t *client, char *context) { status = od_router_attach(router, &instance->config, &instance->id_mgr, client); if (status != OD_ROUTER_OK) + { + if (status == OD_ROUTER_ERROR_TIMEDOUT) + od_error(&instance->logger, "router", client, NULL, + "server pool wait timedout, closing"); return OD_FE_EATTACH; + } server = client->server; if (server->io && !machine_connected(server->io)) { @@ -608,10 +613,14 @@ static od_frontend_rc_t od_frontend_ctl(od_client_t *client) { od_client_notify_read(client); - if (client->ctl.op == OD_CLIENT_OP_KILL) { + + uint32_t op = od_client_ctl_of(client); + if (op & OD_CLIENT_OP_KILL) + { + od_client_ctl_unset(client, OD_CLIENT_OP_KILL); machine_msg_t *msg; msg = od_frontend_errorf(client, KIWI_OPERATOR_INTERVENTION, - "client connection dropped"); + "client connection dropped"); if (msg == NULL) return OD_FE_KILL; machine_write(client->io, msg); diff --git a/sources/route.h b/sources/route.h index f02b39b5..ca8e704c 100644 --- a/sources/route.h +++ b/sources/route.h @@ -11,16 +11,17 @@ typedef struct od_route od_route_t; struct od_route { - od_rule_t *rule; - od_route_id_t id; - od_stat_t stats; - od_stat_t stats_prev; - int stats_mark; - od_server_pool_t server_pool; - od_client_pool_t client_pool; - kiwi_params_lock_t params; - pthread_mutex_t lock; - od_list_t link; + od_rule_t *rule; + od_route_id_t id; + od_stat_t stats; + od_stat_t stats_prev; + int stats_mark; + od_server_pool_t server_pool; + od_client_pool_t client_pool; + kiwi_params_lock_t params; + machine_channel_t *wait_bus; + pthread_mutex_t lock; + od_list_t link; }; static inline void @@ -35,29 +36,49 @@ od_route_init(od_route_t *route) od_stat_init(&route->stats_prev); kiwi_params_lock_init(&route->params); od_list_init(&route->link); + route->wait_bus = NULL; pthread_mutex_init(&route->lock, NULL); } -static inline od_route_t* -od_route_allocate(void) -{ - od_route_t *route = malloc(sizeof(*route)); - if (route == NULL) - return NULL; - od_route_init(route); - return route; -} - static inline void od_route_free(od_route_t *route) { od_route_id_free(&route->id); od_server_pool_free(&route->server_pool); kiwi_params_lock_free(&route->params); + if (route->wait_bus) + machine_channel_free(route->wait_bus); pthread_mutex_destroy(&route->lock); free(route); } +static inline od_route_t* +od_route_allocate(int is_shared) +{ + od_route_t *route = malloc(sizeof(*route)); + if (route == NULL) + return NULL; + od_route_init(route); + route->wait_bus = machine_channel_create(is_shared); + if (route->wait_bus == NULL) { + od_route_free(route); + return NULL; + } + return route; +} + +static inline void +od_route_lock(od_route_t *route) +{ + pthread_mutex_lock(&route->lock); +} + +static inline void +od_route_unlock(od_route_t *route) +{ + pthread_mutex_unlock(&route->lock); +} + static inline int od_route_is_dynamic(od_route_t *route) { @@ -122,16 +143,27 @@ od_route_kill_client_pool(od_route_t *route) od_route_kill_cb, NULL); } -static inline void -od_route_lock(od_route_t *route) +static inline int +od_route_wait(od_route_t *route, uint32_t time_ms) { - pthread_mutex_lock(&route->lock); + machine_msg_t *msg; + msg = machine_channel_read(route->wait_bus, time_ms); + if (msg) { + machine_msg_free(msg); + return 0; + } + return -1; } -static inline void -od_route_unlock(od_route_t *route) +static inline int +od_route_signal(od_route_t *route) { - pthread_mutex_unlock(&route->lock); + machine_msg_t *msg; + msg = machine_msg_create(0); + if (msg == NULL) + return -1; + machine_channel_write(route->wait_bus, msg); + return 0; } #endif /* ODYSSEY_ROUTE_H */ diff --git a/sources/route_pool.h b/sources/route_pool.h index 756fbda6..957dc3ea 100644 --- a/sources/route_pool.h +++ b/sources/route_pool.h @@ -47,10 +47,10 @@ od_route_pool_free(od_route_pool_t *pool) } static inline od_route_t* -od_route_pool_new(od_route_pool_t *pool, od_route_id_t *id, +od_route_pool_new(od_route_pool_t *pool, int is_shared, od_route_id_t *id, od_rule_t *rule) { - od_route_t *route = od_route_allocate(); + od_route_t *route = od_route_allocate(is_shared); if (route == NULL) return NULL; int rc; @@ -169,7 +169,7 @@ static inline int od_route_pool_stat_database(od_route_pool_t *pool, od_route_pool_stat_database_cb_t callback, uint64_t prev_time_us, - void *arg) + void **argv) { od_route_t *route; od_list_t *i; @@ -196,7 +196,7 @@ od_route_pool_stat_database(od_route_pool_t *pool, int rc; rc = callback(route->id.database, route->id.database_len - 1, - ¤t, &avg, arg); + ¤t, &avg, argv); if (rc == -1) { od_route_pool_stat_unmark(pool); return -1; diff --git a/sources/router.c b/sources/router.c index 8e513434..bf2bde5a 100644 --- a/sources/router.c +++ b/sources/router.c @@ -79,10 +79,16 @@ od_router_reconfigure(od_router_t *router, od_rules_t *rules) static inline int od_router_expire_server_cb(od_server_t *server, void **argv) { + od_route_t *route = server->route; od_list_t *expire_list = argv[0]; int *count = argv[1]; + + /* remove server for server pool */ + od_server_pool_set(&route->server_pool, server, OD_SERVER_UNDEF); + od_list_append(expire_list, &server->link); (*count)++; + return 0; } @@ -245,7 +251,9 @@ od_router_route(od_router_t *router, od_config_t *config, od_client_t *client) od_route_t *route; route = od_route_pool_match(&router->route_pool, &id, rule); if (route == NULL) { - route = od_route_pool_new(&router->route_pool, &id, rule); + int is_shared; + is_shared = od_config_is_multi_workers(config); + route = od_route_pool_new(&router->route_pool, is_shared, &id, rule); if (route == NULL) { od_router_unlock(router); return OD_ROUTER_ERROR; @@ -306,6 +314,9 @@ od_router_attach(od_router_t *router, od_config_t *config, od_id_mgr_t *id_mgr, od_route_lock(route); + /* enqueue client (pending -> queue) */ + od_client_pool_set(&route->client_pool, client, OD_CLIENT_QUEUE); + /* get client server from route server pool */ od_server_t *server; for (;;) @@ -318,11 +329,29 @@ od_router_attach(od_router_t *router, od_config_t *config, od_id_mgr_t *id_mgr, if (route->rule->pool_size == 0) break; - /* TODO: wait for free server */ -#if 0 - /* enqueue client */ - od_client_pool_set(&route->client_pool, client, OD_CLIENT_QUEUE); -#endif + /* maybe start new connection */ + if (od_server_pool_total(&route->server_pool) < route->rule->pool_size) + break; + + od_route_unlock(route); + + /* pool_size limit implementation. + * + * If the limit reached, wait wakeup condition for + * pool_timeout milliseconds. + * + * The condition triggered when a server connection + * put into idle state by DETACH events. + */ + uint32_t timeout = route->rule->pool_timeout; + if (timeout == 0) + timeout = UINT32_MAX; + int rc; + rc = od_route_wait(route, timeout); + if (rc == -1) + return OD_ROUTER_ERROR_TIMEDOUT; + + od_route_lock(route); } od_route_unlock(route); @@ -337,20 +366,19 @@ od_router_attach(od_router_t *router, od_config_t *config, od_id_mgr_t *id_mgr, od_packet_set_chunk(&server->packet_reader, config->packet_read_size); od_route_lock(route); - - /* TODO */ - /* recheck for free server again? */ + /* xxx: maybe retry check for free server again */ attach: od_server_pool_set(&route->server_pool, server, OD_SERVER_ACTIVE); od_client_pool_set(&route->client_pool, client, OD_CLIENT_ACTIVE); - od_route_unlock(route); client->server = server; server->client = client; server->idle_time = 0; server->key_client = client->key; + od_route_unlock(route); + /* attach server io to clients machine context */ if (server->io && od_config_is_multi_workers(config)) machine_io_attach(server->io); @@ -378,13 +406,9 @@ od_router_detach(od_router_t *router, od_config_t *config, od_client_t *client) od_server_pool_set(&route->server_pool, server, OD_SERVER_IDLE); od_client_pool_set(&route->client_pool, client, OD_CLIENT_PENDING); - /* TODO: wakeup waiters */ - /* - if (route->client_pool.count_queue > 0) { - od_client_t *waiter; - waiter = od_client_pool_next(&route->client_pool, OD_CLIENT_QUEUE); - } - */ + /* notify waiters */ + if (route->client_pool.count_queue > 0) + od_route_signal(route); od_route_unlock(route); } diff --git a/sources/rules.c b/sources/rules.c index f2962c0b..c62eaf85 100644 --- a/sources/rules.c +++ b/sources/rules.c @@ -25,6 +25,9 @@ od_rules_init(od_rules_t *rules) od_list_init(&rules->rules); } +static inline void +od_rules_rule_free(od_rule_t*); + void od_rules_free(od_rules_t *rules) { @@ -205,7 +208,7 @@ od_rules_add(od_rules_t *rules) return rule; } -void +static inline void od_rules_rule_free(od_rule_t *rule) { if (rule->db_name) diff --git a/sources/rules.h b/sources/rules.h index 5fec3967..c684a187 100644 --- a/sources/rules.h +++ b/sources/rules.h @@ -129,7 +129,6 @@ void od_rules_print(od_rules_t*, od_logger_t*); /* rule */ od_rule_t* od_rules_add(od_rules_t*); -void od_rules_rule_free(od_rule_t*); void od_rules_ref(od_rule_t*); void od_rules_unref(od_rule_t*); int od_rules_compare(od_rule_t*, od_rule_t*); diff --git a/sources/stat.h b/sources/stat.h index 40f1bd97..f69b3c3c 100644 --- a/sources/stat.h +++ b/sources/stat.h @@ -114,7 +114,7 @@ od_stat_sum(od_stat_t *sum, od_stat_t *stat) static inline void od_stat_update_of(od_atomic_u64_t *prev, od_atomic_u64_t *current) { - /* todo: this should be made more optiomal */ + /* todo: this could be made more optimal */ /* prev <= current */ uint64_t diff; diff = od_atomic_u64_of(current) - od_atomic_u64_of(prev);