diff --git a/sources/CMakeLists.txt b/sources/CMakeLists.txt index 0aa62029..67e24b56 100644 --- a/sources/CMakeLists.txt +++ b/sources/CMakeLists.txt @@ -21,6 +21,7 @@ set(od_src relay_pool.c frontend.c backend.c + reset.c auth.c cancel.c periodic.c diff --git a/sources/backend.c b/sources/backend.c index e02214ed..1a231f7d 100644 --- a/sources/backend.c +++ b/sources/backend.c @@ -141,45 +141,6 @@ int od_backend_ready(od_server_t *server, char *context, return 0; } -static inline int -od_backend_ready_wait(od_server_t *server, char *context, int time_ms) -{ - od_instance_t *instance = server->system->instance; - - shapito_stream_t *stream = &server->stream; - /* wait for response */ - while (1) { - shapito_stream_reset(stream); - int rc; - rc = od_read(server->io, stream, time_ms); - if (rc == -1) { - if (! machine_timedout()) { - od_error_server(&instance->logger, &server->id, context, - "read error: %s", - machine_error(server->io)); - } - return -1; - } - int offset = rc; - char type = stream->start[offset]; - od_debug_server(&instance->logger, &server->id, context, - "%c", type); - /* ErrorResponse */ - if (type == 'E') { - od_backend_error(server, context, stream->start, - shapito_stream_used(stream)); - } - /* ReadyForQuery */ - if (type == 'Z') { - od_backend_ready(server, context, - stream->start + offset, - shapito_stream_used(stream) - offset); - break; - } - } - return 0; -} - static inline int od_backend_startup(od_server_t *server) { @@ -384,8 +345,45 @@ int od_backend_connect_cancel(od_server_t *server, return 0; } -static inline int -od_backend_query(od_server_t *server, char *context, char *query, int len) +int od_backend_ready_wait(od_server_t *server, char *context, int time_ms) +{ + od_instance_t *instance = server->system->instance; + + shapito_stream_t *stream = &server->stream; + /* wait for response */ + while (1) { + shapito_stream_reset(stream); + int rc; + rc = od_read(server->io, stream, time_ms); + if (rc == -1) { + if (! machine_timedout()) { + od_error_server(&instance->logger, &server->id, context, + "read error: %s", + machine_error(server->io)); + } + return -1; + } + int offset = rc; + char type = stream->start[offset]; + od_debug_server(&instance->logger, &server->id, context, + "%c", type); + /* ErrorResponse */ + if (type == 'E') { + od_backend_error(server, context, stream->start, + shapito_stream_used(stream)); + } + /* ReadyForQuery */ + if (type == 'Z') { + od_backend_ready(server, context, + stream->start + offset, + shapito_stream_used(stream) - offset); + break; + } + } + return 0; +} + +int od_backend_query(od_server_t *server, char *context, char *query, int len) { od_instance_t *instance = server->system->instance; int rc; @@ -411,153 +409,3 @@ od_backend_query(od_server_t *server, char *context, char *query, int len) return -1; return 0; } - -int od_backend_configure(od_server_t *server, shapito_be_startup_t *startup) -{ - od_instance_t *instance = server->system->instance; - - char query_configure[1024]; - int size = 0; - shapito_parameter_t *param; - shapito_parameter_t *end; - param = (shapito_parameter_t*)startup->params.buf.start; - end = (shapito_parameter_t*)startup->params.buf.pos; - for (; param < end; param = shapito_parameter_next(param)) { - if (param == startup->user || - param == startup->database) - continue; - size += snprintf(query_configure + size, - sizeof(query_configure) - size, - "SET %s=%s;", - shapito_parameter_name(param), - shapito_parameter_value(param)); - } - if (size == 0) - return 0; - od_debug_server(&instance->logger, &server->id, "configure", - "%s", query_configure); - int rc; - rc = od_backend_query(server, "configure", query_configure, - size + 1); - return rc; -} - -int od_backend_reset(od_server_t *server) -{ - od_instance_t *instance = server->system->instance; - od_route_t *route = server->route; - - /* server left in copy mode */ - if (server->is_copy) { - od_debug_server(&instance->logger, &server->id, "reset", - "in copy, closing"); - goto drop; - } - - /* support route rollback off */ - if (! route->scheme->pool_rollback) { - if (server->is_transaction) { - od_debug_server(&instance->logger, &server->id, "reset", - "in active transaction, closing"); - goto drop; - } - } - - /* support route cancel off */ - if (! route->scheme->pool_cancel) { - if (! od_server_sync_is(server)) { - od_debug_server(&instance->logger, &server->id, "reset", - "not synchronized, closing"); - goto drop; - } - } - - /* Server is not synchronized. - * - * Number of queries sent to server is not equal - * to the number of received replies. Do the following - * logic until server becomes synchronized: - * - * 1. Wait each ReadyForQuery until we receive all - * replies with 1 sec timeout. - * - * 2. Send Cancel in other connection. - * - * It is possible that client could previously - * pipeline server with requests. Each request - * may stall database on its own way and may require - * additional Cancel request. - * - * 3. Continue with (1) - */ - int wait_timeout = 1000; - int wait_try = 0; - int wait_try_cancel = 0; - int wait_cancel_limit = 1; - int rc = 0; - for (;;) { - while (! od_server_sync_is(server)) { - od_debug_server(&instance->logger, &server->id, "reset", - "not synchronized, wait for %d msec (#%d)", - wait_timeout, - wait_try); - wait_try++; - rc = od_backend_ready_wait(server, "reset", wait_timeout); - if (rc == -1) - break; - } - if (rc == -1) { - if (! machine_timedout()) - goto error; - if (wait_try_cancel == wait_cancel_limit) { - od_debug_server(&instance->logger, &server->id, "reset", - "server cancel limit reached, closing"); - goto error; - } - od_debug_server(&instance->logger, &server->id, "reset", - "not responded, cancel (#%d)", - wait_try_cancel); - wait_try_cancel++; - rc = od_cancel(server->system, - route->scheme->storage, &server->key, - &server->id); - if (rc == -1) - goto error; - continue; - } - assert(od_server_sync_is(server)); - break; - } - od_debug_server(&instance->logger, &server->id, "reset", - "synchronized"); - - /* send rollback in case server has an active - * transaction running */ - if (route->scheme->pool_rollback) { - if (server->is_transaction) { - char query_rlb[] = "ROLLBACK"; - rc = od_backend_query(server, "reset rollback", query_rlb, - sizeof(query_rlb)); - if (rc == -1) - goto error; - assert(! server->is_transaction); - } - } - - /* ready to use (yet maybe discard is required) */ - return 1; -drop: - return 0; -error: - return -1; -} - -int od_backend_discard(od_server_t *server) -{ - od_instance_t *instance = server->system->instance; - char query_discard[] = "DISCARD ALL"; - od_debug_server(&instance->logger, &server->id, "discard", - "%s", query_discard); - return od_backend_query(server, "reset", query_discard, - sizeof(query_discard)); -} diff --git a/sources/backend.h b/sources/backend.h index cec74877..1589bc42 100644 --- a/sources/backend.h +++ b/sources/backend.h @@ -11,10 +11,9 @@ int od_backend_connect(od_server_t*); int od_backend_connect_cancel(od_server_t*, od_schemestorage_t*, shapito_key_t*); void od_backend_close(od_server_t*); int od_backend_terminate(od_server_t*); -int od_backend_reset(od_server_t*); void od_backend_error(od_server_t*, char*, char*, int); int od_backend_ready(od_server_t*, char*, char*, int); -int od_backend_configure(od_server_t*, shapito_be_startup_t*); -int od_backend_discard(od_server_t*); +int od_backend_ready_wait(od_server_t*, char*, int); +int od_backend_query(od_server_t*, char*, char*, int); #endif /* OD_BACKEND_H */ diff --git a/sources/frontend.c b/sources/frontend.c index dcdf582d..d5daf0c0 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -45,6 +45,7 @@ #include "sources/relay.h" #include "sources/frontend.h" #include "sources/backend.h" +#include "sources/reset.h" #include "sources/tls.h" #include "sources/auth.h" #include "sources/console.h" @@ -354,14 +355,14 @@ od_frontend_remote(od_client_t *client) } else { /* discard last server configuration */ if (route->scheme->pool_discard) { - rc = od_backend_discard(client->server); + rc = od_reset_discard(client->server); if (rc == -1) return OD_RS_ESERVER_CONFIGURE; } } /* set client parameters */ - rc = od_backend_configure(client->server, &client->startup); + rc = od_reset_configure(client->server, &client->startup); if (rc == -1) return OD_RS_ESERVER_CONFIGURE; } @@ -447,7 +448,7 @@ od_frontend_remote(od_client_t *client) if (route->scheme->pool == OD_POOLING_TRANSACTION) { if (! server->is_transaction) { /* cleanup server */ - rc = od_backend_reset(server); + rc = od_reset(server); if (rc == -1) return OD_RS_ESERVER_WRITE; /* push server connection back to route pool */ @@ -697,7 +698,7 @@ void od_frontend(void *arg) od_unroute(client); break; } - rc = od_backend_reset(server); + rc = od_reset(server); if (rc != 1) { /* close backend connection */ od_router_close_and_unroute(client); @@ -718,7 +719,7 @@ void od_frontend(void *arg) od_unroute(client); break; } - rc = od_backend_reset(server); + rc = od_reset(server); if (rc != 1) { /* close backend connection */ od_router_close_and_unroute(client); diff --git a/sources/reset.c b/sources/reset.c new file mode 100644 index 00000000..dad46ada --- /dev/null +++ b/sources/reset.c @@ -0,0 +1,201 @@ + +/* + * Odissey. + * + * Advanced PostgreSQL connection pooler. +*/ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "sources/macro.h" +#include "sources/version.h" +#include "sources/atomic.h" +#include "sources/list.h" +#include "sources/pid.h" +#include "sources/id.h" +#include "sources/log_file.h" +#include "sources/log_system.h" +#include "sources/logger.h" +#include "sources/daemon.h" +#include "sources/scheme.h" +#include "sources/scheme_mgr.h" +#include "sources/config.h" +#include "sources/msg.h" +#include "sources/system.h" +#include "sources/instance.h" +#include "sources/server.h" +#include "sources/server_pool.h" +#include "sources/client.h" +#include "sources/client_pool.h" +#include "sources/route_id.h" +#include "sources/route.h" +#include "sources/route_pool.h" +#include "sources/io.h" +#include "sources/router.h" +#include "sources/pooler.h" +#include "sources/relay.h" +#include "sources/frontend.h" +#include "sources/backend.h" +#include "sources/reset.h" +#include "sources/auth.h" +#include "sources/tls.h" +#include "sources/cancel.h" + +int od_reset(od_server_t *server) +{ + od_instance_t *instance = server->system->instance; + od_route_t *route = server->route; + + /* server left in copy mode */ + if (server->is_copy) { + od_debug_server(&instance->logger, &server->id, "reset", + "in copy, closing"); + goto drop; + } + + /* support route rollback off */ + if (! route->scheme->pool_rollback) { + if (server->is_transaction) { + od_debug_server(&instance->logger, &server->id, "reset", + "in active transaction, closing"); + goto drop; + } + } + + /* support route cancel off */ + if (! route->scheme->pool_cancel) { + if (! od_server_sync_is(server)) { + od_debug_server(&instance->logger, &server->id, "reset", + "not synchronized, closing"); + goto drop; + } + } + + /* Server is not synchronized. + * + * Number of queries sent to server is not equal + * to the number of received replies. Do the following + * logic until server becomes synchronized: + * + * 1. Wait each ReadyForQuery until we receive all + * replies with 1 sec timeout. + * + * 2. Send Cancel in other connection. + * + * It is possible that client could previously + * pipeline server with requests. Each request + * may stall database on its own way and may require + * additional Cancel request. + * + * 3. Continue with (1) + */ + int wait_timeout = 1000; + int wait_try = 0; + int wait_try_cancel = 0; + int wait_cancel_limit = 1; + int rc = 0; + for (;;) { + while (! od_server_sync_is(server)) { + od_debug_server(&instance->logger, &server->id, "reset", + "not synchronized, wait for %d msec (#%d)", + wait_timeout, + wait_try); + wait_try++; + rc = od_backend_ready_wait(server, "reset", wait_timeout); + if (rc == -1) + break; + } + if (rc == -1) { + if (! machine_timedout()) + goto error; + if (wait_try_cancel == wait_cancel_limit) { + od_debug_server(&instance->logger, &server->id, "reset", + "server cancel limit reached, closing"); + goto error; + } + od_debug_server(&instance->logger, &server->id, "reset", + "not responded, cancel (#%d)", + wait_try_cancel); + wait_try_cancel++; + rc = od_cancel(server->system, + route->scheme->storage, &server->key, + &server->id); + if (rc == -1) + goto error; + continue; + } + assert(od_server_sync_is(server)); + break; + } + od_debug_server(&instance->logger, &server->id, "reset", + "synchronized"); + + /* send rollback in case server has an active + * transaction running */ + if (route->scheme->pool_rollback) { + if (server->is_transaction) { + char query_rlb[] = "ROLLBACK"; + rc = od_backend_query(server, "reset rollback", query_rlb, + sizeof(query_rlb)); + if (rc == -1) + goto error; + assert(! server->is_transaction); + } + } + + /* ready to use (yet maybe discard is required) */ + return 1; +drop: + return 0; +error: + return -1; +} + +int od_reset_configure(od_server_t *server, shapito_be_startup_t *startup) +{ + od_instance_t *instance = server->system->instance; + + char query_configure[1024]; + int size = 0; + shapito_parameter_t *param; + shapito_parameter_t *end; + param = (shapito_parameter_t*)startup->params.buf.start; + end = (shapito_parameter_t*)startup->params.buf.pos; + for (; param < end; param = shapito_parameter_next(param)) { + if (param == startup->user || + param == startup->database) + continue; + size += snprintf(query_configure + size, + sizeof(query_configure) - size, + "SET %s=%s;", + shapito_parameter_name(param), + shapito_parameter_value(param)); + } + if (size == 0) + return 0; + od_debug_server(&instance->logger, &server->id, "configure", + "%s", query_configure); + int rc; + rc = od_backend_query(server, "configure", query_configure, + size + 1); + return rc; +} + +int od_reset_discard(od_server_t *server) +{ + od_instance_t *instance = server->system->instance; + char query_discard[] = "DISCARD ALL"; + od_debug_server(&instance->logger, &server->id, "discard", + "%s", query_discard); + return od_backend_query(server, "reset", query_discard, + sizeof(query_discard)); +} diff --git a/sources/reset.h b/sources/reset.h new file mode 100644 index 00000000..12a643bc --- /dev/null +++ b/sources/reset.h @@ -0,0 +1,14 @@ +#ifndef OD_RESET_H +#define OD_RESET_H + +/* + * Odissey. + * + * Advanced PostgreSQL connection pooler. +*/ + +int od_reset(od_server_t*); +int od_reset_configure(od_server_t*, shapito_be_startup_t*); +int od_reset_discard(od_server_t*); + +#endif /* OD_RESET_H */