odissey: assemble basic session pooling

This commit is contained in:
Dmitry Simonenko 2017-05-29 17:53:21 +03:00
parent 7bedf59191
commit f5657689e5
4 changed files with 473 additions and 17 deletions

View File

@ -111,6 +111,35 @@ int od_backend_ready(od_server_t *server, uint8_t *data, int size)
return 0;
}
static inline int
od_backend_ready_wait(od_server_t *server, char *procedure, int time_ms)
{
od_instance_t *instance = server->system->instance;
so_stream_t *stream = &server->stream;
/* wait for response */
while (1) {
so_stream_reset(stream);
int rc;
rc = od_read(server->io, stream, time_ms);
if (rc == -1) {
od_error(&instance->log, server->io, "S (%s): read error: %s",
procedure, machine_error(server->io));
return -1;
}
uint8_t type = stream->s[rc];
od_debug(&instance->log, server->io, "S (%s): %c",
procedure, type);
/* ReadyForQuery */
if (type == 'Z') {
od_backend_ready(server, stream->s + rc,
so_stream_used(stream) - rc);
break;
}
}
return 0;
}
static inline int
od_backend_setup(od_server_t *server)
{
@ -196,11 +225,14 @@ od_backend_connect(od_server_t *server)
server_scheme->port);
return -1;
}
#if 0
rc = machine_set_readahead(server->io, instance->scheme.readahead);
if (rc == -1) {
od_error(&instance->log, NULL, "failed to set readahead");
return -1;
}
#endif
/* do tls handshake */
#if 0
@ -273,3 +305,175 @@ od_backend_new(od_router_t *router, od_route_t *route)
}
return server;
}
static inline int
od_backend_query(od_server_t *server, char *procedure, char *query, int len)
{
od_instance_t *instance = server->system->instance;
int rc;
so_stream_t *stream = &server->stream;
so_stream_reset(stream);
rc = so_fewrite_query(stream, query, len);
if (rc == -1)
return -1;
rc = od_write(server->io, stream);
if (rc == -1) {
od_error(&instance->log, server->io, "S (%s): write error: %s",
procedure, machine_error(server->io));
return -1;
}
server->count_request++;
rc = od_backend_ready_wait(server, procedure, UINT32_MAX);
if (rc == -1)
return -1;
return 0;
}
int od_backend_configure(od_server_t *server, so_bestartup_t *startup)
{
od_instance_t *instance = server->system->instance;
char query_configure[1024];
int size = 0;
so_parameter_t *param;
so_parameter_t *end;
param = (so_parameter_t*)startup->params.buf.s;
end = (so_parameter_t*)startup->params.buf.p;
for (; param < end; param = so_parameter_next(param)) {
if (param == startup->user ||
param == startup->database)
continue;
size += snprintf(query_configure + size,
sizeof(query_configure) - size,
"SET %s=%s;",
so_parameter_name(param),
so_parameter_value(param));
}
if (size == 0)
return 0;
od_debug(&instance->log, server->io,
"S (configure): %s", query_configure);
int rc;
rc = od_backend_query(server, "configure", query_configure,
size + 1);
return rc;
}
int od_backend_reset(od_server_t *server)
{
od_instance_t *instance = server->system->instance;
od_route_t *route = server->route;
/* server left in copy mode */
if (server->is_copy) {
od_debug(&instance->log, server->io,
"S (reset): in copy, closing");
goto drop;
}
/* support route rollback off */
if (! route->scheme->rollback) {
if (server->is_transaction) {
od_debug(&instance->log, server->io,
"S (reset): in active transaction, closing");
goto drop;
}
}
/* support route cancel off */
if (! route->scheme->cancel) {
if (! od_server_is_sync(server)) {
od_debug(&instance->log, server->io,
"S (reset): not synchronized, closing");
goto drop;
}
}
/* Server is not synchronized.
*
* Number of queries sent to server is not equal
* to the number of received replies. Do the following
* logic until server becomes synchronized:
*
* 1. Wait each ReadyForQuery until we receive all
* replies with 1 sec timeout.
*
* 2. Send Cancel in other connection.
*
* It is possible that client could previously
* pipeline server with requests. Each request
* may stall database on its own way and may require
* additional Cancel request.
*
* 3. Continue with (1)
*/
int wait_timeout = 1000;
int wait_try = 0;
int wait_try_cancel = 0;
int wait_cancel_limit = 1;
int rc = 0;
for (;;) {
while (! od_server_is_sync(server)) {
od_debug(&instance->log, server->io,
"S (reset): not synchronized, wait for %d msec (#%d)",
wait_timeout,
wait_try);
wait_try++;
rc = od_backend_ready_wait(server, "reset", wait_timeout);
if (rc == -1)
break;
}
if (rc == -1) {
if (! machine_read_timedout(server->io))
goto error;
if (wait_try_cancel == wait_cancel_limit) {
od_debug(&instance->log, server->io,
"S (reset): server cancel limit reached, closing");
goto error;
}
od_debug(&instance->log, server->io,
"S (reset): not responded, cancel (#%d)",
wait_try_cancel);
wait_try_cancel++;
/* TODO: */
/*
rc = od_cancel_of(pooler, route->scheme->server, &server->key);
if (rc < 0)
goto error;
*/
continue;
}
assert(od_server_is_sync(server));
break;
}
od_debug(&instance->log, server->io, "S (reset): synchronized");
/* send rollback in case server has an active
* transaction running */
if (route->scheme->rollback) {
if (server->is_transaction) {
char query_rlb[] = "ROLLBACK";
rc = od_backend_query(server, "reset rollback", query_rlb,
sizeof(query_rlb));
if (rc == -1)
goto error;
assert(! server->is_transaction);
}
}
/* send reset query */
if (route->scheme->discard) {
char query_reset[] = "DISCARD ALL";
rc = od_backend_query(server, "reset", query_reset,
sizeof(query_reset));
if (rc == -1)
goto error;
}
/* ready to use */
return 1;
drop:
return 0;
error:
return -1;
}

