mirror of https://github.com/yandex/odyssey.git
odissey: rework server buffering using client stream
This commit is contained in:
parent
34958f8e52
commit
f11a98432f
|
@ -105,7 +105,9 @@ od_auth_frontend_cleartext(od_client_t *client)
|
|||
shapito_password_init(&client_password);
|
||||
|
||||
if (client->scheme->auth_query) {
|
||||
rc = od_auth_query(client->system, client->scheme,
|
||||
rc = od_auth_query(client->system,
|
||||
stream,
|
||||
client->scheme,
|
||||
client->startup.user,
|
||||
&client_password);
|
||||
if (rc == -1) {
|
||||
|
@ -203,7 +205,9 @@ od_auth_frontend_md5(od_client_t *client)
|
|||
shapito_password_init(&query_password);
|
||||
|
||||
if (client->scheme->auth_query) {
|
||||
rc = od_auth_query(client->system, client->scheme,
|
||||
rc = od_auth_query(client->system,
|
||||
stream,
|
||||
client->scheme,
|
||||
client->startup.user,
|
||||
&query_password);
|
||||
if (rc == -1) {
|
||||
|
@ -313,7 +317,7 @@ int od_auth_frontend(od_client_t *client)
|
|||
}
|
||||
|
||||
static inline int
|
||||
od_auth_backend_cleartext(od_server_t *server)
|
||||
od_auth_backend_cleartext(od_server_t *server, shapito_stream_t *stream)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
od_route_t *route = server->route;
|
||||
|
@ -341,7 +345,6 @@ od_auth_backend_cleartext(od_server_t *server)
|
|||
}
|
||||
|
||||
/* PasswordMessage */
|
||||
shapito_stream_t *stream = server->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_fe_write_password(stream, password, password_len + 1);
|
||||
|
@ -361,7 +364,8 @@ od_auth_backend_cleartext(od_server_t *server)
|
|||
}
|
||||
|
||||
static inline int
|
||||
od_auth_backend_md5(od_server_t *server, char salt[4])
|
||||
od_auth_backend_md5(od_server_t *server, shapito_stream_t *stream,
|
||||
char salt[4])
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
od_route_t *route = server->route;
|
||||
|
@ -414,7 +418,6 @@ od_auth_backend_md5(od_server_t *server, char salt[4])
|
|||
}
|
||||
|
||||
/* PasswordMessage */
|
||||
shapito_stream_t *stream = server->stream;
|
||||
shapito_stream_reset(stream);
|
||||
rc = shapito_fe_write_password(stream,
|
||||
client_password.password,
|
||||
|
@ -435,11 +438,9 @@ od_auth_backend_md5(od_server_t *server, char salt[4])
|
|||
return 0;
|
||||
}
|
||||
|
||||
int od_auth_backend(od_server_t *server)
|
||||
int od_auth_backend(od_server_t *server, shapito_stream_t *stream)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
|
||||
shapito_stream_t *stream = server->stream;
|
||||
assert(*stream->start == 'R');
|
||||
|
||||
uint32_t auth_type;
|
||||
|
@ -458,13 +459,13 @@ int od_auth_backend(od_server_t *server)
|
|||
return 0;
|
||||
/* AuthenticationCleartextPassword */
|
||||
case 3:
|
||||
rc = od_auth_backend_cleartext(server);
|
||||
rc = od_auth_backend_cleartext(server, stream);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
break;
|
||||
/* AuthenticationMD5Password */
|
||||
case 5:
|
||||
rc = od_auth_backend_md5(server, salt);
|
||||
rc = od_auth_backend_md5(server, stream, salt);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
break;
|
||||
|
@ -479,14 +480,14 @@ int od_auth_backend(od_server_t *server)
|
|||
while (1) {
|
||||
int rc;
|
||||
shapito_stream_reset(stream);
|
||||
rc = od_read(server->io, server->stream, UINT32_MAX);
|
||||
rc = od_read(server->io, stream, UINT32_MAX);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger, "auth", NULL, server,
|
||||
"read error: %s",
|
||||
machine_error(server->io));
|
||||
return -1;
|
||||
}
|
||||
char type = *server->stream->start;
|
||||
char type = *stream->start;
|
||||
od_debug(&instance->logger, "auth", NULL, server,
|
||||
"%c", type);
|
||||
switch (type) {
|
||||
|
|
|
@ -8,6 +8,6 @@
|
|||
*/
|
||||
|
||||
int od_auth_frontend(od_client_t*);
|
||||
int od_auth_backend(od_server_t*);
|
||||
int od_auth_backend(od_server_t*, shapito_stream_t*);
|
||||
|
||||
#endif /* OD_AUTH_H */
|
||||
|
|
|
@ -50,7 +50,8 @@
|
|||
#include "sources/auth_query.h"
|
||||
|
||||
static inline int
|
||||
od_auth_query_do(od_server_t *server, char *query, int len,
|
||||
od_auth_query_do(od_server_t *server, shapito_stream_t *stream,
|
||||
char *query, int len,
|
||||
shapito_password_t *result)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
|
@ -58,7 +59,6 @@ od_auth_query_do(od_server_t *server, char *query, int len,
|
|||
od_debug(&instance->logger, "auth_query", server->client, server,
|
||||
"%s", query);
|
||||
int rc;
|
||||
shapito_stream_t *stream = server->stream;
|
||||
shapito_stream_reset(stream);
|
||||
rc = shapito_fe_write_query(stream, query, len);
|
||||
if (rc == -1)
|
||||
|
@ -88,8 +88,7 @@ od_auth_query_do(od_server_t *server, char *query, int len,
|
|||
}
|
||||
return -1;
|
||||
}
|
||||
int offset = rc;
|
||||
char type = stream->start[offset];
|
||||
char type = *stream->start;
|
||||
od_debug(&instance->logger, "auth_query", server->client, server,
|
||||
"%c", type);
|
||||
|
||||
|
@ -165,9 +164,8 @@ od_auth_query_do(od_server_t *server, char *query, int len,
|
|||
}
|
||||
/* ReadyForQuery */
|
||||
case 'Z':
|
||||
od_backend_ready(server, "auth_query",
|
||||
stream->start + offset,
|
||||
shapito_stream_used(stream) - offset);
|
||||
od_backend_ready(server, "auth_query", stream->start,
|
||||
shapito_stream_used(stream));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -216,7 +214,9 @@ od_auth_query_format(od_schemeroute_t *scheme, shapito_parameter_t *user,
|
|||
return dst_pos - output;
|
||||
}
|
||||
|
||||
int od_auth_query(od_system_t *system, od_schemeroute_t *scheme,
|
||||
int od_auth_query(od_system_t *system,
|
||||
shapito_stream_t *stream,
|
||||
od_schemeroute_t *scheme,
|
||||
shapito_parameter_t *user,
|
||||
shapito_password_t *password)
|
||||
{
|
||||
|
@ -273,7 +273,7 @@ int od_auth_query(od_system_t *system, od_schemeroute_t *scheme,
|
|||
/* connect to server, if necessary */
|
||||
int rc;
|
||||
if (server->io == NULL) {
|
||||
rc = od_backend_connect(server, "auth_query");
|
||||
rc = od_backend_connect(server, stream, "auth_query");
|
||||
if (rc == -1) {
|
||||
od_router_close_and_unroute(auth_client);
|
||||
od_client_free(auth_client);
|
||||
|
@ -286,7 +286,7 @@ int od_auth_query(od_system_t *system, od_schemeroute_t *scheme,
|
|||
int query_len;
|
||||
query_len = od_auth_query_format(scheme, user, query, sizeof(query));
|
||||
|
||||
rc = od_auth_query_do(server, query, query_len, password);
|
||||
rc = od_auth_query_do(server, stream, query, query_len, password);
|
||||
if (rc == -1) {
|
||||
od_router_close_and_unroute(auth_client);
|
||||
od_client_free(auth_client);
|
||||
|
|
|
@ -7,8 +7,7 @@
|
|||
* Advanced PostgreSQL connection pooler.
|
||||
*/
|
||||
|
||||
int od_auth_query(od_system_t*, od_schemeroute_t*,
|
||||
shapito_parameter_t*,
|
||||
shapito_password_t*);
|
||||
int od_auth_query(od_system_t*, shapito_stream_t*, od_schemeroute_t*,
|
||||
shapito_parameter_t*, shapito_password_t*);
|
||||
|
||||
#endif /* OD_AUTH_QUERY_H */
|
||||
|
|
|
@ -51,7 +51,6 @@
|
|||
|
||||
void od_backend_close(od_server_t *server)
|
||||
{
|
||||
assert(server->stream == NULL);
|
||||
assert(server->route == NULL);
|
||||
if (server->io) {
|
||||
machine_close(server->io);
|
||||
|
@ -69,11 +68,10 @@ void od_backend_close(od_server_t *server)
|
|||
od_server_free(server);
|
||||
}
|
||||
|
||||
int od_backend_terminate(od_server_t *server)
|
||||
int od_backend_terminate(od_server_t *server, shapito_stream_t *stream)
|
||||
{
|
||||
int rc;
|
||||
shapito_stream_t *stream = server->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_fe_write_terminate(stream);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
|
@ -139,13 +137,12 @@ int od_backend_ready(od_server_t *server, char *context,
|
|||
}
|
||||
|
||||
static inline int
|
||||
od_backend_startup(od_server_t *server)
|
||||
od_backend_startup(od_server_t *server, shapito_stream_t *stream)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
od_route_t *route = server->route;
|
||||
|
||||
shapito_stream_t *stream = server->stream;
|
||||
shapito_stream_reset(stream);
|
||||
|
||||
shapito_fe_arg_t argv[] = {
|
||||
{ "user", 5 }, { route->id.user, route->id.user_len },
|
||||
{ "database", 9 }, { route->id.database, route->id.database_len }
|
||||
|
@ -168,14 +165,14 @@ od_backend_startup(od_server_t *server)
|
|||
while (1) {
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = od_read(server->io, server->stream, UINT32_MAX);
|
||||
rc = od_read(server->io, stream, UINT32_MAX);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger, "startup", NULL, server,
|
||||
"read error: %s",
|
||||
machine_error(server->io));
|
||||
return -1;
|
||||
}
|
||||
char type = *server->stream->start;
|
||||
char type = *stream->start;
|
||||
od_debug(&instance->logger, "startup", NULL, server,
|
||||
"%c", type);
|
||||
|
||||
|
@ -187,7 +184,7 @@ od_backend_startup(od_server_t *server)
|
|||
return 0;
|
||||
/* Authentication */
|
||||
case 'R':
|
||||
rc = od_auth_backend(server);
|
||||
rc = od_auth_backend(server, stream);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
break;
|
||||
|
@ -240,8 +237,9 @@ od_backend_startup(od_server_t *server)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
static inline int
|
||||
od_backend_connect_to(od_server_t *server,
|
||||
shapito_stream_t *stream,
|
||||
od_schemestorage_t *server_scheme,
|
||||
char *context)
|
||||
{
|
||||
|
@ -299,7 +297,7 @@ od_backend_connect_to(od_server_t *server,
|
|||
|
||||
/* do tls handshake */
|
||||
if (server_scheme->tls_mode != OD_TLS_DISABLE) {
|
||||
rc = od_tls_backend_connect(server, &instance->logger, server_scheme);
|
||||
rc = od_tls_backend_connect(server, &instance->logger, stream, server_scheme);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
}
|
||||
|
@ -307,7 +305,8 @@ od_backend_connect_to(od_server_t *server,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int od_backend_connect(od_server_t *server, char *context)
|
||||
int od_backend_connect(od_server_t *server, shapito_stream_t *stream,
|
||||
char *context)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
od_route_t *route = server->route;
|
||||
|
@ -318,7 +317,7 @@ int od_backend_connect(od_server_t *server, char *context)
|
|||
|
||||
/* connect to server */
|
||||
int rc;
|
||||
rc = od_backend_connect_to(server, server_scheme, context);
|
||||
rc = od_backend_connect_to(server, stream, server_scheme, context);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
|
||||
|
@ -331,29 +330,27 @@ int od_backend_connect(od_server_t *server, char *context)
|
|||
}
|
||||
|
||||
/* send startup and do initial configuration */
|
||||
rc = od_backend_startup(server);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
rc = od_backend_startup(server, stream);
|
||||
return rc;
|
||||
}
|
||||
|
||||
int od_backend_connect_cancel(od_server_t *server,
|
||||
shapito_stream_t *stream,
|
||||
od_schemestorage_t *server_scheme,
|
||||
shapito_key_t *key)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
/* connect to server */
|
||||
int rc;
|
||||
rc = od_backend_connect_to(server, server_scheme, "cancel");
|
||||
rc = od_backend_connect_to(server, stream, server_scheme, "cancel");
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
/* send cancel request */
|
||||
shapito_stream_reset(server->stream);
|
||||
rc = shapito_fe_write_cancel(server->stream, key->key_pid, key->key);
|
||||
shapito_stream_reset(stream);
|
||||
rc = shapito_fe_write_cancel(stream, key->key_pid, key->key);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = od_write(server->io, server->stream);
|
||||
rc = od_write(server->io, stream);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger, "cancel", NULL, NULL,
|
||||
"write error: %s",
|
||||
|
@ -362,14 +359,12 @@ int od_backend_connect_cancel(od_server_t *server,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int od_backend_ready_wait(od_server_t *server, char *context, int count,
|
||||
int od_backend_ready_wait(od_server_t *server, shapito_stream_t *stream,
|
||||
char *context, int count,
|
||||
uint32_t time_ms)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
|
||||
shapito_stream_t *stream = server->stream;
|
||||
/* wait for response */
|
||||
|
||||
int ready = 0;
|
||||
for (;;)
|
||||
{
|
||||
|
@ -384,8 +379,7 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count,
|
|||
}
|
||||
return -1;
|
||||
}
|
||||
int offset = rc;
|
||||
char type = stream->start[offset];
|
||||
char type = *stream->start;
|
||||
od_debug(&instance->logger, context, server->client, server,
|
||||
"%c", type);
|
||||
/* ErrorResponse */
|
||||
|
@ -421,9 +415,8 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count,
|
|||
}
|
||||
/* ReadyForQuery */
|
||||
if (type == 'Z') {
|
||||
od_backend_ready(server, context,
|
||||
stream->start + offset,
|
||||
shapito_stream_used(stream) - offset);
|
||||
od_backend_ready(server, context, stream->start,
|
||||
shapito_stream_used(stream));
|
||||
ready++;
|
||||
if (ready == count)
|
||||
break;
|
||||
|
@ -432,13 +425,13 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int od_backend_query(od_server_t *server, char *context,
|
||||
int od_backend_query(od_server_t *server, shapito_stream_t *stream,
|
||||
char *context,
|
||||
char *query, int len)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
int rc;
|
||||
shapito_stream_t *stream = server->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_fe_write_query(stream, query, len);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
|
@ -453,10 +446,8 @@ int od_backend_query(od_server_t *server, char *context,
|
|||
/* update server sync state and stats */
|
||||
od_server_stat_request(server, 1);
|
||||
|
||||
rc = od_backend_ready_wait(server, context, 1, UINT32_MAX);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
return 0;
|
||||
rc = od_backend_ready_wait(server, stream, context, 1, UINT32_MAX);
|
||||
return rc;
|
||||
}
|
||||
|
||||
int
|
||||
|
@ -502,10 +493,10 @@ od_backend_deploy(od_server_t *server, char *context,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int od_backend_deploy_wait(od_server_t *server, char *context, uint32_t time_ms)
|
||||
int od_backend_deploy_wait(od_server_t *server, shapito_stream_t *stream,
|
||||
char *context, uint32_t time_ms)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
shapito_stream_t *stream = server->stream;
|
||||
while (server->deploy_sync > 0) {
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
|
|
|
@ -7,15 +7,16 @@
|
|||
* Advanced PostgreSQL connection pooler.
|
||||
*/
|
||||
|
||||
int od_backend_connect(od_server_t*, char*);
|
||||
int od_backend_connect_cancel(od_server_t*, od_schemestorage_t*, shapito_key_t*);
|
||||
int od_backend_connect(od_server_t*, shapito_stream_t*, char*);
|
||||
int od_backend_connect_cancel(od_server_t*, shapito_stream_t*, od_schemestorage_t*,
|
||||
shapito_key_t*);
|
||||
void od_backend_close(od_server_t*);
|
||||
int od_backend_terminate(od_server_t*);
|
||||
int od_backend_terminate(od_server_t*, shapito_stream_t*);
|
||||
void od_backend_error(od_server_t*, char*, char*, int);
|
||||
int od_backend_ready(od_server_t*, char*, char*, int);
|
||||
int od_backend_ready_wait(od_server_t*, char*, int, uint32_t);
|
||||
int od_backend_query(od_server_t*, char*, char*, int);
|
||||
int od_backend_ready_wait(od_server_t*, shapito_stream_t*, char*, int, uint32_t);
|
||||
int od_backend_query(od_server_t*, shapito_stream_t*, char*, char*, int);
|
||||
int od_backend_deploy(od_server_t*, char*, char*, int);
|
||||
int od_backend_deploy_wait(od_server_t*, char*, uint32_t);
|
||||
int od_backend_deploy_wait(od_server_t*, shapito_stream_t*, char*, uint32_t);
|
||||
|
||||
#endif /* OD_BACKEND_H */
|
||||
|
|
|
@ -50,6 +50,7 @@
|
|||
#include "sources/cancel.h"
|
||||
|
||||
int od_cancel(od_system_t *system,
|
||||
shapito_stream_t *stream,
|
||||
od_schemestorage_t *server_scheme,
|
||||
shapito_key_t *key,
|
||||
od_id_t *server_id)
|
||||
|
@ -61,10 +62,8 @@ int od_cancel(od_system_t *system,
|
|||
sizeof(server_id->id), server_id->id);
|
||||
od_server_t server;
|
||||
od_server_init(&server);
|
||||
od_server_stream_attach(&server, &instance->stream_cache);
|
||||
server.system = system;
|
||||
od_backend_connect_cancel(&server, server_scheme, key);
|
||||
od_server_stream_detach(&server, &instance->stream_cache);
|
||||
od_backend_connect_cancel(&server, stream, server_scheme, key);
|
||||
od_backend_close(&server);
|
||||
return 0;
|
||||
}
|
||||
|
@ -80,6 +79,7 @@ int od_cancel_match(od_system_t *system,
|
|||
od_routepool_t *route_pool,
|
||||
shapito_key_t *key)
|
||||
{
|
||||
od_instance_t *instance = system->instance;
|
||||
/* match server by client key (forge) */
|
||||
od_server_t *server;
|
||||
server = od_routepool_server_foreach(route_pool, OD_SACTIVE,
|
||||
|
@ -90,5 +90,11 @@ int od_cancel_match(od_system_t *system,
|
|||
od_route_t *route = server->route;
|
||||
od_schemestorage_t *server_scheme = route->scheme->storage;
|
||||
shapito_key_t cancel_key = server->key;
|
||||
return od_cancel(system, server_scheme, &cancel_key, &server->id);
|
||||
|
||||
shapito_stream_t *stream;
|
||||
stream = shapito_cache_pop(&instance->stream_cache);
|
||||
int rc;
|
||||
rc = od_cancel(system, stream, server_scheme, &cancel_key, &server->id);
|
||||
shapito_cache_push(&instance->stream_cache, stream);
|
||||
return rc;
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
* Advanced PostgreSQL connection pooler.
|
||||
*/
|
||||
|
||||
int od_cancel(od_system_t*, od_schemestorage_t*, shapito_key_t*, od_id_t*);
|
||||
int od_cancel(od_system_t*, shapito_stream_t*, od_schemestorage_t*, shapito_key_t*, od_id_t*);
|
||||
int od_cancel_match(od_system_t*, od_routepool_t*, shapito_key_t*);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -84,17 +84,19 @@ void od_frontend_close(od_client_t *client)
|
|||
static inline int
|
||||
od_frontend_error_fwd(od_client_t *client)
|
||||
{
|
||||
od_server_t *server;
|
||||
server = client->server;
|
||||
od_instance_t *instance = client->system->instance;
|
||||
od_server_t *server = client->server;
|
||||
assert(server != NULL);
|
||||
assert(server->stats.count_error != 0);
|
||||
shapito_fe_error_t error;
|
||||
int rc;
|
||||
rc = shapito_fe_read_error(&error, server->stream->start,
|
||||
shapito_stream_used(server->stream));
|
||||
rc = shapito_fe_read_error(&error, client->stream->start,
|
||||
shapito_stream_used(client->stream));
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
|
||||
shapito_stream_t *stream;
|
||||
stream = shapito_cache_pop(&instance->stream_cache);
|
||||
shapito_stream_reset(stream);
|
||||
char msg[512];
|
||||
int msg_len;
|
||||
|
@ -111,9 +113,12 @@ od_frontend_error_fwd(od_client_t *client)
|
|||
error.detail, detail_len,
|
||||
error.hint, hint_len,
|
||||
msg, msg_len);
|
||||
if (rc == -1)
|
||||
if (rc == -1) {
|
||||
shapito_cache_push(&instance->stream_cache, stream);
|
||||
return -1;
|
||||
}
|
||||
rc = od_write(client->io, stream);
|
||||
shapito_cache_push(&instance->stream_cache, stream);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -288,11 +293,13 @@ od_frontend_attach(od_client_t *client, char *context)
|
|||
/* connect to server, if necessary */
|
||||
int rc;
|
||||
if (server->io == NULL) {
|
||||
rc = od_backend_connect(server, context);
|
||||
rc = od_backend_connect(server, client->stream, context);
|
||||
if (rc == -1)
|
||||
return OD_FE_ESERVER_CONNECT;
|
||||
}
|
||||
|
||||
shapito_stream_reset(client->stream);
|
||||
|
||||
if (! od_idmgr_cmp(&server->last_client_id, &client->id))
|
||||
{
|
||||
rc = od_deploy_write(client->server, context, client->stream,
|
||||
|
@ -385,13 +392,13 @@ od_frontend_setup(od_client_t *client)
|
|||
|
||||
od_server_stat_request(server, server->deploy_sync);
|
||||
|
||||
shapito_stream_reset(client->stream);
|
||||
|
||||
/* wait for completion */
|
||||
rc = od_backend_deploy_wait(server, "setup", UINT32_MAX);
|
||||
rc = od_backend_deploy_wait(server, client->stream, "setup", UINT32_MAX);
|
||||
if (rc == -1)
|
||||
return OD_FE_ESERVER_CONFIGURE;
|
||||
|
||||
shapito_stream_reset(client->stream);
|
||||
|
||||
/* append paremeter status messages */
|
||||
od_debug(&instance->logger, "setup", client, server,
|
||||
"sending params:");
|
||||
|
@ -662,7 +669,7 @@ od_frontend_remote_server(od_client_t *client)
|
|||
if (route->scheme->pool == OD_POOLING_TRANSACTION) {
|
||||
if (! server->is_transaction) {
|
||||
/* cleanup server */
|
||||
rc = od_reset(server);
|
||||
rc = od_reset(server, client->stream);
|
||||
if (rc == -1)
|
||||
return OD_FE_ESERVER_WRITE;
|
||||
/* push server connection back to route pool */
|
||||
|
@ -814,7 +821,7 @@ od_frontend_cleanup(od_client_t *client, char *context,
|
|||
od_unroute(client);
|
||||
break;
|
||||
}
|
||||
rc = od_reset(server);
|
||||
rc = od_reset(server, client->stream);
|
||||
if (rc != 1) {
|
||||
/* close backend connection */
|
||||
od_router_close_and_unroute(client);
|
||||
|
@ -835,7 +842,7 @@ od_frontend_cleanup(od_client_t *client, char *context,
|
|||
od_unroute(client);
|
||||
break;
|
||||
}
|
||||
rc = od_reset(server);
|
||||
rc = od_reset(server, client->stream);
|
||||
if (rc != 1) {
|
||||
/* close backend connection */
|
||||
od_router_close_and_unroute(client);
|
||||
|
|
|
@ -277,9 +277,10 @@ od_periodic_expire(od_periodic_t *periodic)
|
|||
if (instance->is_shared)
|
||||
machine_io_attach(server->io);
|
||||
|
||||
od_server_stream_attach(server, &instance->stream_cache);
|
||||
od_backend_terminate(server);
|
||||
od_server_stream_detach(server, &instance->stream_cache);
|
||||
shapito_stream_t *stream;
|
||||
stream = shapito_cache_pop(&instance->stream_cache);
|
||||
od_backend_terminate(server, stream);
|
||||
shapito_cache_push(&instance->stream_cache, stream);
|
||||
|
||||
od_backend_close(server);
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@
|
|||
#include "sources/tls.h"
|
||||
#include "sources/cancel.h"
|
||||
|
||||
int od_reset(od_server_t *server)
|
||||
int od_reset(od_server_t *server, shapito_stream_t *stream)
|
||||
{
|
||||
od_instance_t *instance = server->system->instance;
|
||||
od_route_t *route = server->route;
|
||||
|
@ -110,7 +110,7 @@ int od_reset(od_server_t *server)
|
|||
wait_timeout,
|
||||
wait_try);
|
||||
wait_try++;
|
||||
rc = od_backend_ready_wait(server, "reset", 1, wait_timeout);
|
||||
rc = od_backend_ready_wait(server, stream, "reset", 1, wait_timeout);
|
||||
if (rc == -1)
|
||||
break;
|
||||
}
|
||||
|
@ -127,6 +127,7 @@ int od_reset(od_server_t *server)
|
|||
wait_try_cancel);
|
||||
wait_try_cancel++;
|
||||
rc = od_cancel(server->system,
|
||||
stream,
|
||||
route->scheme->storage, &server->key,
|
||||
&server->id);
|
||||
if (rc == -1)
|
||||
|
@ -144,7 +145,7 @@ int od_reset(od_server_t *server)
|
|||
if (route->scheme->pool_rollback) {
|
||||
if (server->is_transaction) {
|
||||
char query_rlb[] = "ROLLBACK";
|
||||
rc = od_backend_query(server, "reset rollback", query_rlb,
|
||||
rc = od_backend_query(server, stream, "reset rollback", query_rlb,
|
||||
sizeof(query_rlb));
|
||||
if (rc == -1)
|
||||
goto error;
|
||||
|
|
|
@ -7,6 +7,6 @@
|
|||
* Advanced PostgreSQL connection pooler.
|
||||
*/
|
||||
|
||||
int od_reset(od_server_t*);
|
||||
int od_reset(od_server_t*, shapito_stream_t*);
|
||||
|
||||
#endif /* OD_RESET_H */
|
||||
|
|
|
@ -200,7 +200,6 @@ on_attach:
|
|||
server->idle_time = 0;
|
||||
/* assign client session key */
|
||||
server->key_client = client->key;
|
||||
assert(server->stream == NULL);
|
||||
msg_attach->status = OD_ROK;
|
||||
machine_channel_write(msg_attach->response, msg);
|
||||
}
|
||||
|
@ -439,10 +438,10 @@ od_router(void *arg)
|
|||
if (machine_connected(server->io)) {
|
||||
if (instance->is_shared)
|
||||
machine_io_attach(server->io);
|
||||
|
||||
od_server_stream_attach(server, &instance->stream_cache);
|
||||
od_backend_terminate(server);
|
||||
od_server_stream_detach(server, &instance->stream_cache);
|
||||
shapito_stream_t *stream;
|
||||
stream = shapito_cache_pop(&instance->stream_cache);
|
||||
od_backend_terminate(server, stream);
|
||||
shapito_cache_push(&instance->stream_cache, stream);
|
||||
}
|
||||
od_backend_close(server);
|
||||
|
||||
|
@ -569,10 +568,8 @@ od_router_attach(od_client_t *client)
|
|||
od_instance_t *instance = client->system->instance;
|
||||
od_routerstatus_t status;
|
||||
status = od_router_do(client, OD_MROUTER_ATTACH, 1);
|
||||
od_server_t *server = client->server;
|
||||
if (server)
|
||||
od_server_stream_attach(server, &instance->stream_cache);
|
||||
/* attach server io to clients machine context */
|
||||
od_server_t *server = client->server;
|
||||
if (instance->is_shared) {
|
||||
if (server && server->io)
|
||||
machine_io_attach(server->io);
|
||||
|
@ -585,8 +582,6 @@ od_router_detach(od_client_t *client)
|
|||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
od_server_t *server = client->server;
|
||||
if (server->stream)
|
||||
od_server_stream_detach(server, &instance->stream_cache);
|
||||
if (instance->is_shared)
|
||||
machine_io_detach(server->io);
|
||||
return od_router_do(client, OD_MROUTER_DETACH, 1);
|
||||
|
@ -597,8 +592,6 @@ od_router_detach_and_unroute(od_client_t *client)
|
|||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
od_server_t *server = client->server;
|
||||
if (server->stream)
|
||||
od_server_stream_detach(server, &instance->stream_cache);
|
||||
if (instance->is_shared)
|
||||
machine_io_detach(server->io);
|
||||
return od_router_do(client, OD_MROUTER_DETACH_AND_UNROUTE, 1);
|
||||
|
@ -609,8 +602,6 @@ od_router_close(od_client_t *client)
|
|||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
od_server_t *server = client->server;
|
||||
if (server->stream)
|
||||
od_server_stream_detach(server, &instance->stream_cache);
|
||||
if (instance->is_shared)
|
||||
machine_io_detach(server->io);
|
||||
return od_router_do(client, OD_MROUTER_CLOSE, 1);
|
||||
|
@ -621,8 +612,6 @@ od_router_close_and_unroute(od_client_t *client)
|
|||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
od_server_t *server = client->server;
|
||||
if (server->stream)
|
||||
od_server_stream_detach(server, &instance->stream_cache);
|
||||
if (instance->is_shared)
|
||||
machine_io_detach(server->io);
|
||||
return od_router_do(client, OD_MROUTER_CLOSE_AND_UNROUTE, 1);
|
||||
|
|
|
@ -33,7 +33,6 @@ struct od_server
|
|||
{
|
||||
od_serverstate_t state;
|
||||
od_id_t id;
|
||||
shapito_stream_t *stream;
|
||||
shapito_parameters_t params;
|
||||
machine_io_t *io;
|
||||
machine_tls_t *tls;
|
||||
|
@ -66,7 +65,6 @@ od_server_init(od_server_t *server)
|
|||
server->is_transaction = 0;
|
||||
server->is_copy = 0;
|
||||
server->deploy_sync = 0;
|
||||
server->stream = NULL;
|
||||
memset(&server->stats, 0, sizeof(server->stats));
|
||||
shapito_key_init(&server->key);
|
||||
shapito_key_init(&server->key_client);
|
||||
|
@ -90,7 +88,6 @@ od_server_allocate(void)
|
|||
static inline void
|
||||
od_server_free(od_server_t *server)
|
||||
{
|
||||
assert(server->stream == NULL);
|
||||
shapito_parameters_free(&server->params);
|
||||
if (server->is_allocated)
|
||||
free(server);
|
||||
|
@ -137,20 +134,4 @@ od_server_stat_error(od_server_t *server)
|
|||
server->stats.count_error++;
|
||||
}
|
||||
|
||||
static inline shapito_stream_t*
|
||||
od_server_stream_attach(od_server_t *server, shapito_cache_t *cache)
|
||||
{
|
||||
assert(server->stream == NULL);
|
||||
server->stream = shapito_cache_pop(cache);
|
||||
return server->stream;
|
||||
}
|
||||
|
||||
static inline void
|
||||
od_server_stream_detach(od_server_t *server, shapito_cache_t *cache)
|
||||
{
|
||||
assert(server->stream != NULL);
|
||||
shapito_cache_push(cache, server->stream);
|
||||
server->stream = NULL;
|
||||
}
|
||||
|
||||
#endif /* OD_SERVER_H */
|
||||
|
|
|
@ -182,10 +182,9 @@ od_tls_backend(od_schemestorage_t *scheme)
|
|||
int
|
||||
od_tls_backend_connect(od_server_t *server,
|
||||
od_logger_t *logger,
|
||||
shapito_stream_t *stream,
|
||||
od_schemestorage_t *scheme)
|
||||
{
|
||||
shapito_stream_t *stream = server->stream;
|
||||
|
||||
od_debug(logger, "tls", NULL, server, "init");
|
||||
|
||||
/* SSL Request */
|
||||
|
|
|
@ -18,6 +18,7 @@ machine_tls_t*
|
|||
od_tls_backend(od_schemestorage_t*);
|
||||
|
||||
int
|
||||
od_tls_backend_connect(od_server_t*, od_logger_t*, od_schemestorage_t*);
|
||||
od_tls_backend_connect(od_server_t*, od_logger_t*, shapito_stream_t*,
|
||||
od_schemestorage_t*);
|
||||
|
||||
#endif /* OD_TLS_H */
|
||||
|
|
Loading…
Reference in New Issue