odissey: rename periodic service to cron

This commit is contained in:
Dmitry Simonenko 2018-03-02 16:12:32 +03:00
parent ad5c46954e
commit 99fca366ef
11 changed files with 82 additions and 81 deletions

View File

@ -36,9 +36,9 @@ Repository: [github/shapito](https://github.yandex-team.ru/pmwkaa/shapito).
.--------. .---------. .---------. .---------.
| router | | servers | | worker0 | ... | workerN |
'--------' '---------' '---------' '---------'
.---------. .----------. thread thread
| console | | periodic |
'---------' '----------'
.---------. .------. thread thread
| console | | cron |
'---------' '------'
```
#### Instance
@ -52,7 +52,7 @@ Run pooler and worker\_pool threads.
#### Pooler
Start router, periodic and console subsystems.
Start router, cron and console subsystems.
Create listen server one for each resolved address. Each listen server runs inside own coroutine.
Server coroutine mostly waits on `machine_accept()`.
@ -76,12 +76,12 @@ router and waits for reply. Could be a potential hot spot (not an issue at the m
[sources/router.h](sources/router.h), [sources/router.c](sources/router.c)
#### Periodic
#### Cron
Do periodic service tasks, like idle server connection expiration and
database scheme obsoletion.
[sources/periodic.h](sources/periodic.h), [sources/periodic.c](sources/periodic.c)
[sources/cron.h](sources/cron.h), [sources/cron.c](sources/cron.c)
#### Worker and worker pool

View File

@ -24,7 +24,7 @@ set(od_src
auth.c
auth_query.c
cancel.c
periodic.c
cron.c
tls.c
main.c
)

View File

@ -46,10 +46,10 @@
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/periodic.h"
#include "sources/cron.h"
static inline int
od_periodic_stats_server(od_server_t *server, void *arg)
od_cron_stats_server(od_server_t *server, void *arg)
{
od_serverstat_t *stats = arg;
stats->query_time += od_atomic_u64_of(&server->stats.query_time);
@ -60,7 +60,7 @@ od_periodic_stats_server(od_server_t *server, void *arg)
}
static inline void
od_periodic_stats(od_router_t *router)
od_cron_stats(od_router_t *router)
{
od_instance_t *instance = router->system->instance;
@ -104,10 +104,10 @@ od_periodic_stats(od_router_t *router)
memset(&stats, 0, sizeof(stats));
od_serverpool_foreach(&route->server_pool, OD_SACTIVE,
od_periodic_stats_server,
od_cron_stats_server,
&stats);
od_serverpool_foreach(&route->server_pool, OD_SIDLE,
od_periodic_stats_server,
od_cron_stats_server,
&stats);
/* calculate average between previous sample and the
@ -120,13 +120,13 @@ od_periodic_stats(od_router_t *router)
/* ensure server stats not changed due to a
* server connection close */
int64_t reqs_diff_sanity;
reqs_diff_sanity = (stats.count_request - route->periodic_stats.count_request);
reqs_diff_sanity = (stats.count_request - route->cron_stats.count_request);
if (reqs_diff_sanity >= 0)
{
/* request count */
uint64_t reqs_prev = 0;
reqs_prev = route->periodic_stats.count_request /
reqs_prev = route->cron_stats.count_request /
instance->scheme.stats_interval;
uint64_t reqs_current = 0;
@ -140,7 +140,7 @@ od_periodic_stats(od_router_t *router)
/* recv client */
uint64_t recv_client_prev = 0;
recv_client_prev = route->periodic_stats.recv_client /
recv_client_prev = route->cron_stats.recv_client /
instance->scheme.stats_interval;
uint64_t recv_client_current = 0;
@ -152,7 +152,7 @@ od_periodic_stats(od_router_t *router)
/* recv server */
uint64_t recv_server_prev = 0;
recv_server_prev = route->periodic_stats.recv_server /
recv_server_prev = route->cron_stats.recv_server /
instance->scheme.stats_interval;
uint64_t recv_server_current = 0;
@ -164,17 +164,17 @@ od_periodic_stats(od_router_t *router)
/* query time */
if (reqs_diff > 0)
query_time = (stats.query_time - route->periodic_stats.query_time) /
query_time = (stats.query_time - route->cron_stats.query_time) /
reqs_diff;
}
/* update stats */
route->periodic_stats = stats;
route->cron_stats = stats;
route->periodic_stats_avg.count_request = reqs;
route->periodic_stats_avg.recv_client = recv_client;
route->periodic_stats_avg.recv_server = recv_server;
route->periodic_stats_avg.query_time = query_time;
route->cron_stats_avg.count_request = reqs;
route->cron_stats_avg.recv_client = recv_client;
route->cron_stats_avg.recv_server = recv_server;
route->cron_stats_avg.query_time = query_time;
if (instance->scheme.log_stats) {
od_log(&instance->logger, "stats", NULL, NULL,
@ -203,7 +203,7 @@ od_periodic_stats(od_router_t *router)
}
static inline int
od_periodic_expire_mark(od_server_t *server, void *arg)
od_cron_expire_mark(od_server_t *server, void *arg)
{
od_router_t *router = arg;
od_instance_t *instance = router->system->instance;
@ -236,10 +236,10 @@ od_periodic_expire_mark(od_server_t *server, void *arg)
}
static inline void
od_periodic_expire(od_periodic_t *periodic)
od_cron_expire(od_cron_t *cron)
{
od_router_t *router = periodic->system->router;
od_instance_t *instance = periodic->system->instance;
od_router_t *router = cron->system->router;
od_instance_t *instance = cron->system->instance;
/* Idle servers expire.
*
@ -264,7 +264,7 @@ od_periodic_expire(od_periodic_t *periodic)
/* mark */
od_routepool_server_foreach(&router->route_pool, OD_SIDLE,
od_periodic_expire_mark,
od_cron_expire_mark,
router);
/* sweep */
@ -295,21 +295,21 @@ od_periodic_expire(od_periodic_t *periodic)
}
static void
od_periodic(void *arg)
od_cron(void *arg)
{
od_periodic_t *periodic = arg;
od_router_t *router = periodic->system->router;
od_instance_t *instance = periodic->system->instance;
od_cron_t *cron = arg;
od_router_t *router = cron->system->router;
od_instance_t *instance = cron->system->instance;
int stats_tick = 0;
for (;;)
{
/* mark and sweep expired idle server connections */
od_periodic_expire(periodic);
od_cron_expire(cron);
/* update stats */
if (++stats_tick >= instance->scheme.stats_interval) {
od_periodic_stats(router);
od_cron_stats(router);
stats_tick = 0;
}
@ -318,19 +318,19 @@ od_periodic(void *arg)
}
}
void od_periodic_init(od_periodic_t *periodic, od_system_t *system)
void od_cron_init(od_cron_t *cron, od_system_t *system)
{
periodic->system = system;
cron->system = system;
}
int od_periodic_start(od_periodic_t *periodic)
int od_cron_start(od_cron_t *cron)
{
od_instance_t *instance = periodic->system->instance;
od_instance_t *instance = cron->system->instance;
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_periodic, periodic);
coroutine_id = machine_coroutine_create(od_cron, cron);
if (coroutine_id == -1) {
od_error(&instance->logger, "periodic", NULL, NULL,
"failed to start periodic coroutine");
od_error(&instance->logger, "cron", NULL, NULL,
"failed to start cron coroutine");
return -1;
}
return 0;

20
sources/cron.h Normal file
View File

@ -0,0 +1,20 @@
#ifndef OD_CRON_H
#define OD_CRON_H
/*
* Odissey.
*
* Advanced PostgreSQL connection pooler.
*/
typedef struct od_cron od_cron_t;
struct od_cron
{
od_system_t *system;
};
void od_cron_init(od_cron_t*, od_system_t*);
int od_cron_start(od_cron_t*);
#endif /* OD_CRON_H */

View File

@ -44,7 +44,7 @@
#include "sources/router.h"
#include "sources/console.h"
#include "sources/pooler.h"
#include "sources/periodic.h"
#include "sources/cron.h"
#include "sources/worker.h"
#include "sources/worker_pool.h"
#include "sources/frontend.h"
@ -198,7 +198,7 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
od_router_t router;
od_console_t console;
od_periodic_t periodic;
od_cron_t cron;
od_workerpool_t worker_pool;
od_system_t *system;
@ -207,12 +207,12 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
system->pooler = &pooler;
system->router = &router;
system->console = &console;
system->periodic = &periodic;
system->cron = &cron;
system->worker_pool = &worker_pool;
od_router_init(&router, system);
od_console_init(&console, system);
od_periodic_init(&periodic, system);
od_cron_init(&cron, system);
od_workerpool_init(&worker_pool);
/* start pooler machine thread */

View File

@ -1,19 +0,0 @@
#ifndef OD_PERIODIC_H
#define OD_PERIODIC_H
/*
* Odissey.
*
* Advanced PostgreSQL connection pooler.
*/
typedef struct od_periodic_t od_periodic_t;
struct od_periodic_t {
od_system_t *system;
};
void od_periodic_init(od_periodic_t*, od_system_t*);
int od_periodic_start(od_periodic_t*);
#endif /* OD_PERIODIC_H */

View File

@ -48,7 +48,7 @@
#include "sources/worker.h"
#include "sources/worker_pool.h"
#include "sources/pooler.h"
#include "sources/periodic.h"
#include "sources/cron.h"
#include "sources/tls.h"
static inline void
@ -353,9 +353,9 @@ od_pooler(void *arg)
if (rc == -1)
return;
/* start periodic coroutine */
od_periodic_t *periodic = pooler->system.periodic;
rc = od_periodic_start(periodic);
/* start cron coroutine */
od_cron_t *cron = pooler->system.cron;
rc = od_cron_start(cron);
if (rc == -1)
return;

View File

@ -13,8 +13,8 @@ struct od_route
{
od_schemeroute_t *scheme;
od_routeid_t id;
od_serverstat_t periodic_stats;
od_serverstat_t periodic_stats_avg;
od_serverstat_t cron_stats;
od_serverstat_t cron_stats_avg;
int stats_mark;
od_serverpool_t server_pool;
od_clientpool_t client_pool;
@ -29,8 +29,8 @@ od_route_init(od_route_t *route)
od_serverpool_init(&route->server_pool);
od_clientpool_init(&route->client_pool);
route->stats_mark = 0;
memset(&route->periodic_stats, 0, sizeof(route->periodic_stats));
memset(&route->periodic_stats_avg, 0, sizeof(route->periodic_stats_avg));
memset(&route->cron_stats, 0, sizeof(route->cron_stats));
memset(&route->cron_stats_avg, 0, sizeof(route->cron_stats_avg));
od_list_init(&route->link);
}

View File

@ -188,15 +188,15 @@ od_routepool_stats_mark(od_routepool_t *pool,
if (memcmp(route->id.database, database, database_len) != 0)
continue;
total->count_request += route->periodic_stats.count_request;
total->query_time += route->periodic_stats.query_time;
total->recv_client += route->periodic_stats.recv_client;
total->recv_server += route->periodic_stats.recv_server;
total->count_request += route->cron_stats.count_request;
total->query_time += route->cron_stats.query_time;
total->recv_client += route->cron_stats.recv_client;
total->recv_server += route->cron_stats.recv_server;
avg->count_request += route->periodic_stats_avg.count_request;
avg->query_time += route->periodic_stats_avg.query_time;
avg->recv_client += route->periodic_stats_avg.recv_client;
avg->recv_server += route->periodic_stats_avg.recv_server;
avg->count_request += route->cron_stats_avg.count_request;
avg->query_time += route->cron_stats_avg.query_time;
avg->recv_client += route->cron_stats_avg.recv_client;
avg->recv_server += route->cron_stats_avg.recv_server;
route->stats_mark++;
match++;

View File

@ -47,7 +47,7 @@
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/cancel.h"
#include "sources/periodic.h"
#include "sources/cron.h"
typedef struct
{

View File

@ -15,7 +15,7 @@ struct od_system
void *pooler;
void *router;
void *console;
void *periodic;
void *cron;
void *worker_pool;
};