odissey: pipeline deploy configuration prio to user request

This commit is contained in:
Dmitry Simonenko 2018-02-05 16:39:20 +03:00
parent be757e8e85
commit 87bbd19700
6 changed files with 174 additions and 29 deletions

View File

@ -457,3 +457,42 @@ int od_backend_query(od_server_t *server, char *context,
return -1;
return 0;
}
int
od_backend_deploy(od_server_t *server, char *context,
char *request, int request_size)
{
od_instance_t *instance = server->system->instance;
int rc;
switch (*request) {
case 'S':
{
char *name;
uint32_t name_len;
char *value;
uint32_t value_len;
rc = shapito_fe_read_parameter(request, request_size,
&name, &name_len, &value, &value_len);
if (rc == -1) {
od_error(&instance->logger, context, NULL, server,
"failed to parse ParameterStatus message");
return -1;
}
rc = shapito_parameters_add(&server->params, name, name_len,
value, value_len);
if (rc == -1)
return -1;
break;
}
case 'E':
od_backend_error(server, context, request, request_size);
break;
case 'Z':
rc = od_backend_ready(server, context, request, request_size);
if (rc == -1)
return -1;
server->deploy_sync--;
break;
}
return 0;
}

View File

@ -15,5 +15,6 @@ void od_backend_error(od_server_t*, char*, char*, int);
int od_backend_ready(od_server_t*, char*, char*, int);
int od_backend_ready_wait(od_server_t*, char*, int, uint32_t);
int od_backend_query(od_server_t*, char*, char*, int);
int od_backend_deploy(od_server_t*, char*, char*, int);
#endif /* OD_BACKEND_H */

View File

@ -149,3 +149,52 @@ int od_deploy(od_server_t *server, char *context,
rc = od_backend_ready_wait(server, context, query_count, UINT32_MAX);
return rc;
}
int od_deploy_write(od_server_t *server, char *context,
shapito_stream_t *stream,
shapito_parameters_t *params)
{
od_instance_t *instance = server->system->instance;
int rc;
/* discard */
int query_count = 1;
char query_discard[] = "DISCARD ALL";
rc = shapito_fe_write_query(stream, query_discard, sizeof(query_discard));
if (rc == -1)
return -1;
/* parameters */
char query[512];
int size = 0;
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"TimeZone", 9);
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"DateStyle", 10);
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"client_encoding", 16);
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"application_name", 17);
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"extra_float_digits", 19);
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"standard_conforming_strings", 28);
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"statement_timeout", 18);
size += od_deploy_add(server, params, query + size, sizeof(query) - size,
"search_path", 12);
if (size == 0) {
od_debug(&instance->logger, context, server->client, server,
"%s", "no need to configure");
} else {
od_debug(&instance->logger, context, server->client, server,
"%s", query);
size++;
query_count++;
rc = shapito_fe_write_query(stream, query, size);
if (rc == -1)
return -1;
}
return query_count;
}

View File

@ -9,4 +9,6 @@
int od_deploy(od_server_t*, char*, shapito_parameters_t*, int);
int od_deploy_write(od_server_t*, char*, shapito_stream_t*, shapito_parameters_t*);
#endif /* OD_DEPLOY_H */

View File

