diff --git a/src/od_auth.c b/src/od_auth.c new file mode 100644 index 00000000..c167d950 --- /dev/null +++ b/src/od_auth.c @@ -0,0 +1,252 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_version.h" +#include "od_list.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_daemon.h" +#include "od_scheme.h" +#include "od_lex.h" +#include "od_config.h" +#include "od_msg.h" +#include "od_instance.h" + +#include "od_server.h" +#include "od_server_pool.h" +#include "od_client.h" +#include "od_client_pool.h" +#include "od_route_id.h" +#include "od_route.h" +#include "od_io.h" + +#include "od_pooler.h" +#include "od_relay.h" +#include "od_frontend.h" +#include "od_auth.h" + +static inline int +od_auth_frontend_cleartext(od_client_t *client) +{ + od_relay_t *relay = client->relay; + od_instance_t *instance = relay->pooler->instance; + + /* AuthenticationCleartextPassword */ + so_stream_t *stream = &client->stream; + so_stream_reset(stream); + int rc; + rc = so_bewrite_authentication_clear_text(stream); + if (rc == -1) + return -1; + rc = od_write(client->io, stream); + if (rc == -1) { + od_error(&instance->log, client->io, "C (auth): write error: %s", + machine_error(client->io)); + return -1; + } + + /* wait for password response */ + while (1) { + so_stream_reset(stream); + rc = od_read(client->io, stream, INT_MAX); + if (rc == -1) { + od_error(&instance->log, client->io, "C (auth): read error: %s", + machine_error(client->io)); + return -1; + } + uint8_t type = *stream->s; + od_debug(&instance->log, client->io, "C (auth): %c", *stream->s); + /* PasswordMessage */ + if (type == 'p') + break; + } + + /* read password message */ + so_password_t client_token; + so_password_init(&client_token); + rc = so_beread_password(&client_token, stream->s, + so_stream_used(stream)); + if (rc == -1) { + od_error(&instance->log, client->io, + "C (auth): password read error"); + so_password_free(&client_token); + return -1; + } + + /* set user password */ + so_password_t client_password = { + .password_len = client->scheme->password_len + 1, + .password = client->scheme->password, + }; + + /* authenticate */ + int check = so_password_compare(&client_password, &client_token); + so_password_free(&client_token); + if (! check) { + od_log(&instance->log, client->io, + "C (auth): user '%s' incorrect password", + client->startup.user); + return -1; + } + return 0; +} + +static inline int +od_auth_frontend_md5(od_client_t *client) +{ + od_relay_t *relay = client->relay; + od_instance_t *instance = relay->pooler->instance; + + /* generate salt */ + uint32_t salt = so_password_salt(&client->key); + + /* AuthenticationMD5Password */ + so_stream_t *stream = &client->stream; + so_stream_reset(stream); + int rc; + rc = so_bewrite_authentication_md5(stream, (uint8_t*)&salt); + if (rc == -1) + return -1; + rc = od_write(client->io, stream); + if (rc == -1) { + od_error(&instance->log, client->io, "C (auth): write error: %s", + machine_error(client->io)); + return -1; + } + + /* wait for password response */ + while (1) { + int rc; + so_stream_reset(stream); + rc = od_read(client->io, stream, INT_MAX); + if (rc == -1) { + od_error(&instance->log, client->io, "C (auth): read error: %s", + machine_error(client->io)); + return -1; + } + uint8_t type = *stream->s; + od_debug(&instance->log, client->io, "C (auth): %c", *stream->s); + /* PasswordMessage */ + if (type == 'p') + break; + } + + /* read password message */ + so_password_t client_token; + so_password_init(&client_token); + rc = so_beread_password(&client_token, stream->s, so_stream_used(stream)); + if (rc == -1) { + od_error(&instance->log, client->io, + "C (auth): password read error"); + so_password_free(&client_token); + return -1; + } + + /* set user password */ + so_password_t client_password; + so_password_init(&client_password); + rc = so_password_md5(&client_password, + so_parameter_value(client->startup.user), + client->startup.user->value_len - 1, + client->scheme->password, + client->scheme->password_len, + (uint8_t*)&salt); + if (rc == -1) { + od_error(&instance->log, NULL, "memory allocation error"); + so_password_free(&client_password); + so_password_free(&client_token); + return -1; + } + + /* authenticate */ + int check = so_password_compare(&client_password, &client_token); + so_password_free(&client_password); + so_password_free(&client_token); + if (! check) { + od_log(&instance->log, client->io, + "C (auth): user '%s' incorrect password", + client->startup.user); + return -1; + } + return 0; +} + +int od_auth_frontend(od_client_t *client) +{ + od_relay_t *relay = client->relay; + od_instance_t *instance = relay->pooler->instance; + + /* match user scheme */ + od_schemeuser_t *user_scheme = + od_schemeuser_match(&instance->scheme, + so_parameter_value(client->startup.user)); + if (user_scheme == NULL) { + /* try to use default user */ + user_scheme = instance->scheme.users_default; + if (user_scheme == NULL) { + od_error(&instance->log, client->io, + "C (auth): user '%s' not found", + so_parameter_value(client->startup.user)); + return -1; + } + } + client->scheme = user_scheme; + + /* is user access denied */ + if (user_scheme->is_deny) { + od_log(&instance->log, client->io, + "C (auth): user '%s' access denied", + so_parameter_value(client->startup.user)); + return -1; + } + + /* authentication mode */ + int rc; + switch (user_scheme->auth_mode) { + case OD_ACLEAR_TEXT: + rc = od_auth_frontend_cleartext(client); + if (rc == -1) + return -1; + break; + case OD_AMD5: + rc = od_auth_frontend_md5(client); + if (rc == -1) + return -1; + break; + case OD_ANONE: + break; + default: + assert(0); + break; + } + + /* pass */ + so_stream_t *stream = &client->stream; + so_stream_reset(stream); + rc = so_bewrite_authentication_ok(stream); + if (rc == -1) + return -1; + rc = od_write(client->io, stream); + if (rc == -1) { + od_error(&instance->log, client->io, "C (auth): write error: %s", + machine_error(client->io)); + return -1; + } + return 0; +} diff --git a/src/od_auth.h b/src/od_auth.h new file mode 100644 index 00000000..88d05955 --- /dev/null +++ b/src/od_auth.h @@ -0,0 +1,12 @@ +#ifndef OD_AUTH_H +#define OD_AUTH_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +int od_auth_frontend(od_client_t*); + +#endif /* OD_AUTH_H */ diff --git a/src/od_client.h b/src/od_client.h index 6f38e963..71eb9350 100644 --- a/src/od_client.h +++ b/src/od_client.h @@ -21,6 +21,7 @@ struct od_client { od_clientstate_t state; uint64_t id; + uint64_t coroutine_id; machine_io_t io; od_schemeuser_t *scheme; so_bestartup_t startup; @@ -28,6 +29,7 @@ struct od_client so_stream_t stream; od_server_t *server; void *route; + void *relay; od_list_t link_pool; od_list_t link; }; @@ -37,10 +39,12 @@ od_client_init(od_client_t *client) { client->state = OD_CUNDEF; client->id = 0; + client->coroutine_id = 0; client->io = NULL; client->scheme = NULL; client->server = NULL; client->route = NULL; + client->relay = NULL; so_bestartup_init(&client->startup); so_keyinit(&client->key); so_stream_init(&client->stream); diff --git a/src/od_frontend.c b/src/od_frontend.c new file mode 100644 index 00000000..2afa1598 --- /dev/null +++ b/src/od_frontend.c @@ -0,0 +1,327 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_version.h" +#include "od_list.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_daemon.h" +#include "od_scheme.h" +#include "od_lex.h" +#include "od_config.h" +#include "od_msg.h" +#include "od_instance.h" + +#include "od_server.h" +#include "od_server_pool.h" +#include "od_client.h" +#include "od_client_pool.h" +#include "od_route_id.h" +#include "od_route.h" +#include "od_io.h" + +#include "od_pooler.h" +#include "od_relay.h" +#include "od_frontend.h" +#include "od_auth.h" + +void od_frontend_close(od_client_t *client) +{ +#if 0 + od_pooler_t *pooler = client->pooler; + if (client->route) { + od_route_t *route = client->route; + od_clientpool_set(&route->client_pool, client, OD_CUNDEF); + client->route = NULL; + } +#endif + if (client->io) { + machine_close(client->io); + machine_io_free(client->io); + client->io = NULL; + } +#if 0 + od_clientlist_unlink(&pooler->client_list, client); +#endif + od_client_free(client); +} + +static int +od_frontend_startup_read(od_client_t *client) +{ + od_relay_t *relay = client->relay; + od_instance_t *instance = relay->pooler->instance; + + so_stream_t *stream = &client->stream; + so_stream_reset(stream); + for (;;) { + uint32_t pos_size = so_stream_used(stream); + uint8_t *pos_data = stream->s; + uint32_t len; + int to_read; + to_read = so_read_startup(&len, &pos_data, &pos_size); + if (to_read == 0) + break; + if (to_read == -1) { + od_error(&instance->log, client->io, + "C (startup): bad startup packet"); + return -1; + } + int rc = so_stream_ensure(stream, to_read); + if (rc == -1) + return -1; + rc = machine_read(client->io, (char*)stream->p, to_read, INT_MAX); + if (rc < 0) { + od_error(&instance->log, client->io, + "C (startup): read error: %s", + machine_error(client->io)); + return -1; + } + so_stream_advance(stream, to_read); + } + return 0; +} + +static int +od_frontend_startup(od_client_t *client) +{ + int rc; + rc = od_frontend_startup_read(client); + if (rc == -1) + return -1; + so_stream_t *stream = &client->stream; + rc = so_beread_startup(&client->startup, + stream->s, + so_stream_used(stream)); + if (rc == -1) + return -1; + +#if 0 + /* client ssl request */ + rc = od_tlsfe_accept(pooler->env, client->io, pooler->tls, + &client->stream, + &pooler->od->log, "C", + &pooler->od->scheme, + &client->startup); + if (rc == -1) + return -1; +#endif + if (! client->startup.is_ssl_request) + return 0; + + /* read startup-cancel message followed after ssl + * negotiation */ + assert(client->startup.is_ssl_request); + rc = od_frontend_startup_read(client); + if (rc == -1) + return -1; + rc = so_beread_startup(&client->startup, + stream->s, + so_stream_used(stream)); + if (rc == -1) + return -1; + return 0; +} + +static inline void +od_frontend_key(od_client_t *client) +{ + client->key.key_pid = client->id; + client->key.key = 1 + rand(); +} + +static inline int +od_frontend_setup(od_client_t *client) +{ + od_relay_t *relay = client->relay; + od_instance_t *instance = relay->pooler->instance; + + so_stream_t *stream = &client->stream; + so_stream_reset(stream); + int rc; + rc = so_bewrite_backend_key_data(stream, client->key.key_pid, + client->key.key); + if (rc == -1) + return -1; + rc = so_bewrite_parameter_status(stream, "", 1, "", 1); + if (rc == -1) + return -1; + rc = od_write(client->io, stream); + if (rc == -1) { + od_error(&instance->log, client->io, "C (setup): write error: %s", + machine_error(client->io)); + return -1; + } + return 0; +} + +static inline int +od_frontend_ready(od_client_t *client) +{ + od_relay_t *relay = client->relay; + od_instance_t *instance = relay->pooler->instance; + + so_stream_t *stream = &client->stream; + so_stream_reset(stream); + int rc; + rc = so_bewrite_ready(stream, 'I'); + if (rc == -1) + return -1; + rc = od_write(client->io, stream); + if (rc == -1) { + od_error(&instance->log, client->io, "C: write error: %s", + machine_error(client->io)); + return -1; + } + return 0; +} + +void od_frontend_main(void *arg) +{ + od_client_t *client = arg; + od_relay_t *relay = client->relay; + od_instance_t *instance = relay->pooler->instance; + + od_log(&instance->log, client->io, "C: new connection"); + + /* attach client io to relay machine event loop */ + int rc; + rc = machine_io_attach(client->io); + if (rc == -1) { + od_error(&instance->log, client->io, "failed to transfer client io"); + machine_close(client->io); + od_client_free(client); + return; + } + + /* client startup */ + rc = od_frontend_startup(client); + if (rc == -1) { + od_frontend_close(client); + return; + } + +#if 0 + /* client cancel request */ + if (client->startup.is_cancel) { + od_debug(&pooler->od->log, client->io, "C: cancel request"); + so_key_t key = client->startup.key; + od_feclose(client); + od_cancel(pooler, &key); + return; + } +#endif + + /* Generate backend key for the client. + * + * This key will be used to identify a server by + * user cancel requests. The key must be regenerated + * for each new client-server assignment, to avoid + * possibility of cancelling requests by a previous + * server owners. + */ + od_frontend_key(client); + + /* client authentication */ + rc = od_auth_frontend(client); + if (rc == -1) { + od_frontend_close(client); + return; + } + + /* set client backend options and the key */ + rc = od_frontend_setup(client); + if (rc == -1) { + od_frontend_close(client); + return; + } + + /* notify client that we are ready */ + rc = od_frontend_ready(client); + if (rc == -1) { + od_frontend_close(client); + return; + } + + /* route */ + +#if 0 + /* execute pooler method */ + od_routerstatus_t status = OD_RS_UNDEF; + switch (pooler->od->scheme.pooling_mode) { + case OD_PSESSION: + status = od_router_session(client); + break; + case OD_PTRANSACTION: + status = od_router_transaction(client); + break; + case OD_PUNDEF: + break; + } + + od_server_t *server = client->server; + switch (status) { + case OD_RS_EROUTE: + case OD_RS_EPOOL: + case OD_RS_ELIMIT: + assert(! client->server); + od_feclose(client); + break; + case OD_RS_OK: + case OD_RS_ECLIENT_READ: + case OD_RS_ECLIENT_WRITE: + if (status == OD_RS_OK) + od_log(&pooler->od->log, client->io, + "C: disconnected"); + else + od_log(&pooler->od->log, client->io, + "C: disconnected (read/write error): %s", + machine_error(client->io)); + /* close client connection and reuse server + * link in case of client errors and + * graceful shutdown */ + od_feclose(client); + if (server) + od_berelease(server); + break; + case OD_RS_ESERVER_CONFIGURE: + od_log(&pooler->od->log, server->io, + "S: disconnected (server configure error): %s", + machine_error(server->io)); + od_feclose(client); + if (server) + od_beclose(server); + break; + case OD_RS_ESERVER_READ: + case OD_RS_ESERVER_WRITE: + od_log(&pooler->od->log, server->io, + "S: disconnected (read/write error): %s", + machine_error(server->io)); + /* close client connection and close server + * connection in case of server errors */ + od_feclose(client); + if (server) + od_beclose(server); + break; + case OD_RS_UNDEF: + assert(0); + break; + } +#endif +} diff --git a/src/od_frontend.h b/src/od_frontend.h new file mode 100644 index 00000000..69910103 --- /dev/null +++ b/src/od_frontend.h @@ -0,0 +1,12 @@ +#ifndef OD_FRONTEND_H +#define OD_FRONTEND_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +void od_frontend_main(void*); + +#endif /* OD_FRONTEND_H */ diff --git a/src/od_instance.c b/src/od_instance.c index 29fbae24..afb56018 100644 --- a/src/od_instance.c +++ b/src/od_instance.c @@ -27,6 +27,7 @@ #include "od_config.h" #include "od_instance.h" #include "od_pooler.h" +#include "od_relay.h" void od_instance_init(od_instance_t *instance) { @@ -123,11 +124,20 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv) /* create pid file */ if (instance->scheme.pid_file) od_pid_create(&instance->pid, instance->scheme.pid_file); + /* run connection pooler */ od_pooler_t pooler; od_pooler_init(&pooler, instance); rc = od_pooler_start(&pooler); if (rc == -1) return 1; + + od_relay_t relay; + od_relay_init(&relay, &pooler); + rc = od_relay_start(&relay); + if (rc == -1) + return 1; + + machine_wait(pooler.machine); return 0; } diff --git a/src/od_pooler.c b/src/od_pooler.c index 55b37f0c..4139fbbd 100644 --- a/src/od_pooler.c +++ b/src/od_pooler.c @@ -42,13 +42,6 @@ od_pooler(void *arg) od_pooler_t *pooler = arg; od_instance_t *instance = pooler->instance; - /* create task queue */ - pooler->task_queue = machine_queue_create(); - if (pooler->task_queue == NULL) { - od_error(&instance->log, NULL, "failed to create task queue"); - return; - } - /* init pooler tls */ int rc; #if 0 @@ -129,6 +122,7 @@ od_pooler(void *arg) if (instance->scheme.keepalive > 0) machine_set_keepalive(client_io, 1, instance->scheme.keepalive); + /* rc = machine_set_readahead(client_io, instance->scheme.readahead); if (rc == -1) { od_error(&instance->log, NULL, "failed to set client readahead"); @@ -136,11 +130,22 @@ od_pooler(void *arg) machine_io_free(client_io); continue; } + */ + + rc = machine_io_detach(client_io); + if (rc == -1) { + od_error(&instance->log, client_io, + "failed to transfer client io"); + machine_close(client_io); + machine_io_free(client_io); + continue; + } /* allocate new client */ od_client_t *client = od_client_allocate(); if (client == NULL) { - od_error(&instance->log, NULL, "failed to allocate client object"); + od_error(&instance->log, client_io, + "failed to allocate client object"); machine_close(client_io); machine_io_free(client_io); continue; @@ -157,13 +162,19 @@ od_pooler(void *arg) } } -void od_pooler_init(od_pooler_t *pooler, od_instance_t *instance) +int od_pooler_init(od_pooler_t *pooler, od_instance_t *instance) { pooler->machine = -1; pooler->server = NULL; pooler->client_seq = 0; pooler->instance = instance; pooler->task_queue = NULL; + pooler->task_queue = machine_queue_create(); + if (pooler->task_queue == NULL) { + od_error(&instance->log, NULL, "failed to create task queue"); + return -1; + } + return 0; } int od_pooler_start(od_pooler_t *pooler) @@ -173,6 +184,5 @@ int od_pooler_start(od_pooler_t *pooler) od_error(&pooler->instance->log, NULL, "failed to start server"); return 1; } - machine_wait(pooler->machine); return 0; } diff --git a/src/od_pooler.h b/src/od_pooler.h index e735e7c3..81558882 100644 --- a/src/od_pooler.h +++ b/src/od_pooler.h @@ -19,7 +19,7 @@ struct od_pooler machine_queue_t task_queue; }; -void od_pooler_init(od_pooler_t*, od_instance_t*); -int od_pooler_start(od_pooler_t*); +int od_pooler_init(od_pooler_t*, od_instance_t*); +int od_pooler_start(od_pooler_t*); #endif /* OD_INSTANCE_H */ diff --git a/src/od_relay.c b/src/od_relay.c new file mode 100644 index 00000000..fa2c099c --- /dev/null +++ b/src/od_relay.c @@ -0,0 +1,97 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_version.h" +#include "od_list.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_daemon.h" +#include "od_scheme.h" +#include "od_lex.h" +#include "od_config.h" +#include "od_msg.h" +#include "od_instance.h" + +#include "od_server.h" +#include "od_server_pool.h" +#include "od_client.h" +#include "od_client_pool.h" +#include "od_route_id.h" +#include "od_route.h" + +#include "od_pooler.h" +#include "od_relay.h" +#include "od_frontend.h" + +static inline void +od_relay(void *arg) +{ + od_relay_t *relay = arg; + od_instance_t *instance = relay->pooler->instance; + + od_log(&instance->log, NULL, "relay: started"); + + for (;;) { + machine_msg_t msg; + msg = machine_queue_get(relay->pooler->task_queue, UINT32_MAX); + if (msg == NULL) + break; + + od_msg_t msg_type; + msg_type = machine_msg_get_type(msg); + switch (msg_type) { + case OD_MCLIENT_NEW: + { + od_client_t *client; + client = *(od_client_t**)machine_msg_get_data(msg); + client->relay = relay; + int64_t coroutine_id; + coroutine_id = machine_coroutine_create(od_frontend_main, client); + if (coroutine_id == -1) { + od_error(&relay->pooler->instance->log, client->io, + "failed to create coroutine"); + machine_close(client->io); + od_client_free(client); + break; + } + client->coroutine_id = coroutine_id; + break; + } + } + machine_msg_free(msg); + } + + od_log(&instance->log, NULL, "relay: stopped"); +} + +void od_relay_init(od_relay_t *relay, od_pooler_t *pooler) +{ + relay->machine = -1; + relay->pooler = pooler; +} + +int od_relay_start(od_relay_t *relay) +{ + relay->machine = machine_create("relay", od_relay, relay); + if (relay->machine == -1) { + od_error(&relay->pooler->instance->log, NULL, "failed to start relay"); + return 1; + } + return 0; +} diff --git a/src/od_relay.h b/src/od_relay.h new file mode 100644 index 00000000..9a2c877d --- /dev/null +++ b/src/od_relay.h @@ -0,0 +1,21 @@ +#ifndef OD_RELAY_H +#define OD_RELAY_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +typedef struct od_relay od_relay_t; + +struct od_relay +{ + int64_t machine; + od_pooler_t *pooler; +}; + +void od_relay_init(od_relay_t*, od_pooler_t*); +int od_relay_start(od_relay_t*); + +#endif /* OD_RELAY_H */