odissey: add pooling methods; implement session pooling

This commit is contained in:
Dmitry Simonenko 2016-11-14 13:50:06 +03:00
parent 19e98f757c
commit 847e14e69c
1 changed files with 74 additions and 60 deletions

View File

@ -79,6 +79,65 @@ od_route(odpooler_t *pooler, sobestartup_t *startup)
return route; return route;
} }
static inline int
od_router_session(odclient_t *client)
{
odpooler_t *pooler = client->pooler;
/* client routing */
odroute_t *route = od_route(pooler, &client->startup);
if (route == NULL) {
od_error(&pooler->od->log, "C: database route '%s' is not declared",
client->startup.database);
return -1;
}
/* get server connection for the route */
odserver_t *server = od_bepop(pooler, route);
if (server == NULL)
return -1;
client->server = server;
od_log(&pooler->od->log, "C: route to %s server",
route->scheme->server->name);
sostream_t *stream = &client->stream;
while (1) {
/* client to server */
int rc;
rc = od_read(client->io, stream);
if (rc == -1)
return -1;
char type = *client->stream.s;
od_log(&pooler->od->log, "C: %c", type);
if (type == 'X') {
/* client graceful shutdown */
break;
}
rc = od_write(server->io, stream);
if (rc == -1) {
}
/* server to client */
while (1) {
rc = od_read(server->io, stream);
if (rc == -1) {
}
type = *stream->s;
od_log(&pooler->od->log, "S: %c", type);
rc = od_write(client->io, stream);
if (rc == -1) {
}
if (type == 'Z')
break;
}
}
return 0;
}
void od_router(void *arg) void od_router(void *arg)
{ {
odclient_t *client = arg; odclient_t *client = arg;
@ -104,71 +163,26 @@ void od_router(void *arg)
od_feclose(client); od_feclose(client);
return; return;
} }
/* client routing */
odroute_t *route = od_route(pooler, &client->startup);
if (route == NULL) {
od_error(&pooler->od->log, "C: database route '%s' is not declared",
client->startup.database);
od_feclose(client);
return;
}
od_log(&pooler->od->log, "C: route to %s server",
route->scheme->server->name);
/* get server connection for the route */
odserver_t *server = od_bepop(pooler, route);
if (server == NULL) {
od_feclose(client);
return;
}
client->server = server;
/* notify client that we are ready */ /* notify client that we are ready */
rc = od_feready(client); rc = od_feready(client);
if (rc == -1) { if (rc == -1) {
od_feclose(client); od_feclose(client);
return; return;
} }
/* execute pooling method */
sostream_t *stream = &client->stream; switch (pooler->od->scheme.pooling_mode) {
char type; case OD_PSESSION:
while (1) { rc = od_router_session(client);
/* read client request */ break;
rc = od_read(client->io, stream); case OD_PTRANSACTION:
if (rc == -1) { case OD_PSTATEMENT:
od_feclose(client); case OD_PUNDEF:
return; rc = -1;
} assert(0);
type = *client->stream.s;
od_log(&pooler->od->log, "C: %c", type);
if (type == 'X') {
/* client graceful shutdown */
od_feclose(client);
break; break;
} }
/* write request to server */ odserver_t *server = client->server;
rc = od_write(server->io, stream); od_feclose(client);
if (rc == -1) {
}
while (1) { (void)server;
/* read response from server */
rc = od_read(server->io, stream);
if (rc == -1) {
}
type = *stream->s;
od_log(&pooler->od->log, "S: %c", type);
/* write response to client */
rc = od_write(client->io, stream);
if (rc == -1) {
}
if (type == 'Z')
break;
}
}
} }