mirror of https://github.com/yandex/odyssey.git
odissey: add periodic task, stats and expire
This commit is contained in:
parent
7cc03473ea
commit
b57bc4ddd7
|
@ -0,0 +1,42 @@
|
|||
|
||||
set(od_binary odissey)
|
||||
set(od_src
|
||||
od_pid.c
|
||||
od_syslog.c
|
||||
od_log.c
|
||||
od_daemon.c
|
||||
od_lex.c
|
||||
od_scheme.c
|
||||
od_config.c
|
||||
od_instance.c
|
||||
od_server_pool.c
|
||||
od_client_pool.c
|
||||
od_route_pool.c
|
||||
od_io.c
|
||||
od_pooler.c
|
||||
od_router.c
|
||||
od_relay.c
|
||||
od_frontend.c
|
||||
od_backend.c
|
||||
od_auth.c
|
||||
od_cancel.c
|
||||
od_periodic.c
|
||||
#od_router_transaction.c
|
||||
#od_tls.c
|
||||
odissey.c
|
||||
)
|
||||
|
||||
configure_file("od_version.h.cmake" "od_version.h")
|
||||
|
||||
include_directories("${PROJECT_SOURCE_DIR}/src")
|
||||
include_directories("${PROJECT_BINARY_DIR}/src")
|
||||
|
||||
add_executable(${od_binary} ${od_src})
|
||||
add_dependencies(${od_binary} build_libs)
|
||||
|
||||
if(THREADS_HAVE_PTHREAD_ARG)
|
||||
set_property(TARGET ${od_binary} PROPERTY COMPILE_OPTIONS "-pthread")
|
||||
set_property(TARGET ${od_binary} PROPERTY INTERFACE_COMPILE_OPTIONS "-pthread")
|
||||
endif()
|
||||
|
||||
target_link_libraries(${od_binary} ${od_libraries} ${CMAKE_THREAD_LIBS_INIT})
|
|
@ -67,7 +67,7 @@ void od_instance_free(od_instance_t *instance)
|
|||
static inline void
|
||||
od_usage(od_instance_t *instance, char *path)
|
||||
{
|
||||
od_log(&instance->log, NULL, "odissey (version: %s %s)",
|
||||
od_log(&instance->log, NULL, "odissey (build: %s %s)",
|
||||
OD_VERSION_GIT,
|
||||
OD_VERSION_BUILD);
|
||||
od_log(&instance->log, NULL, "usage: %s <config_file>", path);
|
||||
|
@ -122,7 +122,7 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
|
|||
instance->scheme.syslog_ident,
|
||||
instance->scheme.syslog_facility);
|
||||
}
|
||||
od_log(&instance->log, NULL, "odissey (version: %s %s)",
|
||||
od_log(&instance->log, NULL, "odissey (build: %s %s)",
|
||||
OD_VERSION_GIT,
|
||||
OD_VERSION_BUILD);
|
||||
od_log(&instance->log, NULL, "");
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <machinarium.h>
|
||||
#include <soprano.h>
|
||||
|
||||
#include "od_macro.h"
|
||||
#include "od_version.h"
|
||||
#include "od_list.h"
|
||||
#include "od_pid.h"
|
||||
#include "od_syslog.h"
|
||||
#include "od_log.h"
|
||||
#include "od_daemon.h"
|
||||
#include "od_scheme.h"
|
||||
#include "od_lex.h"
|
||||
#include "od_config.h"
|
||||
#include "od_msg.h"
|
||||
#include "od_system.h"
|
||||
#include "od_instance.h"
|
||||
|
||||
#include "od_server.h"
|
||||
#include "od_server_pool.h"
|
||||
#include "od_client.h"
|
||||
#include "od_client_pool.h"
|
||||
#include "od_route_id.h"
|
||||
#include "od_route.h"
|
||||
#include "od_route_pool.h"
|
||||
#include "od_io.h"
|
||||
|
||||
#include "od_pooler.h"
|
||||
#include "od_router.h"
|
||||
#include "od_relay.h"
|
||||
#include "od_frontend.h"
|
||||
#include "od_backend.h"
|
||||
#include "od_periodic.h"
|
||||
|
||||
static inline void
|
||||
od_periodic_stats (od_router_t *router)
|
||||
{
|
||||
od_instance_t *instance = router->system->instance;
|
||||
if (router->route_pool.count == 0)
|
||||
return;
|
||||
od_log(&instance->log, NULL, "statistics");
|
||||
od_list_t *i;
|
||||
od_list_foreach(&router->route_pool.list, i) {
|
||||
od_route_t *route;
|
||||
route = od_container_of(i, od_route_t, link);
|
||||
od_log(&instance->log, NULL,
|
||||
" [%.*s, %.*s] clients %d, "
|
||||
"pool_active %d, "
|
||||
"pool_idle %d ",
|
||||
route->id.database_len,
|
||||
route->id.database,
|
||||
route->id.user_len,
|
||||
route->id.user,
|
||||
route->client_pool.count_active,
|
||||
route->server_pool.count_active,
|
||||
route->server_pool.count_idle);
|
||||
}
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_expire_mark(od_server_t *server, void *arg)
|
||||
{
|
||||
od_router_t *router = arg;
|
||||
od_instance_t *instance = router->system->instance;
|
||||
|
||||
od_route_t *route = server->route;
|
||||
/*
|
||||
if (! machine_connected(server->io)) {
|
||||
od_serverpool_set(&route->server_pool, server,
|
||||
OD_SCLOSE);
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
if (! route->scheme->ttl)
|
||||
return 0;
|
||||
od_debug(&instance->log, server->io, "S: idle time: %d",
|
||||
server->idle_time);
|
||||
if (server->idle_time < route->scheme->ttl) {
|
||||
server->idle_time++;
|
||||
return 0;
|
||||
}
|
||||
od_serverpool_set(&route->server_pool, server,
|
||||
OD_SEXPIRE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void od_periodic(void *arg)
|
||||
{
|
||||
od_router_t *router = arg;
|
||||
od_instance_t *instance = router->system->instance;
|
||||
|
||||
int tick = 0;
|
||||
for (;;)
|
||||
{
|
||||
/* idle servers expire.
|
||||
*
|
||||
* 1. Add plus one idle second on each traversal.
|
||||
* If a server idle time is equal to ttl, then move
|
||||
* it to the EXPIRE queue.
|
||||
*
|
||||
* It is important that this function must not yield.
|
||||
*
|
||||
* 2. Foreach servers in EXPIRE queue, send Terminate
|
||||
* and close the connection.
|
||||
*/
|
||||
|
||||
/* mark servers for gc */
|
||||
od_routepool_foreach(&router->route_pool, OD_SIDLE,
|
||||
od_expire_mark,
|
||||
router);
|
||||
|
||||
#if 0
|
||||
/* sweep disconnected servers */
|
||||
for (;;) {
|
||||
od_server_t *server;
|
||||
server = od_routepool_next(&router->route_pool, OD_SCLOSE);
|
||||
if (server == NULL)
|
||||
break;
|
||||
od_log(&instance->log, server->io, "S: disconnected",
|
||||
server->idle_time);
|
||||
server->idle_time = 0;
|
||||
/*
|
||||
od_beclose(server);
|
||||
*/
|
||||
}
|
||||
#endif
|
||||
|
||||
/* sweep expired connections */
|
||||
for (;;) {
|
||||
od_server_t *server;
|
||||
server = od_routepool_next(&router->route_pool, OD_SEXPIRE);
|
||||
if (server == NULL)
|
||||
break;
|
||||
od_debug(&instance->log, server->io,
|
||||
"S: closing idle connection (%d secs)",
|
||||
server->idle_time);
|
||||
server->idle_time = 0;
|
||||
|
||||
od_route_t *route = server->route;
|
||||
server->route = NULL;
|
||||
od_serverpool_set(&route->server_pool, server, OD_SUNDEF);
|
||||
|
||||
machine_io_attach(server->io);
|
||||
od_backend_terminate(server);
|
||||
od_backend_close(server);
|
||||
}
|
||||
|
||||
/* stats */
|
||||
if (instance->scheme.stats_period > 0) {
|
||||
tick++;
|
||||
if (tick >= instance->scheme.stats_period) {
|
||||
od_periodic_stats(router);
|
||||
tick = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* 1 second soft interval */
|
||||
machine_sleep(1000);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
#ifndef OD_PERIODIC_H
|
||||
#define OD_PERIODIC_H
|
||||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
void od_periodic(void*);
|
||||
|
||||
#endif /* OD_PERIODIC_H */
|
|
@ -12,7 +12,7 @@ typedef struct od_routepool od_routepool_t;
|
|||
struct od_routepool
|
||||
{
|
||||
od_list_t list;
|
||||
int count;
|
||||
int count;
|
||||
};
|
||||
|
||||
void od_routepool_init(od_routepool_t*);
|
||||
|
|
|
@ -44,6 +44,7 @@
|
|||
#include "od_frontend.h"
|
||||
#include "od_backend.h"
|
||||
#include "od_cancel.h"
|
||||
#include "od_periodic.h"
|
||||
|
||||
typedef struct
|
||||
{
|
||||
|
@ -139,6 +140,7 @@ on_connect:
|
|||
od_clientpool_set(&route->client_pool, client, OD_CACTIVE);
|
||||
msg_attach->status = OD_ROK;
|
||||
client->server = server;
|
||||
server->idle_time = 0;
|
||||
machine_queue_put(msg_attach->response, msg);
|
||||
}
|
||||
|
||||
|
@ -150,6 +152,15 @@ od_router(void *arg)
|
|||
|
||||
od_log(&instance->log, NULL, "router: started");
|
||||
|
||||
/* start periodic task coroutine */
|
||||
int64_t coroutine_id;
|
||||
coroutine_id = machine_coroutine_create(od_periodic, router);
|
||||
if (coroutine_id == -1) {
|
||||
od_error(&instance->log, NULL,
|
||||
"failed to create periodic coroutine");
|
||||
return;
|
||||
}
|
||||
|
||||
for (;;)
|
||||
{
|
||||
machine_msg_t msg;
|
||||
|
@ -202,7 +213,6 @@ od_router(void *arg)
|
|||
od_msgrouter_t *msg_attach;
|
||||
msg_attach = machine_msg_get_data(msg);
|
||||
|
||||
int64_t coroutine_id;
|
||||
coroutine_id = machine_coroutine_create(od_router_attacher, msg);
|
||||
if (coroutine_id == -1) {
|
||||
msg_attach->status = OD_RERROR;
|
||||
|
|
|
@ -35,7 +35,7 @@ void od_scheme_init(od_scheme_t *scheme)
|
|||
scheme->backlog = 128;
|
||||
scheme->nodelay = 1;
|
||||
scheme->keepalive = 7200;
|
||||
scheme->readahead = 8096;
|
||||
scheme->readahead = 8192;
|
||||
scheme->workers = 1;
|
||||
scheme->client_max = 100;
|
||||
scheme->tls_verify = OD_TDISABLE;
|
||||
|
|
Loading…
Reference in New Issue