From 703948713767fdc8e1e9115dec550f8aeea638ab Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Fri, 26 May 2017 15:17:45 +0300 Subject: [PATCH] odissey: group system services; add initial router --- src/od_auth.c | 7 ++--- src/od_frontend.c | 11 ++++---- src/od_frontend.h | 2 +- src/od_instance.c | 21 ++++++++++++--- src/od_pooler.c | 21 ++++++--------- src/od_pooler.h | 13 +++++---- src/od_relay.c | 18 +++++++------ src/od_relay.h | 4 +-- src/od_router.c | 68 +++++++++++++++++++++++++++++++++++++++++++++++ src/od_router.h | 21 +++++++++++++++ src/od_system.h | 21 +++++++++++++++ 11 files changed, 164 insertions(+), 43 deletions(-) create mode 100644 src/od_router.c create mode 100644 src/od_router.h create mode 100644 src/od_system.h diff --git a/src/od_auth.c b/src/od_auth.c index c167d950..e79cb8e3 100644 --- a/src/od_auth.c +++ b/src/od_auth.c @@ -26,6 +26,7 @@ #include "od_lex.h" #include "od_config.h" #include "od_msg.h" +#include "od_system.h" #include "od_instance.h" #include "od_server.h" @@ -45,7 +46,7 @@ static inline int od_auth_frontend_cleartext(od_client_t *client) { od_relay_t *relay = client->relay; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; /* AuthenticationCleartextPassword */ so_stream_t *stream = &client->stream; @@ -111,7 +112,7 @@ static inline int od_auth_frontend_md5(od_client_t *client) { od_relay_t *relay = client->relay; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; /* generate salt */ uint32_t salt = so_password_salt(&client->key); @@ -190,7 +191,7 @@ od_auth_frontend_md5(od_client_t *client) int od_auth_frontend(od_client_t *client) { od_relay_t *relay = client->relay; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; /* match user scheme */ od_schemeuser_t *user_scheme = diff --git a/src/od_frontend.c b/src/od_frontend.c index 2afa1598..18f9a8a0 100644 --- a/src/od_frontend.c +++ b/src/od_frontend.c @@ -26,6 +26,7 @@ #include "od_lex.h" #include "od_config.h" #include "od_msg.h" +#include "od_system.h" #include "od_instance.h" #include "od_server.h" @@ -66,7 +67,7 @@ static int od_frontend_startup_read(od_client_t *client) { od_relay_t *relay = client->relay; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; so_stream_t *stream = &client->stream; so_stream_reset(stream); @@ -150,7 +151,7 @@ static inline int od_frontend_setup(od_client_t *client) { od_relay_t *relay = client->relay; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; so_stream_t *stream = &client->stream; so_stream_reset(stream); @@ -175,7 +176,7 @@ static inline int od_frontend_ready(od_client_t *client) { od_relay_t *relay = client->relay; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; so_stream_t *stream = &client->stream; so_stream_reset(stream); @@ -192,11 +193,11 @@ od_frontend_ready(od_client_t *client) return 0; } -void od_frontend_main(void *arg) +void od_frontend(void *arg) { od_client_t *client = arg; od_relay_t *relay = client->relay; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; od_log(&instance->log, client->io, "C: new connection"); diff --git a/src/od_frontend.h b/src/od_frontend.h index 69910103..0303b734 100644 --- a/src/od_frontend.h +++ b/src/od_frontend.h @@ -7,6 +7,6 @@ * PostgreSQL connection pooler and request router. */ -void od_frontend_main(void*); +void od_frontend(void*); #endif /* OD_FRONTEND_H */ diff --git a/src/od_instance.c b/src/od_instance.c index afb56018..358e1934 100644 --- a/src/od_instance.c +++ b/src/od_instance.c @@ -26,6 +26,7 @@ #include "od_lex.h" #include "od_config.h" #include "od_instance.h" +#include "od_system.h" #include "od_pooler.h" #include "od_relay.h" @@ -125,15 +126,27 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv) if (instance->scheme.pid_file) od_pid_create(&instance->pid, instance->scheme.pid_file); - /* run connection pooler */ od_pooler_t pooler; - od_pooler_init(&pooler, instance); + od_relay_t relay; + + od_system_t system = { + .pooler = &pooler, + .relay = &relay, + .instance = instance + }; + system.task_queue = machine_queue_create(); + if (system.task_queue == NULL) { + od_error(&instance->log, NULL, "failed to create task queue"); + return 1; + } + + /* run connection pooler */ + od_pooler_init(&pooler, &system); rc = od_pooler_start(&pooler); if (rc == -1) return 1; - od_relay_t relay; - od_relay_init(&relay, &pooler); + od_relay_init(&relay, &system); rc = od_relay_start(&relay); if (rc == -1) return 1; diff --git a/src/od_pooler.c b/src/od_pooler.c index 4139fbbd..22705bdf 100644 --- a/src/od_pooler.c +++ b/src/od_pooler.c @@ -26,21 +26,21 @@ #include "od_lex.h" #include "od_config.h" #include "od_msg.h" +#include "od_system.h" #include "od_instance.h" -#include "od_pooler.h" - #include "od_server.h" #include "od_server_pool.h" #include "od_client.h" #include "od_client_pool.h" #include "od_route_id.h" #include "od_route.h" +#include "od_pooler.h" static inline void od_pooler(void *arg) { od_pooler_t *pooler = arg; - od_instance_t *instance = pooler->instance; + od_instance_t *instance = pooler->system->instance; /* init pooler tls */ int rc; @@ -158,30 +158,25 @@ od_pooler(void *arg) msg = machine_msg_create(OD_MCLIENT_NEW, sizeof(od_client_t*)); char *msg_data = machine_msg_get_data(msg); memcpy(msg_data, &client, sizeof(od_client_t*)); - machine_queue_put(pooler->task_queue, msg); + machine_queue_put(pooler->system->task_queue, msg); } } -int od_pooler_init(od_pooler_t *pooler, od_instance_t *instance) +int od_pooler_init(od_pooler_t *pooler, od_system_t *system) { pooler->machine = -1; pooler->server = NULL; pooler->client_seq = 0; - pooler->instance = instance; - pooler->task_queue = NULL; - pooler->task_queue = machine_queue_create(); - if (pooler->task_queue == NULL) { - od_error(&instance->log, NULL, "failed to create task queue"); - return -1; - } + pooler->system = system; return 0; } int od_pooler_start(od_pooler_t *pooler) { + od_instance_t *instance = pooler->system->instance; pooler->machine = machine_create("pooler", od_pooler, pooler); if (pooler->machine == -1) { - od_error(&pooler->instance->log, NULL, "failed to start server"); + od_error(&instance->log, NULL, "failed to start server"); return 1; } return 0; diff --git a/src/od_pooler.h b/src/od_pooler.h index 81558882..0b1262dc 100644 --- a/src/od_pooler.h +++ b/src/od_pooler.h @@ -11,15 +11,14 @@ typedef struct od_pooler od_pooler_t; struct od_pooler { - int64_t machine; - machine_io_t server; - machine_tls_t tls; - uint64_t client_seq; - od_instance_t *instance; - machine_queue_t task_queue; + int64_t machine; + machine_io_t server; + machine_tls_t tls; + uint64_t client_seq; + od_system_t *system; }; -int od_pooler_init(od_pooler_t*, od_instance_t*); +int od_pooler_init(od_pooler_t*, od_system_t*); int od_pooler_start(od_pooler_t*); #endif /* OD_INSTANCE_H */ diff --git a/src/od_relay.c b/src/od_relay.c index fa2c099c..916a7614 100644 --- a/src/od_relay.c +++ b/src/od_relay.c @@ -26,6 +26,7 @@ #include "od_lex.h" #include "od_config.h" #include "od_msg.h" +#include "od_system.h" #include "od_instance.h" #include "od_server.h" @@ -43,13 +44,13 @@ static inline void od_relay(void *arg) { od_relay_t *relay = arg; - od_instance_t *instance = relay->pooler->instance; + od_instance_t *instance = relay->system->instance; od_log(&instance->log, NULL, "relay: started"); for (;;) { machine_msg_t msg; - msg = machine_queue_get(relay->pooler->task_queue, UINT32_MAX); + msg = machine_queue_get(relay->system->task_queue, UINT32_MAX); if (msg == NULL) break; @@ -62,10 +63,9 @@ od_relay(void *arg) client = *(od_client_t**)machine_msg_get_data(msg); client->relay = relay; int64_t coroutine_id; - coroutine_id = machine_coroutine_create(od_frontend_main, client); + coroutine_id = machine_coroutine_create(od_frontend, client); if (coroutine_id == -1) { - od_error(&relay->pooler->instance->log, client->io, - "failed to create coroutine"); + od_error(&instance->log, client->io, "failed to create coroutine"); machine_close(client->io); od_client_free(client); break; @@ -74,23 +74,25 @@ od_relay(void *arg) break; } } + machine_msg_free(msg); } od_log(&instance->log, NULL, "relay: stopped"); } -void od_relay_init(od_relay_t *relay, od_pooler_t *pooler) +void od_relay_init(od_relay_t *relay, od_system_t *system) { relay->machine = -1; - relay->pooler = pooler; + relay->system = system; } int od_relay_start(od_relay_t *relay) { + od_instance_t *instance = relay->system->instance; relay->machine = machine_create("relay", od_relay, relay); if (relay->machine == -1) { - od_error(&relay->pooler->instance->log, NULL, "failed to start relay"); + od_error(&instance->log, NULL, "failed to start relay"); return 1; } return 0; diff --git a/src/od_relay.h b/src/od_relay.h index 9a2c877d..07b1adb9 100644 --- a/src/od_relay.h +++ b/src/od_relay.h @@ -12,10 +12,10 @@ typedef struct od_relay od_relay_t; struct od_relay { int64_t machine; - od_pooler_t *pooler; + od_system_t *system; }; -void od_relay_init(od_relay_t*, od_pooler_t*); +void od_relay_init(od_relay_t*, od_system_t*); int od_relay_start(od_relay_t*); #endif /* OD_RELAY_H */ diff --git a/src/od_router.c b/src/od_router.c new file mode 100644 index 00000000..339809ba --- /dev/null +++ b/src/od_router.c @@ -0,0 +1,68 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_version.h" +#include "od_list.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_daemon.h" +#include "od_scheme.h" +#include "od_lex.h" +#include "od_config.h" +#include "od_msg.h" +#include "od_system.h" +#include "od_instance.h" + +#include "od_server.h" +#include "od_server_pool.h" +#include "od_client.h" +#include "od_client_pool.h" +#include "od_route_id.h" +#include "od_route.h" + +#include "od_pooler.h" +#include "od_relay.h" +#include "od_frontend.h" +#include "od_router.h" + +static inline void +od_router(void *arg) +{ + od_router_t *router = arg; + od_instance_t *instance = router->system->instance; + + od_log(&instance->log, NULL, "router: started"); +} + +void od_router_init(od_router_t *router, od_system_t *system) +{ + router->machine = -1; + router->system = system; +} + +int od_router_start(od_router_t *router) +{ + od_instance_t *instance = router->system->instance; + router->machine = machine_create("router", od_router, router); + if (router->machine == -1) { + od_error(&instance->log, NULL, "failed to start router"); + return 1; + } + return 0; +} diff --git a/src/od_router.h b/src/od_router.h new file mode 100644 index 00000000..93f44ec4 --- /dev/null +++ b/src/od_router.h @@ -0,0 +1,21 @@ +#ifndef OD_ROUTER_H +#define OD_ROUTER_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +typedef struct od_router od_router_t; + +struct od_router +{ + int64_t machine; + od_system_t *system; +}; + +void od_router_init(od_router_t*, od_system_t*); +int od_router_start(od_router_t*); + +#endif /* OD_ROUTER_H */ diff --git a/src/od_system.h b/src/od_system.h new file mode 100644 index 00000000..675c844e --- /dev/null +++ b/src/od_system.h @@ -0,0 +1,21 @@ +#ifndef OD_SYSTEM_H +#define OD_SYSTEM_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +typedef struct od_system od_system_t; + +struct od_system +{ + void *instance; + void *pooler; + void *relay; + void *router; + machine_queue_t task_queue; +}; + +#endif /* OD_SYSTEM_H */