diff --git a/core/od_be.c b/core/od_be.c index bbe0baea..4b5dfab7 100644 --- a/core/od_be.c +++ b/core/od_be.c @@ -30,8 +30,7 @@ static int od_beclose(odpooler_t *pooler, odserver_t *server) { - od_serverpool_set(&pooler->server_pool, server, - OD_SUNDEF); + od_serverpool_set(&pooler->server_pool, server, OD_SUNDEF); if (server->io) { ft_close(server->io); server->io = NULL; @@ -40,11 +39,79 @@ od_beclose(odpooler_t *pooler, odserver_t *server) return 0; } +static int +od_bestartup(odserver_t *server) +{ + odscheme_server_t *dest = server->route->server; + (void)dest; + + sostream_t *stream = &server->stream; + so_stream_reset(stream); + sofearg_t argv[] = { + { "user", 5 }, { "test", 5 }, + { "database", 9 }, { "test", 5 } + }; + int rc; + rc = so_fewrite_startup_message(stream, 4, argv); + if (rc == -1) + return -1; + rc = ft_write(server->io, (char*)stream->s, + so_stream_used(stream), 0); + return rc; +} + +static int +od_beauth(odserver_t *server) +{ +#if 0 + sofehandshake handshake; + memset(&handshake, 0, sizeof(handshake)); + for (;;) { + rc = od_read(server->handle, buf); + if (rc == -1) { + goto error; + } + rc = so_fehandshake(&handshake, buf->s, so_bufused(buf)); + if (rc <= 0) { + if (rc == -1) { + /* ErrorResponce */ + goto error; + } + break; + } + } +#endif + return 0; +} + static int od_beconnect(odpooler_t *pooler, odserver_t *server) { - (void)pooler; - (void)server; + odscheme_server_t *dest = server->route->server; + + /* place server to connect pool */ + od_serverpool_set(&pooler->server_pool, server, OD_SCONNECT); + + /* connect to server */ + int rc; + rc = ft_connect(server->io, dest->host, dest->port, 0); + if (rc < 0) { + od_log(&pooler->od->log, "failed to connect to %s:%d", + dest->host, dest->port); + return -1; + } + /* startup */ + rc = od_bestartup(server); + if (rc == -1) + return -1; + /* auth */ + rc = od_beauth(server); + if (rc == -1) + return -1; + + /* server is ready to use */ + od_serverpool_set(&pooler->server_pool, server, + OD_SIDLE); return 0; } @@ -54,11 +121,8 @@ od_bepop(odpooler_t *pooler, odscheme_route_t *route) /* try to fetch server from idle pool */ odserver_t *server = od_serverpool_pop(&pooler->server_pool); - if (server) { - od_serverpool_set(&pooler->server_pool, server, - OD_SACTIVE); - return server; - } + if (server) + goto ready; /* create new server connection */ server = od_serveralloc(); if (server == NULL) @@ -75,5 +139,9 @@ od_bepop(odpooler_t *pooler, odscheme_route_t *route) od_beclose(pooler, server); return NULL; } +ready: + /* server is ready to use */ + od_serverpool_set(&pooler->server_pool, server, + OD_SACTIVE); return server; } diff --git a/core/od_server.h b/core/od_server.h index d03e7c95..64f9046e 100644 --- a/core/od_server.h +++ b/core/od_server.h @@ -19,6 +19,7 @@ typedef enum { struct odserver_t { odserver_state_t state; odscheme_route_t *route; + sostream_t stream; ftio_t io; odlist_t link; }; @@ -29,6 +30,7 @@ od_serverinit(odserver_t *s) s->state = OD_SUNDEF; s->route = NULL; s->io = NULL; + so_stream_init(&s->stream); od_listinit(&s->link); } @@ -45,6 +47,7 @@ od_serveralloc(void) static inline void od_serverfree(odserver_t *s) { + so_stream_free(&s->stream); free(s); } diff --git a/core/od_server_pool.c b/core/od_server_pool.c index fcd4d5bf..261140ab 100644 --- a/core/od_server_pool.c +++ b/core/od_server_pool.c @@ -12,6 +12,7 @@ #include #include +#include #include "od_macro.h" #include "od_list.h"