odissey: make backend connection always only on relay thread

This commit is contained in:
Dmitry Simonenko 2017-06-16 14:57:43 +03:00
parent e85eab7b25
commit 33b54996a7
7 changed files with 71 additions and 100 deletions

View File

@ -244,18 +244,44 @@ od_backend_setup(od_server_t *server)
return 0;
}
static inline int
od_backend_connect(od_server_t *server)
int od_backend_connect(od_server_t *server)
{
od_instance_t *instance = server->system->instance;
od_route_t *route = server->route;
od_schemeserver_t *server_scheme = route->scheme->server;
assert(server->io == NULL);
assert(server->route != NULL);
/* create io handle */
server->io = machine_io_create();
if (server->io == NULL)
return -1;
/* set network options */
machine_set_nodelay(server->io, instance->scheme.nodelay);
if (instance->scheme.keepalive > 0)
machine_set_keepalive(server->io, 1, instance->scheme.keepalive);
int rc;
rc = machine_set_readahead(server->io, instance->scheme.readahead);
if (rc == -1) {
od_error_server(&instance->log, server->id, NULL,
"failed to set readahead");
return -1;
}
/* set tls options */
od_schemeserver_t *server_scheme;
server_scheme = route->scheme->server;
if (server_scheme->tls_verify != OD_TDISABLE) {
server->tls = od_tls_backend(server_scheme);
if (server->tls == NULL)
return -1;
}
/* resolve server address */
char port[16];
snprintf(port, sizeof(port), "%d", server_scheme->port);
struct addrinfo *ai = NULL;
int rc;
rc = machine_getaddrinfo(server_scheme->host, port, NULL, &ai, 0);
if (rc == -1) {
od_error_server(&instance->log, server->id, NULL,
@ -283,7 +309,6 @@ od_backend_connect(od_server_t *server)
if (rc == -1)
return -1;
}
od_log_server(&instance->log, server->id, NULL,
"new server connection %s:%d",
server_scheme->host,
@ -302,60 +327,6 @@ od_backend_connect(od_server_t *server)
return 0;
}
od_server_t*
od_backend_new(od_router_t *router, od_route_t *route)
{
od_instance_t *instance = router->system->instance;
/* create new server connection */
od_server_t *server;
server = od_server_allocate();
if (server == NULL)
return NULL;
server->route = route;
server->system = router->system;
/* assign server to connect pool */
od_serverpool_set(&route->server_pool, server, OD_SCONNECT);
/* set network options */
server->io = machine_io_create();
if (server->io == NULL)
goto error;
machine_set_nodelay(server->io, instance->scheme.nodelay);
if (instance->scheme.keepalive > 0)
machine_set_keepalive(server->io, 1, instance->scheme.keepalive);
int rc;
rc = machine_set_readahead(server->io, instance->scheme.readahead);
if (rc == -1) {
od_error_server(&instance->log, server->id, NULL,
"failed to set readahead");
goto error;
}
/* set tls options */
od_schemeserver_t *server_scheme;
server_scheme = route->scheme->server;
if (server_scheme->tls_verify != OD_TDISABLE) {
server->tls = od_tls_backend(server_scheme);
if (server->tls == NULL)
goto error;
}
/* initiate connection */
rc = od_backend_connect(server);
if (rc == -1)
goto error;
return server;
error:
od_serverpool_set(&route->server_pool, server, OD_SUNDEF);
server->route = NULL;
od_backend_close(server);
return NULL;
}
static inline int
od_backend_query(od_server_t *server, char *procedure, char *query, int len)
{

View File

@ -7,9 +7,7 @@
* PostgreSQL connection pooler and request router.
*/
od_server_t*
od_backend_new(od_router_t*, od_route_t*);
int od_backend_connect(od_server_t*);
void od_backend_close(od_server_t*);
int od_backend_terminate(od_server_t*);
int od_backend_reset(od_server_t*);

View File

@ -218,6 +218,7 @@ enum {
OD_RS_UNDEF,
OD_RS_OK,
OD_RS_EATTACH,
OD_RS_ESERVER_CONNECT,
OD_RS_ESERVER_CONFIGURE,
OD_RS_ESERVER_READ,
OD_RS_ESERVER_WRITE,
@ -281,7 +282,8 @@ od_frontend_main(od_client_t *client)
break;
/* get server connection from the route pool */
if (server == NULL) {
if (server == NULL)
{
od_routerstatus_t status;
status = od_router_attach(client);
if (status != OD_ROK)
@ -294,20 +296,29 @@ od_frontend_main(od_client_t *client)
/* configure server using client startup parameters,
* if it has not been configured before. */
if (server->last_client_id == client->id) {
assert(server->io != NULL);
od_debug_client(&instance->log, client->id, NULL,
"previously owned, no need to reconfigure S%" PRIu64,
server->id);
} else {
/* discard last server configuration, unless
* server has been just connected. */
od_route_t *route = client->route;
if (route->scheme->discard) {
if (server->last_client_id != UINT64_MAX) {
/* connect to server, if necessary */
if (server->io == NULL) {
rc = od_backend_connect(server);
if (rc == -1)
return OD_RS_ESERVER_CONNECT;
} else {
assert(server->last_client_id != UINT64_MAX);
/* discard last server configuration */
if (route->scheme->discard) {
rc = od_backend_discard(client->server);
if (rc == -1)
return OD_RS_ESERVER_CONFIGURE;
}
}
/* set client parameters */
rc = od_backend_configure(client->server, &client->startup);
if (rc == -1)
@ -410,7 +421,6 @@ od_frontend_main(od_client_t *client)
}
}
}
return OD_RS_OK;
}
@ -539,6 +549,7 @@ void od_frontend(void *arg)
/* detach client from route */
od_unroute(client);
break;
case OD_RS_OK:
/* graceful disconnect */
od_log_client(&instance->log, client->id, NULL,
@ -556,6 +567,7 @@ void od_frontend(void *arg)
/* push server to router server pool */
od_router_detach_and_unroute(client);
break;
case OD_RS_ECLIENT_READ:
case OD_RS_ECLIENT_WRITE:
/* close client connection and reuse server
@ -576,15 +588,24 @@ void od_frontend(void *arg)
/* push server to router server pool */
od_router_detach_and_unroute(client);
break;
case OD_RS_ESERVER_CONNECT:
/* server attached to client and connection failed */
od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE,
"failed to connect to remote server");
/* close backend connection */
od_router_close_and_unroute(client);
break;
case OD_RS_ESERVER_CONFIGURE:
od_log_server(&instance->log, server->id, NULL,
"disconnected (server configure error): %s",
machine_error(server->io));
"disconnected (server configure error)");
od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE,
"failed to configure remote server");
/* close backend connection */
od_router_close_and_unroute(client);
break;
case OD_RS_ESERVER_READ:
case OD_RS_ESERVER_WRITE:
/* close client connection and close server
@ -597,6 +618,7 @@ void od_frontend(void *arg)
/* close backend connection */
od_router_close_and_unroute(client);
break;
case OD_RS_UNDEF:
assert(0);
break;

View File

@ -130,7 +130,7 @@ od_router_attacher(void *arg)
{
server = od_serverpool_next(&route->server_pool, OD_SIDLE);
if (server)
goto on_connect;
goto on_attach;
/* maybe start new connection */
if (od_serverpool_total(&route->server_pool) < route->scheme->pool_size)
@ -170,20 +170,19 @@ od_router_attacher(void *arg)
continue;
}
/* create new backend connection */
/* create new server object */
uint64_t id = router->server_seq++;
server = od_backend_new(router, route);
server = od_server_allocate();
if (server == NULL) {
msg_attach->status = OD_RERROR;
machine_queue_put(msg_attach->response, msg);
return;
}
server->id = id;
server->system = router->system;
server->route = route;
/* detach server io from router context */
machine_io_detach(server->io);
on_connect:
on_attach:
od_serverpool_set(&route->server_pool, server, OD_SACTIVE);
od_clientpool_set(&route->client_pool, client, OD_CACTIVE);
client->server = server;
@ -511,9 +510,11 @@ od_router_attach(od_client_t *client)
{
od_routerstatus_t status;
status = od_router_do(client, OD_MROUTER_ATTACH, 1);
if (client->server) {
od_server_t *server = client->server;
if (server && server->io) {
/* attach server io to clients machine context */
machine_io_attach(client->server->io);
if (server->io)
machine_io_attach(server->io);
}
return status;
}

View File

@ -13,7 +13,6 @@ typedef enum
{
OD_SUNDEF,
OD_SIDLE,
OD_SCONNECT,
OD_SACTIVE,
OD_SEXPIRE
} od_serverstate_t;

View File

@ -32,12 +32,10 @@ void od_serverpool_init(od_serverpool_t *pool)
{
pool->count_idle = 0;
pool->count_active = 0;
pool->count_connect = 0;
pool->count_expire = 0;
od_list_init(&pool->idle);
od_list_init(&pool->active);
od_list_init(&pool->expire);
od_list_init(&pool->connect);
od_list_init(&pool->link);
}
@ -49,10 +47,6 @@ void od_serverpool_free(od_serverpool_t *pool)
server = od_container_of(i, od_server_t, link);
od_server_free(server);
}
od_list_foreach_safe(&pool->connect, i, n) {
server = od_container_of(i, od_server_t, link);
od_server_free(server);
}
od_list_foreach_safe(&pool->expire, i, n) {
server = od_container_of(i, od_server_t, link);
od_server_free(server);
@ -77,9 +71,6 @@ void od_serverpool_set(od_serverpool_t *pool, od_server_t *server,
case OD_SIDLE:
pool->count_idle--;
break;
case OD_SCONNECT:
pool->count_connect--;
break;
case OD_SACTIVE:
pool->count_active--;
break;
@ -92,10 +83,6 @@ void od_serverpool_set(od_serverpool_t *pool, od_server_t *server,
target = &pool->expire;
pool->count_expire++;
break;
case OD_SCONNECT:
target = &pool->connect;
pool->count_connect++;
break;
case OD_SIDLE:
target = &pool->idle;
pool->count_idle++;
@ -123,8 +110,6 @@ od_serverpool_next(od_serverpool_t *pool, od_serverstate_t state)
break;
case OD_SACTIVE: target = &pool->active;
break;
case OD_SCONNECT: target = &pool->connect;
break;
case OD_SUNDEF: assert(0);
break;
}
@ -149,8 +134,6 @@ od_serverpool_foreach(od_serverpool_t *pool,
break;
case OD_SACTIVE: target = &pool->active;
break;
case OD_SCONNECT: target = &pool->connect;
break;
case OD_SUNDEF: assert(0);
break;
}

View File

@ -15,11 +15,9 @@ struct od_serverpool
{
od_list_t active;
od_list_t idle;
od_list_t connect;
od_list_t expire;
int count_active;
int count_idle;
int count_connect;
int count_expire;
od_list_t link;
};
@ -40,7 +38,6 @@ static inline int
od_serverpool_total(od_serverpool_t *pool) {
return pool->count_active +
pool->count_idle +
pool->count_connect +
pool->count_expire;
}