From f5657689e55e0a81f64de7a75c93d426d0fbfbfa Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Mon, 29 May 2017 17:53:21 +0300 Subject: [PATCH] odissey: assemble basic session pooling --- src/od_backend.c | 204 ++++++++++++++++++++++++++++++++++++++++++ src/od_backend.h | 4 + src/od_frontend.c | 220 +++++++++++++++++++++++++++++++++++++++++++++- src/od_router.c | 62 ++++++++++--- 4 files changed, 473 insertions(+), 17 deletions(-) diff --git a/src/od_backend.c b/src/od_backend.c index 6f330c03..ddef2da7 100644 --- a/src/od_backend.c +++ b/src/od_backend.c @@ -111,6 +111,35 @@ int od_backend_ready(od_server_t *server, uint8_t *data, int size) return 0; } +static inline int +od_backend_ready_wait(od_server_t *server, char *procedure, int time_ms) +{ + od_instance_t *instance = server->system->instance; + + so_stream_t *stream = &server->stream; + /* wait for response */ + while (1) { + so_stream_reset(stream); + int rc; + rc = od_read(server->io, stream, time_ms); + if (rc == -1) { + od_error(&instance->log, server->io, "S (%s): read error: %s", + procedure, machine_error(server->io)); + return -1; + } + uint8_t type = stream->s[rc]; + od_debug(&instance->log, server->io, "S (%s): %c", + procedure, type); + /* ReadyForQuery */ + if (type == 'Z') { + od_backend_ready(server, stream->s + rc, + so_stream_used(stream) - rc); + break; + } + } + return 0; +} + static inline int od_backend_setup(od_server_t *server) { @@ -196,11 +225,14 @@ od_backend_connect(od_server_t *server) server_scheme->port); return -1; } + +#if 0 rc = machine_set_readahead(server->io, instance->scheme.readahead); if (rc == -1) { od_error(&instance->log, NULL, "failed to set readahead"); return -1; } +#endif /* do tls handshake */ #if 0 @@ -273,3 +305,175 @@ od_backend_new(od_router_t *router, od_route_t *route) } return server; } + +static inline int +od_backend_query(od_server_t *server, char *procedure, char *query, int len) +{ + od_instance_t *instance = server->system->instance; + int rc; + so_stream_t *stream = &server->stream; + so_stream_reset(stream); + rc = so_fewrite_query(stream, query, len); + if (rc == -1) + return -1; + rc = od_write(server->io, stream); + if (rc == -1) { + od_error(&instance->log, server->io, "S (%s): write error: %s", + procedure, machine_error(server->io)); + return -1; + } + server->count_request++; + rc = od_backend_ready_wait(server, procedure, UINT32_MAX); + if (rc == -1) + return -1; + return 0; +} + +int od_backend_configure(od_server_t *server, so_bestartup_t *startup) +{ + od_instance_t *instance = server->system->instance; + + char query_configure[1024]; + int size = 0; + so_parameter_t *param; + so_parameter_t *end; + param = (so_parameter_t*)startup->params.buf.s; + end = (so_parameter_t*)startup->params.buf.p; + for (; param < end; param = so_parameter_next(param)) { + if (param == startup->user || + param == startup->database) + continue; + size += snprintf(query_configure + size, + sizeof(query_configure) - size, + "SET %s=%s;", + so_parameter_name(param), + so_parameter_value(param)); + } + if (size == 0) + return 0; + od_debug(&instance->log, server->io, + "S (configure): %s", query_configure); + int rc; + rc = od_backend_query(server, "configure", query_configure, + size + 1); + return rc; +} + +int od_backend_reset(od_server_t *server) +{ + od_instance_t *instance = server->system->instance; + od_route_t *route = server->route; + + /* server left in copy mode */ + if (server->is_copy) { + od_debug(&instance->log, server->io, + "S (reset): in copy, closing"); + goto drop; + } + + /* support route rollback off */ + if (! route->scheme->rollback) { + if (server->is_transaction) { + od_debug(&instance->log, server->io, + "S (reset): in active transaction, closing"); + goto drop; + } + } + + /* support route cancel off */ + if (! route->scheme->cancel) { + if (! od_server_is_sync(server)) { + od_debug(&instance->log, server->io, + "S (reset): not synchronized, closing"); + goto drop; + } + } + + /* Server is not synchronized. + * + * Number of queries sent to server is not equal + * to the number of received replies. Do the following + * logic until server becomes synchronized: + * + * 1. Wait each ReadyForQuery until we receive all + * replies with 1 sec timeout. + * + * 2. Send Cancel in other connection. + * + * It is possible that client could previously + * pipeline server with requests. Each request + * may stall database on its own way and may require + * additional Cancel request. + * + * 3. Continue with (1) + */ + int wait_timeout = 1000; + int wait_try = 0; + int wait_try_cancel = 0; + int wait_cancel_limit = 1; + int rc = 0; + for (;;) { + while (! od_server_is_sync(server)) { + od_debug(&instance->log, server->io, + "S (reset): not synchronized, wait for %d msec (#%d)", + wait_timeout, + wait_try); + wait_try++; + rc = od_backend_ready_wait(server, "reset", wait_timeout); + if (rc == -1) + break; + } + if (rc == -1) { + if (! machine_read_timedout(server->io)) + goto error; + if (wait_try_cancel == wait_cancel_limit) { + od_debug(&instance->log, server->io, + "S (reset): server cancel limit reached, closing"); + goto error; + } + od_debug(&instance->log, server->io, + "S (reset): not responded, cancel (#%d)", + wait_try_cancel); + wait_try_cancel++; + /* TODO: */ + /* + rc = od_cancel_of(pooler, route->scheme->server, &server->key); + if (rc < 0) + goto error; + */ + continue; + } + assert(od_server_is_sync(server)); + break; + } + od_debug(&instance->log, server->io, "S (reset): synchronized"); + + /* send rollback in case server has an active + * transaction running */ + if (route->scheme->rollback) { + if (server->is_transaction) { + char query_rlb[] = "ROLLBACK"; + rc = od_backend_query(server, "reset rollback", query_rlb, + sizeof(query_rlb)); + if (rc == -1) + goto error; + assert(! server->is_transaction); + } + } + + /* send reset query */ + if (route->scheme->discard) { + char query_reset[] = "DISCARD ALL"; + rc = od_backend_query(server, "reset", query_reset, + sizeof(query_reset)); + if (rc == -1) + goto error; + } + + /* ready to use */ + return 1; +drop: + return 0; +error: + return -1; +} diff --git a/src/od_backend.h b/src/od_backend.h index 79c7d083..d8a0cf5a 100644 --- a/src/od_backend.h +++ b/src/od_backend.h @@ -10,4 +10,8 @@ od_server_t* od_backend_new(od_router_t*, od_route_t*); +int od_backend_reset(od_server_t*); +int od_backend_ready(od_server_t*, uint8_t*, int); +int od_backend_configure(od_server_t*, so_bestartup_t*); + #endif /* OD_BACKEND_H */ diff --git a/src/od_frontend.c b/src/od_frontend.c index 1c098806..80f5b2e9 100644 --- a/src/od_frontend.c +++ b/src/od_frontend.c @@ -42,6 +42,7 @@ #include "od_router.h" #include "od_relay.h" #include "od_frontend.h" +#include "od_backend.h" #include "od_auth.h" void od_frontend_close(od_client_t *client) @@ -192,6 +193,160 @@ od_frontend_ready(od_client_t *client) return 0; } +enum { + OD_RS_UNDEF, + OD_RS_OK, + OD_RS_EROUTE, + OD_RS_EPOOL, + OD_RS_ELIMIT, + OD_RS_ESERVER_CONFIGURE, + OD_RS_ESERVER_READ, + OD_RS_ESERVER_WRITE, + OD_RS_ECLIENT_READ, + OD_RS_ECLIENT_WRITE +}; + +static inline od_routerstatus_t +od_frontend_copy_in(od_client_t *client) +{ + od_instance_t *instance = client->system->instance; + od_server_t *server = client->server; + + assert(! server->is_copy); + server->is_copy = 1; + + int rc, type; + so_stream_t *stream = &client->stream; + for (;;) { + so_stream_reset(stream); + rc = od_read(client->io, stream, UINT32_MAX); + if (rc == -1) + return OD_RS_ECLIENT_READ; + type = *stream->s; + od_debug(&instance->log, client->io, "C (copy): %c", *stream->s); + + rc = od_write(server->io, stream); + if (rc == -1) + return OD_RS_ESERVER_WRITE; + + /* copy complete or fail */ + if (type == 'c' || type == 'f') + break; + } + + server->is_copy = 0; + return OD_RS_OK; +} + +static int +od_frontend_session(od_client_t *client) +{ + od_instance_t *instance = client->system->instance; + + /* get server connection for the route */ + od_routerstatus_t status; + status = od_router_attach(client->system->router, client); + if (status != OD_ROK) + return OD_RS_EPOOL; + + od_server_t *server; + server = client->server; + + /* assign client session key */ + server->key_client = client->key; + + /* configure server using client startup parameters */ + int rc; + rc = od_backend_configure(client->server, &client->startup); + if (rc == -1) + return OD_RS_ESERVER_CONFIGURE; + + so_stream_t *stream = &client->stream; + for (;;) + { + /* client to server */ + so_stream_reset(stream); + rc = od_read(client->io, stream, UINT32_MAX); + if (rc == -1) + return OD_RS_ECLIENT_READ; + int type; + type = stream->s[rc]; + od_debug(&instance->log, client->io, "C: %c", type); + + /* client graceful shutdown */ + if (type == 'X') + break; + + rc = od_write(server->io, stream); + if (rc == -1) + return OD_RS_ESERVER_WRITE; + + server->count_request++; + + so_stream_reset(stream); + for (;;) { + /* pipeline server reply */ + for (;;) { + rc = od_read(server->io, stream, 1000); + if (rc >= 0) + break; + /* client watchdog. + * + * ensure that client has not closed + * the connection */ + if (! machine_read_timedout(server->io)) + return OD_RS_ESERVER_READ; + if (machine_connected(client->io)) + continue; + od_debug(&instance->log, server->io, + "S (watchdog): client disconnected"); + return OD_RS_ECLIENT_READ; + } + type = stream->s[rc]; + od_debug(&instance->log, server->io, "S: %c", type); + + /* ReadyForQuery */ + if (type == 'Z') { + rc = od_backend_ready(server, stream->s + rc, + so_stream_used(stream) - rc); + if (rc == -1) + return OD_RS_ECLIENT_READ; + + /* flush reply buffer to client */ + rc = od_write(client->io, stream); + if (rc == -1) + return OD_RS_ECLIENT_WRITE; + + break; + } + + /* CopyInResponse */ + if (type == 'G') { + /* transmit reply to client */ + rc = od_write(client->io, stream); + if (rc == -1) + return OD_RS_ECLIENT_WRITE; + rc = od_frontend_copy_in(client); + if (rc != OD_RS_OK) + return rc; + continue; + } + /* CopyOutResponse */ + if (type == 'H') { + assert(! server->is_copy); + server->is_copy = 1; + continue; + } + /* copy out complete */ + if (type == 'c') { + server->is_copy = 0; + continue; + } + } + } + return OD_RS_OK; +} + void od_frontend(void *arg) { od_client_t *client = arg; @@ -217,16 +372,18 @@ void od_frontend(void *arg) return; } -#if 0 /* client cancel request */ if (client->startup.is_cancel) { - od_debug(&pooler->od->log, client->io, "C: cancel request"); + od_debug(&instance->log, client->io, "C: cancel request"); + + od_frontend_close(client); +#if 0 so_key_t key = client->startup.key; od_feclose(client); od_cancel(pooler, &key); +#endif return; } -#endif /* Generate backend key for the client. * @@ -288,6 +445,61 @@ void od_frontend(void *arg) break; } - /* main */ + rc = od_frontend_session(client); + + od_server_t *server; + server = client->server; + switch (rc) { + case OD_RS_EROUTE: + case OD_RS_EPOOL: + case OD_RS_ELIMIT: + assert(server == NULL); + + break; + case OD_RS_OK: + case OD_RS_ECLIENT_READ: + case OD_RS_ECLIENT_WRITE: + /* close client connection and reuse server + * link in case of client errors and + * graceful shutdown */ + if (rc == OD_RS_OK) + od_log(&instance->log, client->io, + "C: disconnected"); + else + od_log(&instance->log, client->io, + "C: disconnected (read/write error): %s", + machine_error(client->io)); + + rc = od_backend_reset(server); + if (rc != 1) { + /* TODO: close backend connection */ + break; + } + + /* TODO: DETACH server */ + break; + case OD_RS_ESERVER_CONFIGURE: + od_log(&instance->log, server->io, + "S: disconnected (server configure error): %s", + machine_error(server->io)); + + /* TODO: close backend connection */ + break; + case OD_RS_ESERVER_READ: + case OD_RS_ESERVER_WRITE: + /* close client connection and close server + * connection in case of server errors */ + od_log(&instance->log, server->io, + "S: disconnected (read/write error): %s", + machine_error(server->io)); + + /* TODO: close backend connection */ + break; + case OD_RS_UNDEF: + assert(0); + break; + } + + /* close frontend connection */ od_frontend_close(client); } diff --git a/src/od_router.c b/src/od_router.c index 1869c12d..8cd38778 100644 --- a/src/od_router.c +++ b/src/od_router.c @@ -40,8 +40,9 @@ #include "od_pooler.h" #include "od_relay.h" -#include "od_frontend.h" #include "od_router.h" +#include "od_frontend.h" +#include "od_backend.h" typedef struct { @@ -54,7 +55,7 @@ typedef struct { od_route_t *route; od_server_t *server; -} od_msgrouter_push_t; +} od_msgrouter_detach_t; static od_route_t* od_router_fwd(od_router_t *router, so_bestartup_t *startup) @@ -108,12 +109,41 @@ od_router_fwd(od_router_t *router, so_bestartup_t *startup) static inline void od_router_attacher(void *arg) { + machine_msg_t msg = arg; + od_msgrouter_t *msg_attach; - msg_attach = arg; + msg_attach = machine_msg_get_data(msg); + + od_client_t *client; + client = msg_attach->client; od_route_t *route; - route = msg_attach->client->route; + route = client->route; assert(route != NULL); + + od_server_t *server; + server = od_serverpool_next(&route->server_pool, OD_SIDLE); + if (server) + goto on_connect; + + /* TODO: wait */ + + /* create new backend connection */ + server = od_backend_new(client->system->router, route); + if (server == NULL) { + msg_attach->status = OD_RERROR; + machine_queue_put(msg_attach->response, msg); + return; + } + + /* detach server io from router context */ + machine_io_detach(server->io); + +on_connect: + od_serverpool_set(&route->server_pool, server, OD_SACTIVE); + msg_attach->status = OD_ROK; + client->server = server; + machine_queue_put(msg_attach->response, msg); } static inline void @@ -162,8 +192,8 @@ od_router(void *arg) } /*od_clientpool_set(&route->client_pool, msg_route->client, OD_CPENDING);*/ - msg_route->client->route = route; msg_route->status = OD_ROK; + msg_route->client->route = route; machine_queue_put(msg_route->response, msg); continue; } @@ -187,7 +217,7 @@ od_router(void *arg) case OD_MROUTER_DETACH: { /* push client server back to route server pool */ - od_msgrouter_push_t *msg_detach; + od_msgrouter_detach_t *msg_detach; msg_detach = machine_msg_get_data(msg); od_serverpool_set(&msg_detach->route->server_pool, @@ -271,7 +301,7 @@ od_route(od_router_t *router, od_client_t *client) } od_routerstatus_t -od_route_attach(od_router_t *router, od_client_t *client) +od_router_attach(od_router_t *router, od_client_t *client) { /* create response queue */ machine_queue_t response; @@ -306,21 +336,27 @@ od_route_attach(od_router_t *router, od_client_t *client) machine_queue_free(response); machine_msg_free(msg); - /* TODO: machine_attach(client->server->io) */ + if (client->server) { + /* attach server io to clients machine context */ + machine_io_attach(client->server->io); + } return status; } void -od_route_detach(od_router_t *router, od_client_t *client) +od_router_detach(od_router_t *router, od_client_t *client) { - /* TODO: machine_detach(client->server->io) */ + assert(client->server != NULL); - /* send server push request to router */ + /* detach server io from clients machine context */ + machine_io_detach(client->server->io); + + /* send server detach request to router */ machine_msg_t msg; - msg = machine_msg_create(OD_MROUTER_DETACH, sizeof(od_msgrouter_push_t)); + msg = machine_msg_create(OD_MROUTER_DETACH, sizeof(od_msgrouter_detach_t)); if (msg == NULL) return; - od_msgrouter_push_t *msg_detach; + od_msgrouter_detach_t *msg_detach; msg_detach = machine_msg_get_data(msg); msg_detach->route = client->route; msg_detach->server = client->server;