From 87bbd197005d6f2ad62881c45b220b08198e7ee6 Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Mon, 5 Feb 2018 16:39:20 +0300 Subject: [PATCH] odissey: pipeline deploy configuration prio to user request --- sources/backend.c | 39 ++++++++++++++++ sources/backend.h | 1 + sources/deploy.c | 49 ++++++++++++++++++++ sources/deploy.h | 2 + sources/frontend.c | 110 +++++++++++++++++++++++++++++++++------------ sources/server.h | 2 + 6 files changed, 174 insertions(+), 29 deletions(-) diff --git a/sources/backend.c b/sources/backend.c index a78fb817..71c2e7e0 100644 --- a/sources/backend.c +++ b/sources/backend.c @@ -457,3 +457,42 @@ int od_backend_query(od_server_t *server, char *context, return -1; return 0; } + +int +od_backend_deploy(od_server_t *server, char *context, + char *request, int request_size) +{ + od_instance_t *instance = server->system->instance; + int rc; + switch (*request) { + case 'S': + { + char *name; + uint32_t name_len; + char *value; + uint32_t value_len; + rc = shapito_fe_read_parameter(request, request_size, + &name, &name_len, &value, &value_len); + if (rc == -1) { + od_error(&instance->logger, context, NULL, server, + "failed to parse ParameterStatus message"); + return -1; + } + rc = shapito_parameters_add(&server->params, name, name_len, + value, value_len); + if (rc == -1) + return -1; + break; + } + case 'E': + od_backend_error(server, context, request, request_size); + break; + case 'Z': + rc = od_backend_ready(server, context, request, request_size); + if (rc == -1) + return -1; + server->deploy_sync--; + break; + } + return 0; +} diff --git a/sources/backend.h b/sources/backend.h index af402c05..d1f05016 100644 --- a/sources/backend.h +++ b/sources/backend.h @@ -15,5 +15,6 @@ void od_backend_error(od_server_t*, char*, char*, int); int od_backend_ready(od_server_t*, char*, char*, int); int od_backend_ready_wait(od_server_t*, char*, int, uint32_t); int od_backend_query(od_server_t*, char*, char*, int); +int od_backend_deploy(od_server_t*, char*, char*, int); #endif /* OD_BACKEND_H */ diff --git a/sources/deploy.c b/sources/deploy.c index 09a75bf8..9c7fafcd 100644 --- a/sources/deploy.c +++ b/sources/deploy.c @@ -149,3 +149,52 @@ int od_deploy(od_server_t *server, char *context, rc = od_backend_ready_wait(server, context, query_count, UINT32_MAX); return rc; } + + +int od_deploy_write(od_server_t *server, char *context, + shapito_stream_t *stream, + shapito_parameters_t *params) +{ + od_instance_t *instance = server->system->instance; + int rc; + + /* discard */ + int query_count = 1; + char query_discard[] = "DISCARD ALL"; + rc = shapito_fe_write_query(stream, query_discard, sizeof(query_discard)); + if (rc == -1) + return -1; + + /* parameters */ + char query[512]; + int size = 0; + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "TimeZone", 9); + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "DateStyle", 10); + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "client_encoding", 16); + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "application_name", 17); + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "extra_float_digits", 19); + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "standard_conforming_strings", 28); + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "statement_timeout", 18); + size += od_deploy_add(server, params, query + size, sizeof(query) - size, + "search_path", 12); + if (size == 0) { + od_debug(&instance->logger, context, server->client, server, + "%s", "no need to configure"); + } else { + od_debug(&instance->logger, context, server->client, server, + "%s", query); + size++; + query_count++; + rc = shapito_fe_write_query(stream, query, size); + if (rc == -1) + return -1; + } + return query_count; +} diff --git a/sources/deploy.h b/sources/deploy.h index 881922e1..f67f90af 100644 --- a/sources/deploy.h +++ b/sources/deploy.h @@ -9,4 +9,6 @@ int od_deploy(od_server_t*, char*, shapito_parameters_t*, int); +int od_deploy_write(od_server_t*, char*, shapito_stream_t*, shapito_parameters_t*); + #endif /* OD_DEPLOY_H */ diff --git a/sources/frontend.c b/sources/frontend.c index 63d59315..b4632e54 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -503,6 +503,52 @@ od_frontend_local(od_client_t *client) return OD_FE_OK; } +static inline od_frontend_rc_t +od_frontend_remote_client_attach(od_client_t *client) +{ + od_instance_t *instance = client->system->instance; + + od_routerstatus_t status; + status = od_router_attach(client); + if (status != OD_ROK) + return OD_FE_EATTACH; + + od_server_t *server = client->server; + od_debug(&instance->logger, "main", client, server, + "attached to %s%.*s", + server->id.id_prefix, sizeof(server->id.id), + server->id.id); + + /* connect to server, if necessary */ + int rc; + if (server->io == NULL) { + rc = od_backend_connect(server, "main"); + if (rc == -1) + return OD_FE_ESERVER_CONNECT; + } + + if (! od_idmgr_cmp(&server->last_client_id, &client->id)) { + + rc = od_deploy_write(client->server, "main", &client->stream, &client->params); + if (rc == -1) { + status = od_router_close(client); + if (status != OD_ROK) + return OD_FE_EATTACH; + } + + client->server->deploy_sync = rc; + } else { + od_debug(&instance->logger, "main", client, server, + "previously owned, no need to reconfigure %s%.*s", + server->id.id_prefix, sizeof(server->id.id), + server->id.id); + + client->server->deploy_sync = 0; + } + + return OD_FE_OK; +} + static inline od_frontend_rc_t od_frontend_remote_client(od_client_t *client) { @@ -513,6 +559,17 @@ od_frontend_remote_client(od_client_t *client) int request_count = 0; int terminate = 0; + /* get server connection from the route pool, write configuration + * requests before client request */ + if (server == NULL) { + od_frontend_rc_t fe_rc; + fe_rc = od_frontend_remote_client_attach(client); + if (fe_rc != OD_FE_OK) + return fe_rc; + server = client->server; + request_count = server->deploy_sync; + } + od_frontend_stream_reset(client); int rc; for (;;) @@ -540,8 +597,6 @@ od_frontend_remote_client(od_client_t *client) /* CopyDone or CopyFail */ case 'c': case 'f': - if (! server) - break; server->is_copy = 0; break; @@ -562,15 +617,6 @@ od_frontend_remote_client(od_client_t *client) break; } - /* get server connection from the route pool */ - if (server == NULL) { - od_frontend_rc_t fe_rc; - fe_rc = od_frontend_attach(client, "main", 0); - if (fe_rc != OD_FE_OK) - return fe_rc; - server = client->server; - } - if (type == 'Q' || /* Query */ type == 'F' || /* FunctionCall */ type == 'S') /* Sync */ @@ -588,19 +634,16 @@ od_frontend_remote_client(od_client_t *client) break; } - if (server) - { - /* update client recv stat */ - od_server_stat_recv_client(server, shapito_stream_used(stream)); + /* update client recv stat */ + od_server_stat_recv_client(server, shapito_stream_used(stream)); - /* forward to server */ - rc = od_write(server->io, stream); - if (rc == -1) - return OD_FE_ESERVER_WRITE; + /* forward to server */ + rc = od_write(server->io, stream); + if (rc == -1) + return OD_FE_ESERVER_WRITE; - /* update server sync state */ - od_server_stat_request(server, request_count); - } + /* update server sync state */ + od_server_stat_request(server, request_count); if (terminate) return OD_FE_TERMINATE; @@ -634,6 +677,15 @@ od_frontend_remote_server(od_client_t *client) od_debug(&instance->logger, "main", client, server, "%c", type); + /* discard replies during configuration deploy */ + if (server->deploy_sync > 0) { + rc = od_backend_deploy(server, "main", request, request_size); + if (rc == -1) + return OD_FE_ESERVER_READ; + od_frontend_stream_reset(client); + continue; + } + /* ReadyForQuery */ if (type == 'Z') { rc = od_backend_ready(server, "main", request, request_size); @@ -714,9 +766,11 @@ od_frontend_remote_server(od_client_t *client) } /* forward to client */ - rc = od_write(client->io, stream); - if (rc == -1) - return OD_FE_ECLIENT_WRITE; + if (shapito_stream_used(stream) > 0) { + rc = od_write(client->io, stream); + if (rc == -1) + return OD_FE_ECLIENT_WRITE; + } return OD_FE_OK; } @@ -744,10 +798,8 @@ od_frontend_remote(od_client_t *client) fe_rc = od_frontend_remote_client(client); if (fe_rc != OD_FE_OK) return fe_rc; - if (client->server) { - io_count = 2; - io_set[1] = client->server->io; - } + io_count = 2; + io_set[1] = client->server->io; continue; } fe_rc = od_frontend_remote_server(client); diff --git a/sources/server.h b/sources/server.h index be8a8a82..93b45f52 100644 --- a/sources/server.h +++ b/sources/server.h @@ -41,6 +41,7 @@ struct od_server int is_allocated; int is_transaction; int is_copy; + int deploy_sync; od_serverstat_t stats; int idle_time; shapito_key_t key; @@ -65,6 +66,7 @@ od_server_init(od_server_t *server) server->is_allocated = 0; server->is_transaction = 0; server->is_copy = 0; + server->deploy_sync = 0; memset(&server->stats, 0, sizeof(server->stats)); shapito_key_init(&server->key); shapito_key_init(&server->key_client);