odissey: handle be connection and startup

This commit is contained in:
Dmitry Simonenko 2016-11-10 16:27:47 +03:00
parent d01fe64936
commit 8baa2eaa9c
3 changed files with 81 additions and 9 deletions

View File

@ -30,8 +30,7 @@
static int
od_beclose(odpooler_t *pooler, odserver_t *server)
{
od_serverpool_set(&pooler->server_pool, server,
OD_SUNDEF);
od_serverpool_set(&pooler->server_pool, server, OD_SUNDEF);
if (server->io) {
ft_close(server->io);
server->io = NULL;
@ -40,11 +39,79 @@ od_beclose(odpooler_t *pooler, odserver_t *server)
return 0;
}
static int
od_bestartup(odserver_t *server)
{
odscheme_server_t *dest = server->route->server;
(void)dest;
sostream_t *stream = &server->stream;
so_stream_reset(stream);
sofearg_t argv[] = {
{ "user", 5 }, { "test", 5 },
{ "database", 9 }, { "test", 5 }
};
int rc;
rc = so_fewrite_startup_message(stream, 4, argv);
if (rc == -1)
return -1;
rc = ft_write(server->io, (char*)stream->s,
so_stream_used(stream), 0);
return rc;
}
static int
od_beauth(odserver_t *server)
{
#if 0
sofehandshake handshake;
memset(&handshake, 0, sizeof(handshake));
for (;;) {
rc = od_read(server->handle, buf);
if (rc == -1) {
goto error;
}
rc = so_fehandshake(&handshake, buf->s, so_bufused(buf));
if (rc <= 0) {
if (rc == -1) {
/* ErrorResponce */
goto error;
}
break;
}
}
#endif
return 0;
}
static int
od_beconnect(odpooler_t *pooler, odserver_t *server)
{
(void)pooler;
(void)server;
odscheme_server_t *dest = server->route->server;
/* place server to connect pool */
od_serverpool_set(&pooler->server_pool, server, OD_SCONNECT);
/* connect to server */
int rc;
rc = ft_connect(server->io, dest->host, dest->port, 0);
if (rc < 0) {
od_log(&pooler->od->log, "failed to connect to %s:%d",
dest->host, dest->port);
return -1;
}
/* startup */
rc = od_bestartup(server);
if (rc == -1)
return -1;
/* auth */
rc = od_beauth(server);
if (rc == -1)
return -1;
/* server is ready to use */
od_serverpool_set(&pooler->server_pool, server,
OD_SIDLE);
return 0;
}
@ -54,11 +121,8 @@ od_bepop(odpooler_t *pooler, odscheme_route_t *route)
/* try to fetch server from idle pool */
odserver_t *server =
od_serverpool_pop(&pooler->server_pool);
if (server) {
od_serverpool_set(&pooler->server_pool, server,
OD_SACTIVE);
return server;
}
if (server)
goto ready;
/* create new server connection */
server = od_serveralloc();
if (server == NULL)
@ -75,5 +139,9 @@ od_bepop(odpooler_t *pooler, odscheme_route_t *route)
od_beclose(pooler, server);
return NULL;
}
ready:
/* server is ready to use */
od_serverpool_set(&pooler->server_pool, server,
OD_SACTIVE);
return server;
}

View File

@ -19,6 +19,7 @@ typedef enum {
struct odserver_t {
odserver_state_t state;
odscheme_route_t *route;
sostream_t stream;
ftio_t io;
odlist_t link;
};
@ -29,6 +30,7 @@ od_serverinit(odserver_t *s)
s->state = OD_SUNDEF;
s->route = NULL;
s->io = NULL;
so_stream_init(&s->stream);
od_listinit(&s->link);
}
@ -45,6 +47,7 @@ od_serveralloc(void)
static inline void
od_serverfree(odserver_t *s)
{
so_stream_free(&s->stream);
free(s);
}

View File

@ -12,6 +12,7 @@
#include <assert.h>
#include <flint.h>
#include <soprano.h>
#include "od_macro.h"
#include "od_list.h"