View File

@ -10,4 +10,8 @@
od_server_t*
od_backend_new(od_router_t*, od_route_t*);
int od_backend_reset(od_server_t*);
int od_backend_ready(od_server_t*, uint8_t*, int);
int od_backend_configure(od_server_t*, so_bestartup_t*);
#endif /* OD_BACKEND_H */

View File

@ -42,6 +42,7 @@
#include "od_router.h"
#include "od_relay.h"
#include "od_frontend.h"
#include "od_backend.h"
#include "od_auth.h"
void od_frontend_close(od_client_t *client)
@ -192,6 +193,160 @@ od_frontend_ready(od_client_t *client)
return 0;
}
enum {
OD_RS_UNDEF,
OD_RS_OK,
OD_RS_EROUTE,
OD_RS_EPOOL,
OD_RS_ELIMIT,
OD_RS_ESERVER_CONFIGURE,
OD_RS_ESERVER_READ,
OD_RS_ESERVER_WRITE,
OD_RS_ECLIENT_READ,
OD_RS_ECLIENT_WRITE
};
static inline od_routerstatus_t
od_frontend_copy_in(od_client_t *client)
{
od_instance_t *instance = client->system->instance;
od_server_t *server = client->server;
assert(! server->is_copy);
server->is_copy = 1;
int rc, type;
so_stream_t *stream = &client->stream;
for (;;) {
so_stream_reset(stream);
rc = od_read(client->io, stream, UINT32_MAX);
if (rc == -1)
return OD_RS_ECLIENT_READ;
type = *stream->s;
od_debug(&instance->log, client->io, "C (copy): %c", *stream->s);
rc = od_write(server->io, stream);
if (rc == -1)
return OD_RS_ESERVER_WRITE;
/* copy complete or fail */
if (type == 'c' || type == 'f')
break;
}
server->is_copy = 0;
return OD_RS_OK;
}
static int
od_frontend_session(od_client_t *client)
{
od_instance_t *instance = client->system->instance;
/* get server connection for the route */
od_routerstatus_t status;
status = od_router_attach(client->system->router, client);
if (status != OD_ROK)
return OD_RS_EPOOL;
od_server_t *server;
server = client->server;
/* assign client session key */
server->key_client = client->key;
/* configure server using client startup parameters */
int rc;
rc = od_backend_configure(client->server, &client->startup);
if (rc == -1)
return OD_RS_ESERVER_CONFIGURE;
so_stream_t *stream = &client->stream;
for (;;)
{
/* client to server */
so_stream_reset(stream);
rc = od_read(client->io, stream, UINT32_MAX);
if (rc == -1)
return OD_RS_ECLIENT_READ;
int type;
type = stream->s[rc];
od_debug(&instance->log, client->io, "C: %c", type);
/* client graceful shutdown */
if (type == 'X')
break;
rc = od_write(server->io, stream);
if (rc == -1)
return OD_RS_ESERVER_WRITE;
server->count_request++;
so_stream_reset(stream);
for (;;) {
/* pipeline server reply */
for (;;) {
rc = od_read(server->io, stream, 1000);
if (rc >= 0)
break;
/* client watchdog.
*
* ensure that client has not closed
* the connection */
if (! machine_read_timedout(server->io))
return OD_RS_ESERVER_READ;
if (machine_connected(client->io))
continue;
od_debug(&instance->log, server->io,
"S (watchdog): client disconnected");
return OD_RS_ECLIENT_READ;
}
type = stream->s[rc];
od_debug(&instance->log, server->io, "S: %c", type);
/* ReadyForQuery */
if (type == 'Z') {
rc = od_backend_ready(server, stream->s + rc,
so_stream_used(stream) - rc);
if (rc == -1)
return OD_RS_ECLIENT_READ;
/* flush reply buffer to client */
rc = od_write(client->io, stream);
if (rc == -1)
return OD_RS_ECLIENT_WRITE;
break;
}
/* CopyInResponse */
if (type == 'G') {
/* transmit reply to client */
rc = od_write(client->io, stream);
if (rc == -1)
return OD_RS_ECLIENT_WRITE;
rc = od_frontend_copy_in(client);
if (rc != OD_RS_OK)
return rc;
continue;
}
/* CopyOutResponse */
if (type == 'H') {
assert(! server->is_copy);
server->is_copy = 1;
continue;
}
/* copy out complete */
if (type == 'c') {
server->is_copy = 0;
continue;
}
}
}
return OD_RS_OK;
}
void od_frontend(void *arg)
{
od_client_t *client = arg;
@ -217,16 +372,18 @@ void od_frontend(void *arg)
return;
}
#if 0
/* client cancel request */
if (client->startup.is_cancel) {
od_debug(&pooler->od->log, client->io, "C: cancel request");
od_debug(&instance->log, client->io, "C: cancel request");
od_frontend_close(client);
#if 0
so_key_t key = client->startup.key;
od_feclose(client);
od_cancel(pooler, &key);
#endif
return;
}
#endif
/* Generate backend key for the client.
*
@ -288,6 +445,61 @@ void od_frontend(void *arg)
break;
}
/* main */
rc = od_frontend_session(client);
od_server_t *server;
server = client->server;
switch (rc) {
case OD_RS_EROUTE:
case OD_RS_EPOOL:
case OD_RS_ELIMIT:
assert(server == NULL);
break;
case OD_RS_OK:
case OD_RS_ECLIENT_READ:
case OD_RS_ECLIENT_WRITE:
/* close client connection and reuse server
* link in case of client errors and
* graceful shutdown */
if (rc == OD_RS_OK)
od_log(&instance->log, client->io,
"C: disconnected");
else
od_log(&instance->log, client->io,
"C: disconnected (read/write error): %s",
machine_error(client->io));
rc = od_backend_reset(server);
if (rc != 1) {
/* TODO: close backend connection */
break;
}
/* TODO: DETACH server */
break;
case OD_RS_ESERVER_CONFIGURE:
od_log(&instance->log, server->io,
"S: disconnected (server configure error): %s",
machine_error(server->io));
/* TODO: close backend connection */
break;
case OD_RS_ESERVER_READ:
case OD_RS_ESERVER_WRITE:
/* close client connection and close server
* connection in case of server errors */
od_log(&instance->log, server->io,
"S: disconnected (read/write error): %s",
machine_error(server->io));
/* TODO: close backend connection */
break;
case OD_RS_UNDEF:
assert(0);
break;
}
/* close frontend connection */
od_frontend_close(client);
}

