diff --git a/src/od_backend.c b/src/od_backend.c new file mode 100644 index 00000000..402c0c29 --- /dev/null +++ b/src/od_backend.c @@ -0,0 +1,281 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_version.h" +#include "od_list.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_daemon.h" +#include "od_scheme.h" +#include "od_lex.h" +#include "od_config.h" +#include "od_msg.h" +#include "od_system.h" +#include "od_instance.h" + +#include "od_server.h" +#include "od_server_pool.h" +#include "od_client.h" +#include "od_client_pool.h" +#include "od_route_id.h" +#include "od_route.h" +#include "od_route_pool.h" +#include "od_io.h" + +#include "od_pooler.h" +#include "od_router.h" +#include "od_relay.h" +#include "od_frontend.h" +#include "od_backend.h" +#include "od_auth.h" + +void od_backend_close(od_server_t *server) +{ + od_route_t *route = server->route; + od_serverpool_set(&route->server_pool, server, OD_SUNDEF); + if (server->io) { + machine_close(server->io); + machine_io_free(server->io); + server->io = NULL; + } + if (server->tls) { + machine_tls_free(server->tls); + server->tls = NULL; + } + server->is_transaction = 0; + server->idle_time = 0; + so_keyinit(&server->key); + so_keyinit(&server->key_client); + od_server_free(server); +} + +static inline int +od_backend_startup(od_server_t *server) +{ + od_instance_t *instance = server->system->instance; + od_route_t *route = server->route; + + so_stream_t *stream = &server->stream; + so_stream_reset(stream); + so_fearg_t argv[] = { + { "user", 5 }, { route->id.user, route->id.user_len }, + { "database", 9 }, { route->id.database, route->id.database_len } + }; + int rc; + rc = so_fewrite_startup_message(stream, 4, argv); + if (rc == -1) + return -1; + rc = od_write(server->io, stream); + if (rc == -1) { + od_error(&instance->log, server->io, "S (startup): write error: %s", + machine_error(server->io)); + return -1; + } + server->count_request++; + return 0; +} + +int od_backend_ready(od_server_t *server, uint8_t *data, int size) +{ + int status; + int rc; + rc = so_feread_ready(&status, data, size); + if (rc == -1) + return -1; + if (status == 'I') { + /* no active transaction */ + server->is_transaction = 0; + } else + if (status == 'T' || status == 'E') { + /* in active transaction or in interrupted + * transaction block */ + server->is_transaction = 1; + } + server->count_reply++; + return 0; +} + +static inline int +od_backend_setup(od_server_t *server) +{ + od_instance_t *instance = server->system->instance; + so_stream_t *stream = &server->stream; + while (1) { + so_stream_reset(stream); + int rc; + rc = od_read(server->io, &server->stream, UINT32_MAX); + if (rc == -1) { + od_error(&instance->log, server->io, "S (setup): read error: %s", + machine_error(server->io)); + return -1; + } + uint8_t type = *server->stream.s; + od_debug(&instance->log, server->io, "S (setup): %c", type); + switch (type) { + /* ReadyForQuery */ + case 'Z': + od_backend_ready(server, stream->s, so_stream_used(stream)); + return 0; + /* Authentication */ + case 'R': + rc = od_auth_backend(server); + if (rc == -1) + return -1; + break; + /* BackendKeyData */ + case 'K': + rc = so_feread_key(&server->key, + stream->s, so_stream_used(stream)); + if (rc == -1) { + od_error(&instance->log, server->io, + "S (setup): failed to parse BackendKeyData message"); + return -1; + } + break; + /* ParameterStatus */ + case 'S': + break; + /* NoticeResponce */ + case 'N': + break; + /* ErrorResponce */ + case 'E': + return -1; + default: + od_debug(&instance->log, server->io, + "S (setup): unknown packet: %c", type); + return -1; + } + } + return 0; +} + +static inline int +od_backend_connect(od_server_t *server) +{ + od_instance_t *instance = server->system->instance; + od_route_t *route = server->route; + od_schemeserver_t *server_scheme = route->scheme->server; + + /* resolve server address */ + char port[16]; + snprintf(port, sizeof(port), "%d", server_scheme->port); + struct addrinfo *ai = NULL; + int rc; + rc = machine_getaddrinfo(server_scheme->host, port, NULL, &ai, 0); + if (rc < 0) { + od_error(&instance->log, NULL, "failed to resolve %s:%d", + server_scheme->host, + server_scheme->port); + return -1; + } + assert(ai != NULL); + + /* connect to server */ + rc = machine_connect(server->io, ai->ai_addr, UINT32_MAX); + freeaddrinfo(ai); + if (rc < 0) { + od_error(&instance->log, NULL, "failed to connect to %s:%d", + server_scheme->host, + server_scheme->port); + return -1; + } + rc = machine_set_readahead(server->io, instance->scheme.readahead); + if (rc == -1) { + od_error(&instance->log, NULL, "failed to set readahead"); + return -1; + } + + /* do tls handshake */ +#if 0 + if (server_scheme->tls_verify != OD_TDISABLE) { + rc = od_tlsbe_connect(pooler->env, server->io, server->tls, + &server->stream, + &pooler->od->log, "S", + server_scheme); + if (rc == -1) + return -1; + } +#endif + + od_log(&instance->log, server->io, "S: new connection"); + + /* startup */ + rc = od_backend_startup(server); + if (rc == -1) + return -1; + + /* server configuration */ + rc = od_backend_setup(server); + if (rc == -1) + return -1; + + return 0; +} + +od_server_t* +od_backend_new(od_router_t *router, od_route_t *route) +{ + od_instance_t *instance = router->system->instance; + + /* create new server connection */ + od_server_t *server; + server = od_server_allocate(); + if (server == NULL) + return NULL; + server->route = route; + server->system = router->system; + + /* set network options */ + server->io = machine_io_create(); + if (server->io == NULL) { + od_server_free(server); + return NULL; + } + machine_set_nodelay(server->io, instance->scheme.nodelay); + if (instance->scheme.keepalive > 0) + machine_set_keepalive(server->io, 1, instance->scheme.keepalive); + + /* set tls options */ +#if 0 + od_schemeserver_t *server_scheme; + server_scheme = route->scheme->server; + if (server_scheme->tls_verify != OD_TDISABLE) { + server->tls = od_tlsbe(pooler->env, server_scheme); + if (server->tls == NULL) { + od_serverfree(server); + return NULL; + } + } +#endif + + /* place server to connection pool */ + od_serverpool_set(&route->server_pool, server, OD_SCONNECT); + + int rc; + rc = od_backend_connect(server); + if (rc == -1) { + od_backend_close(server); + return NULL; + } + + /* server is ready to use */ + od_serverpool_set(&route->server_pool, server, OD_SIDLE); + return server; +} diff --git a/src/od_backend.h b/src/od_backend.h new file mode 100644 index 00000000..79c7d083 --- /dev/null +++ b/src/od_backend.h @@ -0,0 +1,13 @@ +#ifndef OD_BACKEND_H +#define OD_BACKEND_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +od_server_t* +od_backend_new(od_router_t*, od_route_t*); + +#endif /* OD_BACKEND_H */ diff --git a/src/od_client.h b/src/od_client.h index 71eb9350..533837a5 100644 --- a/src/od_client.h +++ b/src/od_client.h @@ -29,7 +29,7 @@ struct od_client so_stream_t stream; od_server_t *server; void *route; - void *relay; + od_system_t *system; od_list_t link_pool; od_list_t link; }; @@ -44,7 +44,7 @@ od_client_init(od_client_t *client) client->scheme = NULL; client->server = NULL; client->route = NULL; - client->relay = NULL; + client->system = NULL; so_bestartup_init(&client->startup); so_keyinit(&client->key); so_stream_init(&client->stream); diff --git a/src/od_client_pool.c b/src/od_client_pool.c index dbcb6617..4163b8c8 100644 --- a/src/od_client_pool.c +++ b/src/od_client_pool.c @@ -23,6 +23,7 @@ #include "od_scheme.h" #include "od_lex.h" #include "od_config.h" +#include "od_system.h" #include "od_server.h" #include "od_server_pool.h" #include "od_client.h" diff --git a/src/od_frontend.c b/src/od_frontend.c index 65d72242..1c098806 100644 --- a/src/od_frontend.c +++ b/src/od_frontend.c @@ -68,8 +68,7 @@ void od_frontend_close(od_client_t *client) static int od_frontend_startup_read(od_client_t *client) { - od_relay_t *relay = client->relay; - od_instance_t *instance = relay->system->instance; + od_instance_t *instance = client->system->instance; so_stream_t *stream = &client->stream; so_stream_reset(stream); @@ -89,7 +88,7 @@ od_frontend_startup_read(od_client_t *client) int rc = so_stream_ensure(stream, to_read); if (rc == -1) return -1; - rc = machine_read(client->io, (char*)stream->p, to_read, INT_MAX); + rc = machine_read(client->io, (char*)stream->p, to_read, UINT32_MAX); if (rc < 0) { od_error(&instance->log, client->io, "C (startup): read error: %s", @@ -152,8 +151,7 @@ od_frontend_key(od_client_t *client) static inline int od_frontend_setup(od_client_t *client) { - od_relay_t *relay = client->relay; - od_instance_t *instance = relay->system->instance; + od_instance_t *instance = client->system->instance; so_stream_t *stream = &client->stream; so_stream_reset(stream); @@ -177,8 +175,7 @@ od_frontend_setup(od_client_t *client) static inline int od_frontend_ready(od_client_t *client) { - od_relay_t *relay = client->relay; - od_instance_t *instance = relay->system->instance; + od_instance_t *instance = client->system->instance; so_stream_t *stream = &client->stream; so_stream_reset(stream); @@ -198,8 +195,8 @@ od_frontend_ready(od_client_t *client) void od_frontend(void *arg) { od_client_t *client = arg; - od_relay_t *relay = client->relay; - od_instance_t *instance = relay->system->instance; + od_relay_t *relay = client->system->relay; + od_instance_t *instance = client->system->instance; od_log(&instance->log, client->io, "C: new connection"); diff --git a/src/od_relay.c b/src/od_relay.c index e6d23c9c..56d055af 100644 --- a/src/od_relay.c +++ b/src/od_relay.c @@ -62,7 +62,7 @@ od_relay(void *arg) { od_client_t *client; client = *(od_client_t**)machine_msg_get_data(msg); - client->relay = relay; + client->system = relay->system; int64_t coroutine_id; coroutine_id = machine_coroutine_create(od_frontend, client); if (coroutine_id == -1) { diff --git a/src/od_route_pool.c b/src/od_route_pool.c index a43f40a9..dab70a43 100644 --- a/src/od_route_pool.c +++ b/src/od_route_pool.c @@ -23,6 +23,7 @@ #include "od_scheme.h" #include "od_lex.h" #include "od_config.h" +#include "od_system.h" #include "od_server.h" #include "od_server_pool.h" #include "od_client.h" diff --git a/src/od_server.h b/src/od_server.h index c008cbd2..8bc5301a 100644 --- a/src/od_server.h +++ b/src/od_server.h @@ -34,7 +34,7 @@ struct od_server so_key_t key; so_key_t key_client; void *route; - void *pooler; + od_system_t *system; od_list_t link; }; @@ -48,9 +48,9 @@ od_server_init(od_server_t *server) { server->state = OD_SUNDEF; server->route = NULL; + server->system = NULL; server->io = NULL; server->tls = NULL; - server->pooler = NULL; server->idle_time = 0; server->is_transaction = 0; server->is_copy = 0; diff --git a/src/od_server_pool.c b/src/od_server_pool.c index 8bce5083..97487669 100644 --- a/src/od_server_pool.c +++ b/src/od_server_pool.c @@ -23,6 +23,7 @@ #include "od_scheme.h" #include "od_lex.h" #include "od_config.h" +#include "od_system.h" #include "od_server.h" #include "od_server_pool.h"