From 903f4968029995fb3dbf97e79045dbb04a7e0839 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Thu, 2 Aug 2018 19:02:02 +0300 Subject: [PATCH] odyssey: rework statistics update logic --- sources/console.c | 10 ++- sources/cron.c | 155 +++++++++++++------------------------------ sources/cron.h | 1 + sources/route.h | 6 +- sources/route_pool.c | 135 ++++++++++++++++++++++++------------- sources/route_pool.h | 19 ++++-- sources/stat.h | 46 ++++++++++++- 7 files changed, 206 insertions(+), 166 deletions(-) diff --git a/sources/console.c b/sources/console.c index 6089421b..0903273c 100644 --- a/sources/console.c +++ b/sources/console.c @@ -43,6 +43,7 @@ #include "sources/instance.h" #include "sources/router_cancel.h" #include "sources/router.h" +#include "sources/cron.h" #include "sources/system.h" #include "sources/worker.h" #include "sources/frontend.h" @@ -186,6 +187,8 @@ static inline int od_console_show_stats(od_client_t *client) { od_router_t *router = client->global->router; + od_cron_t *cron = client->global->cron; + shapito_stream_t *stream = client->stream; shapito_stream_reset(stream); @@ -208,9 +211,10 @@ od_console_show_stats(od_client_t *client) "avg_wait_time"); if (rc == -1) return -1; - rc = od_routepool_stats(&router->route_pool, - od_console_show_stats_callback, - client); + rc = od_routepool_stat_database(&router->route_pool, + od_console_show_stats_callback, + cron->stat_time_us, + client); if (rc == -1) return -1; rc = shapito_be_write_complete(stream, "SHOW", 5); diff --git a/sources/cron.c b/sources/cron.c index 5d5d2450..7f3c8fe1 100644 --- a/sources/cron.c +++ b/sources/cron.c @@ -48,23 +48,49 @@ #include "sources/backend.h" #include "sources/cron.h" -static inline int -od_cron_stats_server(od_server_t *server, void *arg) +static int +od_cron_stat_cb(od_route_t *route, od_stat_t *current, od_stat_t *avg, + void *arg) { - od_stat_t *stats = arg; - stats->count_query += od_atomic_u64_of(&server->stats.count_query); - stats->count_tx += od_atomic_u64_of(&server->stats.count_tx); - stats->query_time += od_atomic_u64_of(&server->stats.query_time); - stats->tx_time += od_atomic_u64_of(&server->stats.tx_time); - stats->recv_client += od_atomic_u64_of(&server->stats.recv_client); - stats->recv_server += od_atomic_u64_of(&server->stats.recv_server); + od_router_t *router = arg; + od_instance_t *instance = router->global->instance; + + /* update route stats */ + route->stats_cron = *current; + + if (! instance->config.log_stats) + return 0; + + od_log(&instance->logger, "stats", NULL, NULL, + "[%.*s.%.*s] %d clients, " + "%d active servers, " + "%d idle servers, " + "%" PRIu64 " transactions/sec (%" PRIu64 " usec) " + "%" PRIu64 " queries/sec (%" PRIu64 " usec) " + "%" PRIu64 " in bytes/sec, " + "%" PRIu64 " out bytes/sec", + route->id.database_len - 1, + route->id.database, + route->id.user_len - 1, + route->id.user, + od_clientpool_total(&route->client_pool), + route->server_pool.count_active, + route->server_pool.count_idle, + avg->count_tx, + avg->tx_time, + avg->count_query, + avg->query_time, + avg->recv_client, + avg->recv_server); + return 0; } static inline void -od_cron_stats(od_router_t *router) +od_cron_stat(od_router_t *router) { od_instance_t *instance = router->global->instance; + od_cron_t *cron = router->global->cron; if (router->route_pool.count == 0) return; @@ -95,103 +121,13 @@ od_cron_stats(od_router_t *router) count_coroutine_cache); } - od_list_t *i; - od_list_foreach(&router->route_pool.list, i) - { - od_route_t *route; - route = od_container_of(i, od_route_t, link); + /* update stats per route */ + od_routepool_stat(&router->route_pool, od_cron_stat_cb, + cron->stat_time_us, + router); - /* gather statistics per route server pool */ - od_stat_t stats; - od_stat_init(&stats); - od_serverpool_foreach(&route->server_pool, OD_SACTIVE, - od_cron_stats_server, - &stats); - od_serverpool_foreach(&route->server_pool, OD_SIDLE, - od_cron_stats_server, - &stats); - - /* calculate average between previous sample and the - current one */ - int interval = instance->config.stats_interval; - uint64_t query = 0; - uint64_t tx = 0; - uint64_t query_time = 0; - uint64_t tx_time = 0; - uint64_t recv_client = 0; - uint64_t recv_server = 0; - - /* ensure server stats not changed due to a - server connection close */ - int64_t query_diff_sanity; - query_diff_sanity = (stats.count_query - route->cron_stats.count_query); - - if (query_diff_sanity >= 0) - { - /* query */ - uint64_t query_prev = route->cron_stats.count_query / interval; - uint64_t query_current = stats.count_query / interval; - int64_t query_diff = query_current - query_prev; - query = query_diff / interval; - if (query_diff > 0) - query_time = (stats.query_time - route->cron_stats.query_time) / query_diff; - - /* transaction */ - uint64_t tx_prev = route->cron_stats.count_tx / interval; - uint64_t tx_current = stats.count_tx / interval; - int64_t tx_diff = tx_current - tx_prev; - tx = tx_diff / interval; - if (tx_diff > 0) - tx_time = (stats.tx_time - route->cron_stats.tx_time) / tx_diff; - - /* recv client */ - uint64_t recv_client_prev = route->cron_stats.recv_client / interval; - uint64_t recv_client_current = stats.recv_client / interval; - recv_client = (recv_client_current - recv_client_prev) / interval; - - /* recv server */ - uint64_t recv_server_prev = route->cron_stats.recv_server / interval; - uint64_t recv_server_current = recv_server_current = stats.recv_server / interval; - recv_server = (recv_server_current - recv_server_prev) / interval; - } - - /* update current stats */ - route->cron_stats = stats; - - /* update avg stats */ - route->cron_stats_avg.count_query = query; - route->cron_stats_avg.count_tx = tx; - route->cron_stats_avg.query_time = query_time; - route->cron_stats_avg.tx_time = tx_time; - route->cron_stats_avg.recv_client = recv_client; - route->cron_stats_avg.recv_server = recv_server; - - if (instance->config.log_stats) { - od_log(&instance->logger, "stats", NULL, NULL, - "[%.*s.%.*s] clients %d, " - "pool_active %d, " - "pool_idle %d " - "rps %" PRIu64 " " - "tps %" PRIu64 " " - "query_time_us %" PRIu64 " " - "tx_time_us %" PRIu64 " " - "recv_client_bytes %" PRIu64 " " - "recv_server_bytes %" PRIu64, - route->id.database_len - 1, - route->id.database, - route->id.user_len - 1, - route->id.user, - od_clientpool_total(&route->client_pool), - route->server_pool.count_active, - route->server_pool.count_idle, - query, - tx, - query_time, - tx_time, - recv_client, - recv_server); - } - } + /* update current stat time mark */ + cron->stat_time_us = machine_time(); } static inline int @@ -283,15 +219,17 @@ od_cron(void *arg) od_router_t *router = cron->global->router; od_instance_t *instance = cron->global->instance; + cron->stat_time_us = machine_time(); + int stats_tick = 0; for (;;) { /* mark and sweep expired idle server connections */ od_cron_expire(cron); - /* update stats */ + /* update statistics */ if (++stats_tick >= instance->config.stats_interval) { - od_cron_stats(router); + od_cron_stat(router); stats_tick = 0; } @@ -303,6 +241,7 @@ od_cron(void *arg) void od_cron_init(od_cron_t *cron, od_global_t *global) { cron->global = global; + cron->stat_time_us = 0; } int od_cron_start(od_cron_t *cron) diff --git a/sources/cron.h b/sources/cron.h index 468ecf26..ab6b3bcc 100644 --- a/sources/cron.h +++ b/sources/cron.h @@ -11,6 +11,7 @@ typedef struct od_cron od_cron_t; struct od_cron { + uint64_t stat_time_us; od_global_t *global; }; diff --git a/sources/route.h b/sources/route.h index 8564bc5a..c68cac2f 100644 --- a/sources/route.h +++ b/sources/route.h @@ -13,8 +13,7 @@ struct od_route { od_configroute_t *config; od_routeid_t id; - od_stat_t cron_stats; - od_stat_t cron_stats_avg; + od_stat_t stats_cron; int stats_mark; od_serverpool_t server_pool; od_clientpool_t client_pool; @@ -29,8 +28,7 @@ od_route_init(od_route_t *route) od_serverpool_init(&route->server_pool); od_clientpool_init(&route->client_pool); route->stats_mark = 0; - od_stat_init(&route->cron_stats); - od_stat_init(&route->cron_stats_avg); + od_stat_init(&route->stats_cron); od_list_init(&route->link); } diff --git a/sources/route_pool.c b/sources/route_pool.c index 9723b9ec..f3416f8a 100644 --- a/sources/route_pool.c +++ b/sources/route_pool.c @@ -166,15 +166,23 @@ od_routepool_client_foreach(od_routepool_t *pool, od_clientstate_t state, } static inline int -od_routepool_stats_mark(od_routepool_t *pool, - char *database, - int database_len, - od_stat_t *total, - od_stat_t *avg) +od_routepool_stat_server(od_server_t *server, void *arg) +{ + od_stat_t *stats = arg; + od_stat_sum(stats, &server->stats); + return 0; +} + +static inline void +od_routepool_stat_database_mark(od_routepool_t *pool, + char *database, + int database_len, + od_stat_t *current, + od_stat_t *prev) { - int match = 0; od_list_t *i; - od_list_foreach(&pool->list, i) { + od_list_foreach(&pool->list, i) + { od_route_t *route; route = od_container_of(i, od_route_t, link); if (route->stats_mark) @@ -184,72 +192,109 @@ od_routepool_stats_mark(od_routepool_t *pool, if (memcmp(route->id.database, database, database_len) != 0) continue; - total->count_query += route->cron_stats.count_query; - total->count_tx += route->cron_stats.count_tx; - total->query_time += route->cron_stats.query_time; - total->tx_time += route->cron_stats.tx_time; - total->recv_client += route->cron_stats.recv_client; - total->recv_server += route->cron_stats.recv_server; + /* sum actual stats per server */ + od_serverpool_foreach(&route->server_pool, OD_SACTIVE, + od_routepool_stat_server, + current); - avg->count_query += route->cron_stats_avg.count_query; - avg->count_tx += route->cron_stats_avg.count_tx; - avg->query_time += route->cron_stats_avg.query_time; - avg->tx_time += route->cron_stats_avg.tx_time; - avg->recv_client += route->cron_stats_avg.recv_client; - avg->recv_server += route->cron_stats_avg.recv_server; + od_serverpool_foreach(&route->server_pool, OD_SIDLE, + od_routepool_stat_server, + current); + + /* sum previous cron stats per route */ + od_stat_sum(prev, &route->stats_cron); route->stats_mark++; - match++; } - return match; } static inline void -od_routepool_stats_unmark(od_routepool_t *pool) +od_routepool_stat_unmark(od_routepool_t *pool) { od_route_t *route; - od_list_t *i, *n; - od_list_foreach_safe(&pool->list, i, n) { + od_list_t *i; + od_list_foreach(&pool->list, i) { route = od_container_of(i, od_route_t, link); route->stats_mark = 0; } } int -od_routepool_stats(od_routepool_t *pool, - od_routepool_stats_function_t callback, void *arg) +od_routepool_stat_database(od_routepool_t *pool, + od_routepool_stat_database_cb_t callback, + uint64_t prev_time_us, + void *arg) { od_route_t *route; - od_list_t *i, *n; - od_list_foreach_safe(&pool->list, i, n) { + od_list_t *i; + od_list_foreach(&pool->list, i) + { route = od_container_of(i, od_route_t, link); if (route->stats_mark) continue; - od_stat_t total; - od_stat_t avg; - od_stat_init(&total); - od_stat_init(&avg); - int match; - match = od_routepool_stats_mark(pool, + + /* gather current and previous cron stats */ + od_stat_t current; + od_stat_t prev; + od_stat_init(¤t); + od_stat_init(&prev); + od_routepool_stat_database_mark(pool, route->id.database, route->id.database_len, - &total, &avg); - assert(match > 0); + ¤t, &prev); + + /* calculate average */ + od_stat_t avg; + od_stat_init(&avg); + od_stat_average(&avg, ¤t, &prev, prev_time_us); - avg.count_query /= match; - avg.count_tx /= match; - avg.query_time /= match; - avg.tx_time /= match; - avg.recv_client /= match; - avg.recv_server /= match; int rc; rc = callback(route->id.database, route->id.database_len - 1, - &total, &avg, arg); + ¤t, &avg, arg); if (rc == -1) { - od_routepool_stats_unmark(pool); + od_routepool_stat_unmark(pool); return -1; } } - od_routepool_stats_unmark(pool); + + od_routepool_stat_unmark(pool); + return 0; +} + +int +od_routepool_stat(od_routepool_t *pool, + od_routepool_stat_cb_t callback, + uint64_t prev_time_us, + void *arg) +{ + od_list_t *i; + od_list_foreach(&pool->list, i) + { + od_route_t *route; + route = od_container_of(i, od_route_t, link); + + /* gather current statistics per route server pool */ + od_stat_t current; + od_stat_init(¤t); + + od_serverpool_foreach(&route->server_pool, OD_SACTIVE, + od_routepool_stat_server, + ¤t); + + od_serverpool_foreach(&route->server_pool, OD_SIDLE, + od_routepool_stat_server, + ¤t); + + /* calculate average */ + od_stat_t avg; + od_stat_init(&avg); + od_stat_average(&avg, ¤t, &route->stats_cron, prev_time_us); + + int rc; + rc = callback(route, ¤t, &avg, arg); + if (rc == -1) + return -1; + } + return 0; } diff --git a/sources/route_pool.h b/sources/route_pool.h index bb10f6c4..36829eab 100644 --- a/sources/route_pool.h +++ b/sources/route_pool.h @@ -9,7 +9,12 @@ typedef struct od_routepool od_routepool_t; -typedef int (*od_routepool_stats_function_t) +typedef int (*od_routepool_stat_cb_t) + (od_route_t *route, + od_stat_t *current, + od_stat_t *avg, void *arg); + +typedef int (*od_routepool_stat_database_cb_t) (char *database, int database_len, od_stat_t *total, @@ -43,8 +48,14 @@ od_client_t* od_routepool_client_foreach(od_routepool_t*, od_clientstate_t, od_clientpool_cb_t, void*); -int od_routepool_stats(od_routepool_t *pool, - od_routepool_stats_function_t, - void*); +int od_routepool_stat_database(od_routepool_t *pool, + od_routepool_stat_database_cb_t, + uint64_t, + void*); + +int od_routepool_stat(od_routepool_t *pool, + od_routepool_stat_cb_t, + uint64_t, + void*); #endif /* OD_ROUTE_POOL_H */ diff --git a/sources/stat.h b/sources/stat.h index ccaf1ce1..85506a21 100644 --- a/sources/stat.h +++ b/sources/stat.h @@ -21,7 +21,7 @@ struct od_stat uint64_t tx_time_start; od_atomic_u64_t recv_server; od_atomic_u64_t recv_client; - uint64_t count_error; + od_atomic_u64_t count_error; }; static inline void @@ -100,7 +100,49 @@ od_stat_recv_client(od_stat_t *stat, uint64_t bytes) static inline void od_stat_error(od_stat_t *stat) { - stat->count_error++; + od_atomic_u64_inc(&stat->count_error); +} + +static inline void +od_stat_sum(od_stat_t *sum, od_stat_t *stat) +{ + sum->count_query += od_atomic_u64_of(&stat->count_query); + sum->count_tx += od_atomic_u64_of(&stat->count_tx); + sum->query_time += od_atomic_u64_of(&stat->query_time); + sum->tx_time += od_atomic_u64_of(&stat->tx_time); + sum->recv_client += od_atomic_u64_of(&stat->recv_client); + sum->recv_server += od_atomic_u64_of(&stat->recv_server); +} + +static inline void +od_stat_average(od_stat_t *avg, od_stat_t *current, od_stat_t *prev, + uint64_t prev_time_us) +{ + const uint64_t interval_usec = 1000000; + uint64_t interval_us; + interval_us = machine_time() - prev_time_us; + if (interval_us <= 0) + return; + + uint64_t count_query; + uint64_t count_tx; + count_query = current->count_query - prev->count_query; + count_tx = current->count_tx - prev->count_tx; + + avg->count_query = (count_query * interval_usec) / interval_us; + avg->count_tx = (count_tx * interval_usec) / interval_us; + + if (count_query > 0) + avg->query_time = (current->query_time - prev->query_time) / count_query; + + if (count_tx > 0) + avg->tx_time = (current->tx_time - prev->tx_time) / count_tx; + + avg->recv_client = ((current->recv_client - prev->recv_client) * interval_usec) / + interval_us; + + avg->recv_server = ((current->recv_server - prev->recv_server) * interval_usec) / + interval_us; } #endif /* OD_STAT_H */