From b964dbdc4f8ab7fac5a25ca667e8fbfa2c14e537 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Tue, 5 Sep 2017 17:07:50 +0300 Subject: [PATCH] odissey: forward ParameterStatus messages during client configure --- sources/backend.c | 16 +++++++++++++--- sources/backend.h | 4 ++-- sources/frontend.c | 34 ++++++++++++++++++++++++---------- sources/reset.c | 13 +++++++------ sources/reset.h | 4 ++-- sources/server.h | 3 +++ 6 files changed, 51 insertions(+), 23 deletions(-) diff --git a/sources/backend.c b/sources/backend.c index 1a231f7d..4769068c 100644 --- a/sources/backend.c +++ b/sources/backend.c @@ -345,7 +345,8 @@ int od_backend_connect_cancel(od_server_t *server, return 0; } -int od_backend_ready_wait(od_server_t *server, char *context, int time_ms) +int od_backend_ready_wait(od_server_t *server, shapito_stream_t *params, + char *context, int time_ms) { od_instance_t *instance = server->system->instance; @@ -372,6 +373,14 @@ int od_backend_ready_wait(od_server_t *server, char *context, int time_ms) od_backend_error(server, context, stream->start, shapito_stream_used(stream)); } + /* ParameterStatus */ + if (type == 'S' && params) { + /* copy status messages */ + rc = shapito_stream_ensure(params, shapito_stream_used(stream)); + if (rc == -1) + return -1; + shapito_stream_write(params, stream->start, shapito_stream_used(stream)); + } /* ReadyForQuery */ if (type == 'Z') { od_backend_ready(server, context, @@ -383,7 +392,8 @@ int od_backend_ready_wait(od_server_t *server, char *context, int time_ms) return 0; } -int od_backend_query(od_server_t *server, char *context, char *query, int len) +int od_backend_query(od_server_t *server, shapito_stream_t *params, + char *context, char *query, int len) { od_instance_t *instance = server->system->instance; int rc; @@ -404,7 +414,7 @@ int od_backend_query(od_server_t *server, char *context, char *query, int len) od_server_sync_request(server); od_server_stat_request(server); - rc = od_backend_ready_wait(server, context, UINT32_MAX); + rc = od_backend_ready_wait(server, params, context, UINT32_MAX); if (rc == -1) return -1; return 0; diff --git a/sources/backend.h b/sources/backend.h index 1589bc42..f4e70a01 100644 --- a/sources/backend.h +++ b/sources/backend.h @@ -13,7 +13,7 @@ void od_backend_close(od_server_t*); int od_backend_terminate(od_server_t*); void od_backend_error(od_server_t*, char*, char*, int); int od_backend_ready(od_server_t*, char*, char*, int); -int od_backend_ready_wait(od_server_t*, char*, int); -int od_backend_query(od_server_t*, char*, char*, int); +int od_backend_ready_wait(od_server_t*, shapito_stream_t*, char*, int); +int od_backend_query(od_server_t*, shapito_stream_t*, char*, char*, int); #endif /* OD_BACKEND_H */ diff --git a/sources/frontend.c b/sources/frontend.c index c6cf696e..8146bb1c 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -339,14 +339,9 @@ od_frontend_remote(od_client_t *client) server->id.id); /* configure server using client startup parameters, - * if it has not been configured before. */ - if (od_idmgr_cmp(&server->last_client_id, &client->id)) { - assert(server->io != NULL); - od_debug_client(&instance->logger, &client->id, NULL, - "previously owned, no need to reconfigure %s%.*s", - server->id.id_prefix, sizeof(server->id.id), - server->id.id); - } else { + * if it has not been configured before */ + if (! od_idmgr_cmp(&server->last_client_id, &client->id)) + { /* connect to server, if necessary */ if (server->io == NULL) { rc = od_backend_connect(server); @@ -354,17 +349,36 @@ od_frontend_remote(od_client_t *client) return OD_RS_ESERVER_CONNECT; } + shapito_stream_reset(&server->stream_params); + /* discard last server configuration */ if (route->scheme->pool_discard) { - rc = od_reset_discard(client->server); + rc = od_reset_discard(client->server, &server->stream_params); if (rc == -1) return OD_RS_ESERVER_CONFIGURE; } /* set client parameters */ - rc = od_reset_configure(client->server, &client->startup); + rc = od_reset_configure(client->server, &server->stream_params, + &client->startup); if (rc == -1) return OD_RS_ESERVER_CONFIGURE; + + /* forward ParameterStatus messages */ + if (shapito_stream_used(&server->stream_params)) { + od_debug_client(&instance->logger, &client->id, "configure", + "sending parameter statuses to client: %d bytes", + shapito_stream_used(&server->stream_params)); + rc = od_write(client->io, &server->stream_params); + if (rc == -1) + return OD_RS_ECLIENT_WRITE; + } + } else { + assert(server->io != NULL); + od_debug_client(&instance->logger, &client->id, NULL, + "previously owned, no need to reconfigure %s%.*s", + server->id.id_prefix, sizeof(server->id.id), + server->id.id); } } diff --git a/sources/reset.c b/sources/reset.c index dad46ada..5fa7fb08 100644 --- a/sources/reset.c +++ b/sources/reset.c @@ -110,7 +110,7 @@ int od_reset(od_server_t *server) wait_timeout, wait_try); wait_try++; - rc = od_backend_ready_wait(server, "reset", wait_timeout); + rc = od_backend_ready_wait(server, NULL, "reset", wait_timeout); if (rc == -1) break; } @@ -144,7 +144,7 @@ int od_reset(od_server_t *server) if (route->scheme->pool_rollback) { if (server->is_transaction) { char query_rlb[] = "ROLLBACK"; - rc = od_backend_query(server, "reset rollback", query_rlb, + rc = od_backend_query(server, NULL, "reset rollback", query_rlb, sizeof(query_rlb)); if (rc == -1) goto error; @@ -160,7 +160,8 @@ error: return -1; } -int od_reset_configure(od_server_t *server, shapito_be_startup_t *startup) +int od_reset_configure(od_server_t *server, shapito_stream_t *params, + shapito_be_startup_t *startup) { od_instance_t *instance = server->system->instance; @@ -185,17 +186,17 @@ int od_reset_configure(od_server_t *server, shapito_be_startup_t *startup) od_debug_server(&instance->logger, &server->id, "configure", "%s", query_configure); int rc; - rc = od_backend_query(server, "configure", query_configure, + rc = od_backend_query(server, params, "configure", query_configure, size + 1); return rc; } -int od_reset_discard(od_server_t *server) +int od_reset_discard(od_server_t *server, shapito_stream_t *params) { od_instance_t *instance = server->system->instance; char query_discard[] = "DISCARD ALL"; od_debug_server(&instance->logger, &server->id, "discard", "%s", query_discard); - return od_backend_query(server, "reset", query_discard, + return od_backend_query(server, params, "reset", query_discard, sizeof(query_discard)); } diff --git a/sources/reset.h b/sources/reset.h index 12a643bc..e71d3cc8 100644 --- a/sources/reset.h +++ b/sources/reset.h @@ -8,7 +8,7 @@ */ int od_reset(od_server_t*); -int od_reset_configure(od_server_t*, shapito_be_startup_t*); -int od_reset_discard(od_server_t*); +int od_reset_configure(od_server_t*, shapito_stream_t*, shapito_be_startup_t*); +int od_reset_discard(od_server_t*, shapito_stream_t*); #endif /* OD_RESET_H */ diff --git a/sources/server.h b/sources/server.h index e89abc41..9a8c9f1f 100644 --- a/sources/server.h +++ b/sources/server.h @@ -32,6 +32,7 @@ struct od_server od_serverstate_t state; od_id_t id; shapito_stream_t stream; + shapito_stream_t stream_params; machine_io_t *io; machine_tls_t *tls; int is_allocated; @@ -67,6 +68,7 @@ od_server_init(od_server_t *server) shapito_key_init(&server->key); shapito_key_init(&server->key_client); shapito_stream_init(&server->stream); + shapito_stream_init(&server->stream_params); od_list_init(&server->link); memset(&server->id, 0, sizeof(server->id)); memset(&server->last_client_id, 0, sizeof(server->last_client_id)); @@ -87,6 +89,7 @@ static inline void od_server_free(od_server_t *server) { shapito_stream_free(&server->stream); + shapito_stream_free(&server->stream_params); if (server->is_allocated) free(server); }