@ -503,6 +503,52 @@ od_frontend_local(od_client_t *client)
return OD_FE_OK;
}
static inline od_frontend_rc_t
od_frontend_remote_client_attach(od_client_t *client)
{
od_instance_t *instance = client->system->instance;
od_routerstatus_t status;
status = od_router_attach(client);
if (status != OD_ROK)
return OD_FE_EATTACH;
od_server_t *server = client->server;
od_debug(&instance->logger, "main", client, server,
"attached to %s%.*s",
server->id.id_prefix, sizeof(server->id.id),
server->id.id);
/* connect to server, if necessary */
int rc;
if (server->io == NULL) {
rc = od_backend_connect(server, "main");
if (rc == -1)
return OD_FE_ESERVER_CONNECT;
}
if (! od_idmgr_cmp(&server->last_client_id, &client->id)) {
rc = od_deploy_write(client->server, "main", &client->stream, &client->params);
if (rc == -1) {
status = od_router_close(client);
if (status != OD_ROK)
return OD_FE_EATTACH;
}
client->server->deploy_sync = rc;
} else {
od_debug(&instance->logger, "main", client, server,
"previously owned, no need to reconfigure %s%.*s",
server->id.id_prefix, sizeof(server->id.id),
server->id.id);
client->server->deploy_sync = 0;
}
return OD_FE_OK;
}
static inline od_frontend_rc_t
od_frontend_remote_client(od_client_t *client)
{
@ -513,6 +559,17 @@ od_frontend_remote_client(od_client_t *client)
int request_count = 0;
int terminate = 0;
/* get server connection from the route pool, write configuration
* requests before client request */
if (server == NULL) {
od_frontend_rc_t fe_rc;
fe_rc = od_frontend_remote_client_attach(client);
if (fe_rc != OD_FE_OK)
return fe_rc;
server = client->server;
request_count = server->deploy_sync;
}
od_frontend_stream_reset(client);
int rc;
for (;;)
@ -540,8 +597,6 @@ od_frontend_remote_client(od_client_t *client)
/* CopyDone or CopyFail */
case 'c':
case 'f':
if (! server)
break;
server->is_copy = 0;
break;
@ -562,15 +617,6 @@ od_frontend_remote_client(od_client_t *client)
break;
}
/* get server connection from the route pool */
if (server == NULL) {
od_frontend_rc_t fe_rc;
fe_rc = od_frontend_attach(client, "main", 0);
if (fe_rc != OD_FE_OK)
return fe_rc;
server = client->server;
}
if (type == 'Q' || /* Query */
type == 'F' || /* FunctionCall */
type == 'S') /* Sync */
@ -588,19 +634,16 @@ od_frontend_remote_client(od_client_t *client)
break;
}
if (server)
{
/* update client recv stat */
od_server_stat_recv_client(server, shapito_stream_used(stream));
/* update client recv stat */
od_server_stat_recv_client(server, shapito_stream_used(stream));
/* forward to server */
rc = od_write(server->io, stream);
if (rc == -1)
return OD_FE_ESERVER_WRITE;
/* forward to server */
rc = od_write(server->io, stream);
if (rc == -1)
return OD_FE_ESERVER_WRITE;
/* update server sync state */
od_server_stat_request(server, request_count);
}
/* update server sync state */
od_server_stat_request(server, request_count);
if (terminate)
return OD_FE_TERMINATE;
@ -634,6 +677,15 @@ od_frontend_remote_server(od_client_t *client)
od_debug(&instance->logger, "main", client, server,
"%c", type);
/* discard replies during configuration deploy */
if (server->deploy_sync > 0) {
rc = od_backend_deploy(server, "main", request, request_size);
if (rc == -1)
return OD_FE_ESERVER_READ;
od_frontend_stream_reset(client);
continue;
}
/* ReadyForQuery */
if (type == 'Z') {
rc = od_backend_ready(server, "main", request, request_size);
@ -714,9 +766,11 @@ od_frontend_remote_server(od_client_t *client)
}
/* forward to client */
rc = od_write(client->io, stream);
if (rc == -1)
return OD_FE_ECLIENT_WRITE;
if (shapito_stream_used(stream) > 0) {
rc = od_write(client->io, stream);
if (rc == -1)
return OD_FE_ECLIENT_WRITE;
}
return OD_FE_OK;
}
@ -744,10 +798,8 @@ od_frontend_remote(od_client_t *client)
fe_rc = od_frontend_remote_client(client);
if (fe_rc != OD_FE_OK)
return fe_rc;
if (client->server) {
io_count = 2;
io_set[1] = client->server->io;
}
io_count = 2;
io_set[1] = client->server->io;
continue;
}
fe_rc = od_frontend_remote_server(client);

View File

@ -41,6 +41,7 @@ struct od_server
int is_allocated;
int is_transaction;
int is_copy;
int deploy_sync;
od_serverstat_t stats;
int idle_time;
shapito_key_t key;
@ -65,6 +66,7 @@ od_server_init(od_server_t *server)
server->is_allocated = 0;
server->is_transaction = 0;
server->is_copy = 0;
server->deploy_sync = 0;
memset(&server->stats, 0, sizeof(server->stats));
shapito_key_init(&server->key);
shapito_key_init(&server->key_client);