From b57bc4ddd7bd65cea1961cc4ae128c7aa1a145d4 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Wed, 31 May 2017 14:38:06 +0300 Subject: [PATCH] odissey: add periodic task, stats and expire --- src/CMakeLists.txt | 42 +++++++++++ src/od_instance.c | 4 +- src/od_periodic.c | 174 ++++++++++++++++++++++++++++++++++++++++++++ src/od_periodic.h | 12 +++ src/od_route_pool.h | 2 +- src/od_router.c | 12 ++- src/od_scheme.c | 2 +- 7 files changed, 243 insertions(+), 5 deletions(-) create mode 100644 src/CMakeLists.txt create mode 100644 src/od_periodic.c create mode 100644 src/od_periodic.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 00000000..99daeebb --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,42 @@ + +set(od_binary odissey) +set(od_src + od_pid.c + od_syslog.c + od_log.c + od_daemon.c + od_lex.c + od_scheme.c + od_config.c + od_instance.c + od_server_pool.c + od_client_pool.c + od_route_pool.c + od_io.c + od_pooler.c + od_router.c + od_relay.c + od_frontend.c + od_backend.c + od_auth.c + od_cancel.c + od_periodic.c + #od_router_transaction.c + #od_tls.c + odissey.c +) + +configure_file("od_version.h.cmake" "od_version.h") + +include_directories("${PROJECT_SOURCE_DIR}/src") +include_directories("${PROJECT_BINARY_DIR}/src") + +add_executable(${od_binary} ${od_src}) +add_dependencies(${od_binary} build_libs) + +if(THREADS_HAVE_PTHREAD_ARG) + set_property(TARGET ${od_binary} PROPERTY COMPILE_OPTIONS "-pthread") + set_property(TARGET ${od_binary} PROPERTY INTERFACE_COMPILE_OPTIONS "-pthread") +endif() + +target_link_libraries(${od_binary} ${od_libraries} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/src/od_instance.c b/src/od_instance.c index 8a8a2dfd..56a88a84 100644 --- a/src/od_instance.c +++ b/src/od_instance.c @@ -67,7 +67,7 @@ void od_instance_free(od_instance_t *instance) static inline void od_usage(od_instance_t *instance, char *path) { - od_log(&instance->log, NULL, "odissey (version: %s %s)", + od_log(&instance->log, NULL, "odissey (build: %s %s)", OD_VERSION_GIT, OD_VERSION_BUILD); od_log(&instance->log, NULL, "usage: %s ", path); @@ -122,7 +122,7 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv) instance->scheme.syslog_ident, instance->scheme.syslog_facility); } - od_log(&instance->log, NULL, "odissey (version: %s %s)", + od_log(&instance->log, NULL, "odissey (build: %s %s)", OD_VERSION_GIT, OD_VERSION_BUILD); od_log(&instance->log, NULL, ""); diff --git a/src/od_periodic.c b/src/od_periodic.c new file mode 100644 index 00000000..9fc54cd4 --- /dev/null +++ b/src/od_periodic.c @@ -0,0 +1,174 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_version.h" +#include "od_list.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_daemon.h" +#include "od_scheme.h" +#include "od_lex.h" +#include "od_config.h" +#include "od_msg.h" +#include "od_system.h" +#include "od_instance.h" + +#include "od_server.h" +#include "od_server_pool.h" +#include "od_client.h" +#include "od_client_pool.h" +#include "od_route_id.h" +#include "od_route.h" +#include "od_route_pool.h" +#include "od_io.h" + +#include "od_pooler.h" +#include "od_router.h" +#include "od_relay.h" +#include "od_frontend.h" +#include "od_backend.h" +#include "od_periodic.h" + +static inline void +od_periodic_stats (od_router_t *router) +{ + od_instance_t *instance = router->system->instance; + if (router->route_pool.count == 0) + return; + od_log(&instance->log, NULL, "statistics"); + od_list_t *i; + od_list_foreach(&router->route_pool.list, i) { + od_route_t *route; + route = od_container_of(i, od_route_t, link); + od_log(&instance->log, NULL, + " [%.*s, %.*s] clients %d, " + "pool_active %d, " + "pool_idle %d ", + route->id.database_len, + route->id.database, + route->id.user_len, + route->id.user, + route->client_pool.count_active, + route->server_pool.count_active, + route->server_pool.count_idle); + } +} + +static inline int +od_expire_mark(od_server_t *server, void *arg) +{ + od_router_t *router = arg; + od_instance_t *instance = router->system->instance; + + od_route_t *route = server->route; + /* + if (! machine_connected(server->io)) { + od_serverpool_set(&route->server_pool, server, + OD_SCLOSE); + return 0; + } + */ + if (! route->scheme->ttl) + return 0; + od_debug(&instance->log, server->io, "S: idle time: %d", + server->idle_time); + if (server->idle_time < route->scheme->ttl) { + server->idle_time++; + return 0; + } + od_serverpool_set(&route->server_pool, server, + OD_SEXPIRE); + return 0; +} + + +void od_periodic(void *arg) +{ + od_router_t *router = arg; + od_instance_t *instance = router->system->instance; + + int tick = 0; + for (;;) + { + /* idle servers expire. + * + * 1. Add plus one idle second on each traversal. + * If a server idle time is equal to ttl, then move + * it to the EXPIRE queue. + * + * It is important that this function must not yield. + * + * 2. Foreach servers in EXPIRE queue, send Terminate + * and close the connection. + */ + + /* mark servers for gc */ + od_routepool_foreach(&router->route_pool, OD_SIDLE, + od_expire_mark, + router); + +#if 0 + /* sweep disconnected servers */ + for (;;) { + od_server_t *server; + server = od_routepool_next(&router->route_pool, OD_SCLOSE); + if (server == NULL) + break; + od_log(&instance->log, server->io, "S: disconnected", + server->idle_time); + server->idle_time = 0; + /* + od_beclose(server); + */ + } +#endif + + /* sweep expired connections */ + for (;;) { + od_server_t *server; + server = od_routepool_next(&router->route_pool, OD_SEXPIRE); + if (server == NULL) + break; + od_debug(&instance->log, server->io, + "S: closing idle connection (%d secs)", + server->idle_time); + server->idle_time = 0; + + od_route_t *route = server->route; + server->route = NULL; + od_serverpool_set(&route->server_pool, server, OD_SUNDEF); + + machine_io_attach(server->io); + od_backend_terminate(server); + od_backend_close(server); + } + + /* stats */ + if (instance->scheme.stats_period > 0) { + tick++; + if (tick >= instance->scheme.stats_period) { + od_periodic_stats(router); + tick = 0; + } + } + + /* 1 second soft interval */ + machine_sleep(1000); + } +} diff --git a/src/od_periodic.h b/src/od_periodic.h new file mode 100644 index 00000000..836ecfb6 --- /dev/null +++ b/src/od_periodic.h @@ -0,0 +1,12 @@ +#ifndef OD_PERIODIC_H +#define OD_PERIODIC_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +void od_periodic(void*); + +#endif /* OD_PERIODIC_H */ diff --git a/src/od_route_pool.h b/src/od_route_pool.h index e8a95236..8e15496e 100644 --- a/src/od_route_pool.h +++ b/src/od_route_pool.h @@ -12,7 +12,7 @@ typedef struct od_routepool od_routepool_t; struct od_routepool { od_list_t list; - int count; + int count; }; void od_routepool_init(od_routepool_t*); diff --git a/src/od_router.c b/src/od_router.c index b8924fe8..4a0f5a1d 100644 --- a/src/od_router.c +++ b/src/od_router.c @@ -44,6 +44,7 @@ #include "od_frontend.h" #include "od_backend.h" #include "od_cancel.h" +#include "od_periodic.h" typedef struct { @@ -139,6 +140,7 @@ on_connect: od_clientpool_set(&route->client_pool, client, OD_CACTIVE); msg_attach->status = OD_ROK; client->server = server; + server->idle_time = 0; machine_queue_put(msg_attach->response, msg); } @@ -150,6 +152,15 @@ od_router(void *arg) od_log(&instance->log, NULL, "router: started"); + /* start periodic task coroutine */ + int64_t coroutine_id; + coroutine_id = machine_coroutine_create(od_periodic, router); + if (coroutine_id == -1) { + od_error(&instance->log, NULL, + "failed to create periodic coroutine"); + return; + } + for (;;) { machine_msg_t msg; @@ -202,7 +213,6 @@ od_router(void *arg) od_msgrouter_t *msg_attach; msg_attach = machine_msg_get_data(msg); - int64_t coroutine_id; coroutine_id = machine_coroutine_create(od_router_attacher, msg); if (coroutine_id == -1) { msg_attach->status = OD_RERROR; diff --git a/src/od_scheme.c b/src/od_scheme.c index aec4c89c..059ed9b5 100644 --- a/src/od_scheme.c +++ b/src/od_scheme.c @@ -35,7 +35,7 @@ void od_scheme_init(od_scheme_t *scheme) scheme->backlog = 128; scheme->nodelay = 1; scheme->keepalive = 7200; - scheme->readahead = 8096; + scheme->readahead = 8192; scheme->workers = 1; scheme->client_max = 100; scheme->tls_verify = OD_TDISABLE;