odyssey: rework statistics update logic

This commit is contained in:
Dmitry Simonenko 2018-08-02 19:02:02 +03:00
parent f7a4438ec1
commit 903f496802
7 changed files with 206 additions and 166 deletions

View File

@ -43,6 +43,7 @@
#include "sources/instance.h"
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/cron.h"
#include "sources/system.h"
#include "sources/worker.h"
#include "sources/frontend.h"
@ -186,6 +187,8 @@ static inline int
od_console_show_stats(od_client_t *client)
{
od_router_t *router = client->global->router;
od_cron_t *cron = client->global->cron;
shapito_stream_t *stream = client->stream;
shapito_stream_reset(stream);
@ -208,9 +211,10 @@ od_console_show_stats(od_client_t *client)
"avg_wait_time");
if (rc == -1)
return -1;
rc = od_routepool_stats(&router->route_pool,
od_console_show_stats_callback,
client);
rc = od_routepool_stat_database(&router->route_pool,
od_console_show_stats_callback,
cron->stat_time_us,
client);
if (rc == -1)
return -1;
rc = shapito_be_write_complete(stream, "SHOW", 5);

View File

@ -48,23 +48,49 @@
#include "sources/backend.h"
#include "sources/cron.h"
static inline int
od_cron_stats_server(od_server_t *server, void *arg)
static int
od_cron_stat_cb(od_route_t *route, od_stat_t *current, od_stat_t *avg,
void *arg)
{
od_stat_t *stats = arg;
stats->count_query += od_atomic_u64_of(&server->stats.count_query);
stats->count_tx += od_atomic_u64_of(&server->stats.count_tx);
stats->query_time += od_atomic_u64_of(&server->stats.query_time);
stats->tx_time += od_atomic_u64_of(&server->stats.tx_time);
stats->recv_client += od_atomic_u64_of(&server->stats.recv_client);
stats->recv_server += od_atomic_u64_of(&server->stats.recv_server);
od_router_t *router = arg;
od_instance_t *instance = router->global->instance;
/* update route stats */
route->stats_cron = *current;
if (! instance->config.log_stats)
return 0;
od_log(&instance->logger, "stats", NULL, NULL,
"[%.*s.%.*s] %d clients, "
"%d active servers, "
"%d idle servers, "
"%" PRIu64 " transactions/sec (%" PRIu64 " usec) "
"%" PRIu64 " queries/sec (%" PRIu64 " usec) "
"%" PRIu64 " in bytes/sec, "
"%" PRIu64 " out bytes/sec",
route->id.database_len - 1,
route->id.database,
route->id.user_len - 1,
route->id.user,
od_clientpool_total(&route->client_pool),
route->server_pool.count_active,
route->server_pool.count_idle,
avg->count_tx,
avg->tx_time,
avg->count_query,
avg->query_time,
avg->recv_client,
avg->recv_server);
return 0;
}
static inline void
od_cron_stats(od_router_t *router)
od_cron_stat(od_router_t *router)
{
od_instance_t *instance = router->global->instance;
od_cron_t *cron = router->global->cron;
if (router->route_pool.count == 0)
return;
@ -95,103 +121,13 @@ od_cron_stats(od_router_t *router)
count_coroutine_cache);
}
od_list_t *i;
od_list_foreach(&router->route_pool.list, i)
{
od_route_t *route;
route = od_container_of(i, od_route_t, link);
/* update stats per route */
od_routepool_stat(&router->route_pool, od_cron_stat_cb,
cron->stat_time_us,
router);
/* gather statistics per route server pool */
od_stat_t stats;
od_stat_init(&stats);
od_serverpool_foreach(&route->server_pool, OD_SACTIVE,
od_cron_stats_server,
&stats);
od_serverpool_foreach(&route->server_pool, OD_SIDLE,
od_cron_stats_server,
&stats);
/* calculate average between previous sample and the
current one */
int interval = instance->config.stats_interval;
uint64_t query = 0;
uint64_t tx = 0;
uint64_t query_time = 0;
uint64_t tx_time = 0;
uint64_t recv_client = 0;
uint64_t recv_server = 0;
/* ensure server stats not changed due to a
server connection close */
int64_t query_diff_sanity;
query_diff_sanity = (stats.count_query - route->cron_stats.count_query);
if (query_diff_sanity >= 0)
{
/* query */
uint64_t query_prev = route->cron_stats.count_query / interval;
uint64_t query_current = stats.count_query / interval;
int64_t query_diff = query_current - query_prev;
query = query_diff / interval;
if (query_diff > 0)
query_time = (stats.query_time - route->cron_stats.query_time) / query_diff;
/* transaction */
uint64_t tx_prev = route->cron_stats.count_tx / interval;
uint64_t tx_current = stats.count_tx / interval;
int64_t tx_diff = tx_current - tx_prev;
tx = tx_diff / interval;
if (tx_diff > 0)
tx_time = (stats.tx_time - route->cron_stats.tx_time) / tx_diff;
/* recv client */
uint64_t recv_client_prev = route->cron_stats.recv_client / interval;
uint64_t recv_client_current = stats.recv_client / interval;
recv_client = (recv_client_current - recv_client_prev) / interval;
/* recv server */
uint64_t recv_server_prev = route->cron_stats.recv_server / interval;
uint64_t recv_server_current = recv_server_current = stats.recv_server / interval;
recv_server = (recv_server_current - recv_server_prev) / interval;
}
/* update current stats */
route->cron_stats = stats;
/* update avg stats */
route->cron_stats_avg.count_query = query;
route->cron_stats_avg.count_tx = tx;
route->cron_stats_avg.query_time = query_time;
route->cron_stats_avg.tx_time = tx_time;
route->cron_stats_avg.recv_client = recv_client;
route->cron_stats_avg.recv_server = recv_server;
if (instance->config.log_stats) {
od_log(&instance->logger, "stats", NULL, NULL,
"[%.*s.%.*s] clients %d, "
"pool_active %d, "
"pool_idle %d "
"rps %" PRIu64 " "
"tps %" PRIu64 " "
"query_time_us %" PRIu64 " "
"tx_time_us %" PRIu64 " "
"recv_client_bytes %" PRIu64 " "
"recv_server_bytes %" PRIu64,
route->id.database_len - 1,
route->id.database,
route->id.user_len - 1,
route->id.user,
od_clientpool_total(&route->client_pool),
route->server_pool.count_active,
route->server_pool.count_idle,
query,
tx,
query_time,
tx_time,
recv_client,
recv_server);
}
}
/* update current stat time mark */
cron->stat_time_us = machine_time();
}
static inline int
@ -283,15 +219,17 @@ od_cron(void *arg)
od_router_t *router = cron->global->router;
od_instance_t *instance = cron->global->instance;
cron->stat_time_us = machine_time();
int stats_tick = 0;
for (;;)
{
/* mark and sweep expired idle server connections */
od_cron_expire(cron);
/* update stats */
/* update statistics */
if (++stats_tick >= instance->config.stats_interval) {
od_cron_stats(router);
od_cron_stat(router);
stats_tick = 0;
}
@ -303,6 +241,7 @@ od_cron(void *arg)
void od_cron_init(od_cron_t *cron, od_global_t *global)
{
cron->global = global;
cron->stat_time_us = 0;
}
int od_cron_start(od_cron_t *cron)

View File

@ -11,6 +11,7 @@ typedef struct od_cron od_cron_t;
struct od_cron
{
uint64_t stat_time_us;
od_global_t *global;
};

View File

@ -13,8 +13,7 @@ struct od_route
{
od_configroute_t *config;
od_routeid_t id;
od_stat_t cron_stats;
od_stat_t cron_stats_avg;
od_stat_t stats_cron;
int stats_mark;
od_serverpool_t server_pool;
od_clientpool_t client_pool;
@ -29,8 +28,7 @@ od_route_init(od_route_t *route)
od_serverpool_init(&route->server_pool);
od_clientpool_init(&route->client_pool);
route->stats_mark = 0;
od_stat_init(&route->cron_stats);
od_stat_init(&route->cron_stats_avg);
od_stat_init(&route->stats_cron);
od_list_init(&route->link);
}

View File

@ -166,15 +166,23 @@ od_routepool_client_foreach(od_routepool_t *pool, od_clientstate_t state,
}
static inline int
od_routepool_stats_mark(od_routepool_t *pool,
char *database,
int database_len,
od_stat_t *total,
od_stat_t *avg)
od_routepool_stat_server(od_server_t *server, void *arg)
{
od_stat_t *stats = arg;
od_stat_sum(stats, &server->stats);
return 0;
}
static inline void
od_routepool_stat_database_mark(od_routepool_t *pool,
char *database,
int database_len,
od_stat_t *current,
od_stat_t *prev)
{
int match = 0;
od_list_t *i;
od_list_foreach(&pool->list, i) {
od_list_foreach(&pool->list, i)
{
od_route_t *route;
route = od_container_of(i, od_route_t, link);
if (route->stats_mark)
@ -184,72 +192,109 @@ od_routepool_stats_mark(od_routepool_t *pool,
if (memcmp(route->id.database, database, database_len) != 0)
continue;
total->count_query += route->cron_stats.count_query;
total->count_tx += route->cron_stats.count_tx;
total->query_time += route->cron_stats.query_time;
total->tx_time += route->cron_stats.tx_time;
total->recv_client += route->cron_stats.recv_client;
total->recv_server += route->cron_stats.recv_server;
/* sum actual stats per server */
od_serverpool_foreach(&route->server_pool, OD_SACTIVE,
od_routepool_stat_server,
current);
avg->count_query += route->cron_stats_avg.count_query;
avg->count_tx += route->cron_stats_avg.count_tx;
avg->query_time += route->cron_stats_avg.query_time;
avg->tx_time += route->cron_stats_avg.tx_time;
avg->recv_client += route->cron_stats_avg.recv_client;
avg->recv_server += route->cron_stats_avg.recv_server;
od_serverpool_foreach(&route->server_pool, OD_SIDLE,
od_routepool_stat_server,
current);
/* sum previous cron stats per route */
od_stat_sum(prev, &route->stats_cron);
route->stats_mark++;
match++;
}
return match;
}
static inline void
od_routepool_stats_unmark(od_routepool_t *pool)
od_routepool_stat_unmark(od_routepool_t *pool)
{
od_route_t *route;
od_list_t *i, *n;
od_list_foreach_safe(&pool->list, i, n) {
od_list_t *i;
od_list_foreach(&pool->list, i) {
route = od_container_of(i, od_route_t, link);
route->stats_mark = 0;
}
}
int
od_routepool_stats(od_routepool_t *pool,
od_routepool_stats_function_t callback, void *arg)
od_routepool_stat_database(od_routepool_t *pool,
od_routepool_stat_database_cb_t callback,
uint64_t prev_time_us,
void *arg)
{
od_route_t *route;
od_list_t *i, *n;
od_list_foreach_safe(&pool->list, i, n) {
od_list_t *i;
od_list_foreach(&pool->list, i)
{
route = od_container_of(i, od_route_t, link);
if (route->stats_mark)
continue;
od_stat_t total;
od_stat_t avg;
od_stat_init(&total);
od_stat_init(&avg);
int match;
match = od_routepool_stats_mark(pool,
/* gather current and previous cron stats */
od_stat_t current;
od_stat_t prev;
od_stat_init(&current);
od_stat_init(&prev);
od_routepool_stat_database_mark(pool,
route->id.database,
route->id.database_len,
&total, &avg);
assert(match > 0);
&current, &prev);
/* calculate average */
od_stat_t avg;
od_stat_init(&avg);
od_stat_average(&avg, &current, &prev, prev_time_us);
avg.count_query /= match;
avg.count_tx /= match;
avg.query_time /= match;
avg.tx_time /= match;
avg.recv_client /= match;
avg.recv_server /= match;
int rc;
rc = callback(route->id.database, route->id.database_len - 1,
&total, &avg, arg);
&current, &avg, arg);
if (rc == -1) {
od_routepool_stats_unmark(pool);
od_routepool_stat_unmark(pool);
return -1;
}
}
od_routepool_stats_unmark(pool);
od_routepool_stat_unmark(pool);
return 0;
}
int
od_routepool_stat(od_routepool_t *pool,
od_routepool_stat_cb_t callback,
uint64_t prev_time_us,
void *arg)
{
od_list_t *i;
od_list_foreach(&pool->list, i)
{
od_route_t *route;
route = od_container_of(i, od_route_t, link);
/* gather current statistics per route server pool */
od_stat_t current;
od_stat_init(&current);
od_serverpool_foreach(&route->server_pool, OD_SACTIVE,
od_routepool_stat_server,
&current);
od_serverpool_foreach(&route->server_pool, OD_SIDLE,
od_routepool_stat_server,
&current);
/* calculate average */
od_stat_t avg;
od_stat_init(&avg);
od_stat_average(&avg, &current, &route->stats_cron, prev_time_us);
int rc;
rc = callback(route, &current, &avg, arg);
if (rc == -1)
return -1;
}
return 0;
}

View File

@ -9,7 +9,12 @@
typedef struct od_routepool od_routepool_t;
typedef int (*od_routepool_stats_function_t)
typedef int (*od_routepool_stat_cb_t)
(od_route_t *route,
od_stat_t *current,
od_stat_t *avg, void *arg);
typedef int (*od_routepool_stat_database_cb_t)
(char *database,
int database_len,
od_stat_t *total,
@ -43,8 +48,14 @@ od_client_t*
od_routepool_client_foreach(od_routepool_t*, od_clientstate_t,
od_clientpool_cb_t, void*);
int od_routepool_stats(od_routepool_t *pool,
od_routepool_stats_function_t,
void*);
int od_routepool_stat_database(od_routepool_t *pool,
od_routepool_stat_database_cb_t,
uint64_t,
void*);
int od_routepool_stat(od_routepool_t *pool,
od_routepool_stat_cb_t,
uint64_t,
void*);
#endif /* OD_ROUTE_POOL_H */

View File

@ -21,7 +21,7 @@ struct od_stat
uint64_t tx_time_start;
od_atomic_u64_t recv_server;
od_atomic_u64_t recv_client;
uint64_t count_error;
od_atomic_u64_t count_error;
};
static inline void
@ -100,7 +100,49 @@ od_stat_recv_client(od_stat_t *stat, uint64_t bytes)
static inline void
od_stat_error(od_stat_t *stat)
{
stat->count_error++;
od_atomic_u64_inc(&stat->count_error);
}
static inline void
od_stat_sum(od_stat_t *sum, od_stat_t *stat)
{
sum->count_query += od_atomic_u64_of(&stat->count_query);
sum->count_tx += od_atomic_u64_of(&stat->count_tx);
sum->query_time += od_atomic_u64_of(&stat->query_time);
sum->tx_time += od_atomic_u64_of(&stat->tx_time);
sum->recv_client += od_atomic_u64_of(&stat->recv_client);
sum->recv_server += od_atomic_u64_of(&stat->recv_server);
}
static inline void
od_stat_average(od_stat_t *avg, od_stat_t *current, od_stat_t *prev,
uint64_t prev_time_us)
{
const uint64_t interval_usec = 1000000;
uint64_t interval_us;
interval_us = machine_time() - prev_time_us;
if (interval_us <= 0)
return;
uint64_t count_query;
uint64_t count_tx;
count_query = current->count_query - prev->count_query;
count_tx = current->count_tx - prev->count_tx;
avg->count_query = (count_query * interval_usec) / interval_us;
avg->count_tx = (count_tx * interval_usec) / interval_us;
if (count_query > 0)
avg->query_time = (current->query_time - prev->query_time) / count_query;
if (count_tx > 0)
avg->tx_time = (current->tx_time - prev->tx_time) / count_tx;
avg->recv_client = ((current->recv_client - prev->recv_client) * interval_usec) /
interval_us;
avg->recv_server = ((current->recv_server - prev->recv_server) * interval_usec) /
interval_us;
}
#endif /* OD_STAT_H */