odissey: session pooling: pipeline server reply

This commit is contained in:
Dmitry Simonenko 2017-04-21 14:00:42 +03:00
parent 9da4569537
commit 3de1271b34
1 changed files with 22 additions and 13 deletions

View File

@ -94,8 +94,8 @@ od_router_session(od_client_t *client)
rc = od_read(client->io, stream, INT_MAX); rc = od_read(client->io, stream, INT_MAX);
if (rc == -1) if (rc == -1)
return OD_RS_ECLIENT_READ; return OD_RS_ECLIENT_READ;
type = *stream->s; type = stream->s[rc];
od_debug(&pooler->od->log, client->io, "C: %c", *stream->s); od_debug(&pooler->od->log, client->io, "C: %c", type);
/* client graceful shutdown */ /* client graceful shutdown */
if (type == 'X') if (type == 'X')
@ -107,12 +107,12 @@ od_router_session(od_client_t *client)
server->count_request++; server->count_request++;
for (;;) {
/* read server reply */
for (;;) {
so_stream_reset(stream); so_stream_reset(stream);
for (;;) {
/* pipeline server reply */
for (;;) {
rc = od_read(server->io, stream, 1000); rc = od_read(server->io, stream, 1000);
if (rc == 0) if (rc >= 0)
break; break;
/* client watchdog. /* client watchdog.
* *
@ -126,22 +126,30 @@ od_router_session(od_client_t *client)
"S (watchdog): client disconnected"); "S (watchdog): client disconnected");
return OD_RS_ECLIENT_READ; return OD_RS_ECLIENT_READ;
} }
type = *stream->s; type = stream->s[rc];
od_debug(&pooler->od->log, server->io, "S: %c", type); od_debug(&pooler->od->log, server->io, "S: %c", type);
if (type == 'Z') /* ReadyForQuery */
od_beset_ready(server, stream); if (type == 'Z') {
rc = od_beset_ready(server, stream->s + rc,
so_stream_used(stream) - rc);
if (rc == -1)
return OD_RS_ECLIENT_READ;
/* transmit reply to client */ /* flush reply buffer to client */
rc = od_write(client->io, stream); rc = od_write(client->io, stream);
if (rc == -1) if (rc == -1)
return OD_RS_ECLIENT_WRITE; return OD_RS_ECLIENT_WRITE;
if (type == 'Z')
break; break;
}
/* CopyInResponse */ /* CopyInResponse */
if (type == 'G') { if (type == 'G') {
/* transmit reply to client */
rc = od_write(client->io, stream);
if (rc == -1)
return OD_RS_ECLIENT_WRITE;
od_routerstatus_t copy_rc; od_routerstatus_t copy_rc;
copy_rc = od_router_copy_in(client); copy_rc = od_router_copy_in(client);
if (copy_rc != OD_RS_OK) if (copy_rc != OD_RS_OK)
@ -157,6 +165,7 @@ od_router_session(od_client_t *client)
/* copy out complete */ /* copy out complete */
if (type == 'c') { if (type == 'c') {
server->is_copy = 0; server->is_copy = 0;
continue;
} }
} }
} }