View File

@ -40,8 +40,9 @@
#include "od_pooler.h"
#include "od_relay.h"
#include "od_frontend.h"
#include "od_router.h"
#include "od_frontend.h"
#include "od_backend.h"
typedef struct
{
@ -54,7 +55,7 @@ typedef struct
{
od_route_t *route;
od_server_t *server;
} od_msgrouter_push_t;
} od_msgrouter_detach_t;
static od_route_t*
od_router_fwd(od_router_t *router, so_bestartup_t *startup)
@ -108,12 +109,41 @@ od_router_fwd(od_router_t *router, so_bestartup_t *startup)
static inline void
od_router_attacher(void *arg)
{
machine_msg_t msg = arg;
od_msgrouter_t *msg_attach;
msg_attach = arg;
msg_attach = machine_msg_get_data(msg);
od_client_t *client;
client = msg_attach->client;
od_route_t *route;
route = msg_attach->client->route;
route = client->route;
assert(route != NULL);
od_server_t *server;
server = od_serverpool_next(&route->server_pool, OD_SIDLE);
if (server)
goto on_connect;
/* TODO: wait */
/* create new backend connection */
server = od_backend_new(client->system->router, route);
if (server == NULL) {
msg_attach->status = OD_RERROR;
machine_queue_put(msg_attach->response, msg);
return;
}
/* detach server io from router context */
machine_io_detach(server->io);
on_connect:
od_serverpool_set(&route->server_pool, server, OD_SACTIVE);
msg_attach->status = OD_ROK;
client->server = server;
machine_queue_put(msg_attach->response, msg);
}
static inline void
@ -162,8 +192,8 @@ od_router(void *arg)
}
/*od_clientpool_set(&route->client_pool, msg_route->client, OD_CPENDING);*/
msg_route->client->route = route;
msg_route->status = OD_ROK;
msg_route->client->route = route;
machine_queue_put(msg_route->response, msg);
continue;
}
@ -187,7 +217,7 @@ od_router(void *arg)
case OD_MROUTER_DETACH:
{
/* push client server back to route server pool */
od_msgrouter_push_t *msg_detach;
od_msgrouter_detach_t *msg_detach;
msg_detach = machine_msg_get_data(msg);
od_serverpool_set(&msg_detach->route->server_pool,
@ -271,7 +301,7 @@ od_route(od_router_t *router, od_client_t *client)
}
od_routerstatus_t
od_route_attach(od_router_t *router, od_client_t *client)
od_router_attach(od_router_t *router, od_client_t *client)
{
/* create response queue */
machine_queue_t response;
@ -306,21 +336,27 @@ od_route_attach(od_router_t *router, od_client_t *client)
machine_queue_free(response);
machine_msg_free(msg);
/* TODO: machine_attach(client->server->io) */
if (client->server) {
/* attach server io to clients machine context */
machine_io_attach(client->server->io);
}
return status;
}
void
od_route_detach(od_router_t *router, od_client_t *client)
od_router_detach(od_router_t *router, od_client_t *client)
{
/* TODO: machine_detach(client->server->io) */
assert(client->server != NULL);
/* send server push request to router */
/* detach server io from clients machine context */
machine_io_detach(client->server->io);
/* send server detach request to router */
machine_msg_t msg;
msg = machine_msg_create(OD_MROUTER_DETACH, sizeof(od_msgrouter_push_t));
msg = machine_msg_create(OD_MROUTER_DETACH, sizeof(od_msgrouter_detach_t));
if (msg == NULL)
return;
od_msgrouter_push_t *msg_detach;
od_msgrouter_detach_t *msg_detach;
msg_detach = machine_msg_get_data(msg);
msg_detach->route = client->route;
msg_detach->server = client->server;