diff --git a/core/od_router_session.c b/core/od_router_session.c index f70530c5..6bee47a8 100644 --- a/core/od_router_session.c +++ b/core/od_router_session.c @@ -94,8 +94,8 @@ od_router_session(od_client_t *client) rc = od_read(client->io, stream, INT_MAX); if (rc == -1) return OD_RS_ECLIENT_READ; - type = *stream->s; - od_debug(&pooler->od->log, client->io, "C: %c", *stream->s); + type = stream->s[rc]; + od_debug(&pooler->od->log, client->io, "C: %c", type); /* client graceful shutdown */ if (type == 'X') @@ -107,12 +107,12 @@ od_router_session(od_client_t *client) server->count_request++; + so_stream_reset(stream); for (;;) { - /* read server reply */ + /* pipeline server reply */ for (;;) { - so_stream_reset(stream); rc = od_read(server->io, stream, 1000); - if (rc == 0) + if (rc >= 0) break; /* client watchdog. * @@ -126,22 +126,30 @@ od_router_session(od_client_t *client) "S (watchdog): client disconnected"); return OD_RS_ECLIENT_READ; } - type = *stream->s; + type = stream->s[rc]; od_debug(&pooler->od->log, server->io, "S: %c", type); - if (type == 'Z') - od_beset_ready(server, stream); + /* ReadyForQuery */ + if (type == 'Z') { + rc = od_beset_ready(server, stream->s + rc, + so_stream_used(stream) - rc); + if (rc == -1) + return OD_RS_ECLIENT_READ; - /* transmit reply to client */ - rc = od_write(client->io, stream); - if (rc == -1) - return OD_RS_ECLIENT_WRITE; + /* flush reply buffer to client */ + rc = od_write(client->io, stream); + if (rc == -1) + return OD_RS_ECLIENT_WRITE; - if (type == 'Z') break; + } /* CopyInResponse */ if (type == 'G') { + /* transmit reply to client */ + rc = od_write(client->io, stream); + if (rc == -1) + return OD_RS_ECLIENT_WRITE; od_routerstatus_t copy_rc; copy_rc = od_router_copy_in(client); if (copy_rc != OD_RS_OK) @@ -157,6 +165,7 @@ od_router_session(od_client_t *client) /* copy out complete */ if (type == 'c') { server->is_copy = 0; + continue; } } }