odissey: make transaction pooling work

This commit is contained in:
Dmitry Simonenko 2017-06-01 13:41:14 +03:00
parent a9e212927b
commit cc1eb8673c
4 changed files with 163 additions and 22 deletions

View File

@ -2,17 +2,16 @@
#
odissey {
log_debug yes
log_config yes
# log_file "./odissey.log"
# pid_file "./odissey.pid"
# daemonize no
# syslog no
# syslog_facility "daemon"
# syslog_ident "odissey"
# daemonize no
# pid_file "./odissey.pid"
# log_file "./odissey.log"
log_debug yes
log_config yes
pooling "transaction"
stats_period 1
pooling "session"
listen {
host "*"
port 6432

View File

@ -247,9 +247,6 @@ od_frontend_session(od_client_t *client)
"attached to S%" PRIu64,
server->id);
/* assign client session key */
server->key_client = client->key;
/* configure server using client startup parameters */
int rc;
rc = od_backend_configure(client->server, &client->startup);
@ -343,6 +340,125 @@ od_frontend_session(od_client_t *client)
return OD_RS_OK;
}
static int
od_frontend_transaction(od_client_t *client)
{
od_instance_t *instance = client->system->instance;
int rc;
od_server_t *server = NULL;
so_stream_t *stream = &client->stream;
for (;;)
{
/* client to server */
so_stream_reset(stream);
rc = od_read(client->io, stream, UINT32_MAX);
if (rc == -1)
return OD_RS_ECLIENT_READ;
int type = stream->s[rc];
od_debug_client(&instance->log, client->id, NULL,
"%c", type);
/* client graceful shutdown */
if (type == 'X')
break;
/* get server connection from the route pool */
if (server == NULL) {
od_routerstatus_t status;
status = od_router_attach(client);
if (status != OD_ROK)
return OD_RS_EATTACH;
server = client->server;
od_debug_client(&instance->log, client->id, NULL,
"attached to S%" PRIu64,
server->id);
/* configure server using client startup parameters */
rc = od_backend_configure(client->server, &client->startup);
if (rc == -1)
return OD_RS_ESERVER_CONFIGURE;
}
rc = od_write(server->io, stream);
if (rc == -1)
return OD_RS_ESERVER_WRITE;
server->count_request++;
so_stream_reset(stream);
for (;;) {
/* pipeline server reply */
for (;;) {
rc = od_read(server->io, stream, 1000);
if (rc >= 0)
break;
/* client watchdog.
*
* ensure that client has not closed
* the connection */
if (! machine_timedout())
return OD_RS_ESERVER_READ;
if (machine_connected(client->io))
continue;
od_debug_server(&instance->log, server->id, "watchdog",
"client disconnected");
return OD_RS_ECLIENT_READ;
}
type = stream->s[rc];
od_debug_server(&instance->log, server->id, NULL,
"%c", type);
if (type == 'Z') {
rc = od_backend_ready(server, stream->s + rc,
so_stream_used(stream) - rc);
if (rc == -1)
return OD_RS_ECLIENT_READ;
/* transmit reply to client */
rc = od_write(client->io, stream);
if (rc == -1)
return OD_RS_ECLIENT_WRITE;
if (! server->is_transaction) {
/* cleanup server */
rc = od_backend_reset(server);
if (rc == -1)
return OD_RS_ESERVER_WRITE;
/* push server connection back to route pool */
od_router_detach(client);
server = NULL;
}
break;
}
/* CopyInResponse */
if (type == 'G') {
/* transmit reply to client */
rc = od_write(client->io, stream);
if (rc == -1)
return OD_RS_ECLIENT_WRITE;
rc = od_frontend_copy_in(client);
if (rc != OD_RS_OK)
return rc;
continue;
}
/* CopyOutResponse */
if (type == 'H') {
assert(! server->is_copy);
server->is_copy = 1;
continue;
}
/* copy out complete */
if (type == 'c') {
server->is_copy = 0;
}
}
}
return OD_RS_OK;
}
void od_frontend(void *arg)
{
od_client_t *client = arg;
@ -441,7 +557,20 @@ void od_frontend(void *arg)
break;
}
rc = od_frontend_session(client);
/* client main */
switch (instance->scheme.pooling_mode) {
case OD_PSESSION:
rc = od_frontend_session(client);
break;
case OD_PTRANSACTION:
rc = od_frontend_transaction(client);
break;
case OD_PUNDEF:
assert(0);
break;
}
/* cleanup */
od_server_t *server = client->server;
switch (rc) {
case OD_RS_EATTACH:
@ -451,18 +580,29 @@ void od_frontend(void *arg)
od_unroute(client);
break;
case OD_RS_OK:
/* graceful disconnect */
od_log_client(&instance->log, client->id, NULL,
"disconnected");
if (! client->server) {
od_unroute(client);
break;
}
rc = od_backend_reset(server);
if (rc != 1) {
/* close backend connection */
od_router_close_and_unroute(client);
break;
}
/* push server to router server pool */
od_router_detach_and_unroute(client);
break;
case OD_RS_ECLIENT_READ:
case OD_RS_ECLIENT_WRITE:
/* close client connection and reuse server
* link in case of client errors and
* graceful shutdown */
if (rc == OD_RS_OK)
od_log_client(&instance->log, client->id, NULL,
"disconnected");
else
od_log_client(&instance->log, client->id, NULL,
"disconnected (read/write error): %s",
machine_error(client->io));
* link in case of client errors */
od_log_client(&instance->log, client->id, NULL,
"disconnected (read/write error): %s",
machine_error(client->io));
rc = od_backend_reset(server);
if (rc != 1) {

View File

@ -65,7 +65,7 @@ od_periodic_stats (od_router_t *router)
route->id.database,
route->id.user_len,
route->id.user,
route->client_pool.count_active,
od_clientpool_total(&route->client_pool),
route->server_pool.count_active,
route->server_pool.count_idle);
}

View File

@ -143,9 +143,11 @@ od_router_attacher(void *arg)
on_connect:
od_serverpool_set(&route->server_pool, server, OD_SACTIVE);
od_clientpool_set(&route->client_pool, client, OD_CACTIVE);
msg_attach->status = OD_ROK;
client->server = server;
/* assign client session key */
server->key_client = client->key;
server->idle_time = 0;
msg_attach->status = OD_ROK;
machine_queue_put(msg_attach->response, msg);
}