diff --git a/src/od_backend.c b/src/od_backend.c index 9552a659..8bc1e237 100644 --- a/src/od_backend.c +++ b/src/od_backend.c @@ -244,18 +244,44 @@ od_backend_setup(od_server_t *server) return 0; } -static inline int -od_backend_connect(od_server_t *server) +int od_backend_connect(od_server_t *server) { od_instance_t *instance = server->system->instance; od_route_t *route = server->route; - od_schemeserver_t *server_scheme = route->scheme->server; + + assert(server->io == NULL); + assert(server->route != NULL); + + /* create io handle */ + server->io = machine_io_create(); + if (server->io == NULL) + return -1; + + /* set network options */ + machine_set_nodelay(server->io, instance->scheme.nodelay); + if (instance->scheme.keepalive > 0) + machine_set_keepalive(server->io, 1, instance->scheme.keepalive); + int rc; + rc = machine_set_readahead(server->io, instance->scheme.readahead); + if (rc == -1) { + od_error_server(&instance->log, server->id, NULL, + "failed to set readahead"); + return -1; + } + + /* set tls options */ + od_schemeserver_t *server_scheme; + server_scheme = route->scheme->server; + if (server_scheme->tls_verify != OD_TDISABLE) { + server->tls = od_tls_backend(server_scheme); + if (server->tls == NULL) + return -1; + } /* resolve server address */ char port[16]; snprintf(port, sizeof(port), "%d", server_scheme->port); struct addrinfo *ai = NULL; - int rc; rc = machine_getaddrinfo(server_scheme->host, port, NULL, &ai, 0); if (rc == -1) { od_error_server(&instance->log, server->id, NULL, @@ -283,7 +309,6 @@ od_backend_connect(od_server_t *server) if (rc == -1) return -1; } - od_log_server(&instance->log, server->id, NULL, "new server connection %s:%d", server_scheme->host, @@ -302,60 +327,6 @@ od_backend_connect(od_server_t *server) return 0; } -od_server_t* -od_backend_new(od_router_t *router, od_route_t *route) -{ - od_instance_t *instance = router->system->instance; - - /* create new server connection */ - od_server_t *server; - server = od_server_allocate(); - if (server == NULL) - return NULL; - server->route = route; - server->system = router->system; - - /* assign server to connect pool */ - od_serverpool_set(&route->server_pool, server, OD_SCONNECT); - - /* set network options */ - server->io = machine_io_create(); - if (server->io == NULL) - goto error; - machine_set_nodelay(server->io, instance->scheme.nodelay); - if (instance->scheme.keepalive > 0) - machine_set_keepalive(server->io, 1, instance->scheme.keepalive); - int rc; - rc = machine_set_readahead(server->io, instance->scheme.readahead); - if (rc == -1) { - od_error_server(&instance->log, server->id, NULL, - "failed to set readahead"); - goto error; - } - - /* set tls options */ - od_schemeserver_t *server_scheme; - server_scheme = route->scheme->server; - if (server_scheme->tls_verify != OD_TDISABLE) { - server->tls = od_tls_backend(server_scheme); - if (server->tls == NULL) - goto error; - } - - /* initiate connection */ - rc = od_backend_connect(server); - if (rc == -1) - goto error; - - return server; - -error: - od_serverpool_set(&route->server_pool, server, OD_SUNDEF); - server->route = NULL; - od_backend_close(server); - return NULL; -} - static inline int od_backend_query(od_server_t *server, char *procedure, char *query, int len) { diff --git a/src/od_backend.h b/src/od_backend.h index 97160c22..2fd23e66 100644 --- a/src/od_backend.h +++ b/src/od_backend.h @@ -7,9 +7,7 @@ * PostgreSQL connection pooler and request router. */ -od_server_t* -od_backend_new(od_router_t*, od_route_t*); - +int od_backend_connect(od_server_t*); void od_backend_close(od_server_t*); int od_backend_terminate(od_server_t*); int od_backend_reset(od_server_t*); diff --git a/src/od_frontend.c b/src/od_frontend.c index 4e3f14cd..deb06aab 100644 --- a/src/od_frontend.c +++ b/src/od_frontend.c @@ -218,6 +218,7 @@ enum { OD_RS_UNDEF, OD_RS_OK, OD_RS_EATTACH, + OD_RS_ESERVER_CONNECT, OD_RS_ESERVER_CONFIGURE, OD_RS_ESERVER_READ, OD_RS_ESERVER_WRITE, @@ -281,7 +282,8 @@ od_frontend_main(od_client_t *client) break; /* get server connection from the route pool */ - if (server == NULL) { + if (server == NULL) + { od_routerstatus_t status; status = od_router_attach(client); if (status != OD_ROK) @@ -294,20 +296,29 @@ od_frontend_main(od_client_t *client) /* configure server using client startup parameters, * if it has not been configured before. */ if (server->last_client_id == client->id) { + assert(server->io != NULL); od_debug_client(&instance->log, client->id, NULL, "previously owned, no need to reconfigure S%" PRIu64, server->id); } else { - /* discard last server configuration, unless - * server has been just connected. */ od_route_t *route = client->route; - if (route->scheme->discard) { - if (server->last_client_id != UINT64_MAX) { + + /* connect to server, if necessary */ + if (server->io == NULL) { + rc = od_backend_connect(server); + if (rc == -1) + return OD_RS_ESERVER_CONNECT; + } else { + assert(server->last_client_id != UINT64_MAX); + + /* discard last server configuration */ + if (route->scheme->discard) { rc = od_backend_discard(client->server); if (rc == -1) return OD_RS_ESERVER_CONFIGURE; } } + /* set client parameters */ rc = od_backend_configure(client->server, &client->startup); if (rc == -1) @@ -410,7 +421,6 @@ od_frontend_main(od_client_t *client) } } } - return OD_RS_OK; } @@ -539,6 +549,7 @@ void od_frontend(void *arg) /* detach client from route */ od_unroute(client); break; + case OD_RS_OK: /* graceful disconnect */ od_log_client(&instance->log, client->id, NULL, @@ -556,6 +567,7 @@ void od_frontend(void *arg) /* push server to router server pool */ od_router_detach_and_unroute(client); break; + case OD_RS_ECLIENT_READ: case OD_RS_ECLIENT_WRITE: /* close client connection and reuse server @@ -576,15 +588,24 @@ void od_frontend(void *arg) /* push server to router server pool */ od_router_detach_and_unroute(client); break; + + case OD_RS_ESERVER_CONNECT: + /* server attached to client and connection failed */ + od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE, + "failed to connect to remote server"); + /* close backend connection */ + od_router_close_and_unroute(client); + break; + case OD_RS_ESERVER_CONFIGURE: od_log_server(&instance->log, server->id, NULL, - "disconnected (server configure error): %s", - machine_error(server->io)); + "disconnected (server configure error)"); od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE, "failed to configure remote server"); /* close backend connection */ od_router_close_and_unroute(client); break; + case OD_RS_ESERVER_READ: case OD_RS_ESERVER_WRITE: /* close client connection and close server @@ -597,6 +618,7 @@ void od_frontend(void *arg) /* close backend connection */ od_router_close_and_unroute(client); break; + case OD_RS_UNDEF: assert(0); break; diff --git a/src/od_router.c b/src/od_router.c index cc31ec5c..e3e2d78d 100644 --- a/src/od_router.c +++ b/src/od_router.c @@ -130,7 +130,7 @@ od_router_attacher(void *arg) { server = od_serverpool_next(&route->server_pool, OD_SIDLE); if (server) - goto on_connect; + goto on_attach; /* maybe start new connection */ if (od_serverpool_total(&route->server_pool) < route->scheme->pool_size) @@ -170,20 +170,19 @@ od_router_attacher(void *arg) continue; } - /* create new backend connection */ + /* create new server object */ uint64_t id = router->server_seq++; - server = od_backend_new(router, route); + server = od_server_allocate(); if (server == NULL) { msg_attach->status = OD_RERROR; machine_queue_put(msg_attach->response, msg); return; } server->id = id; + server->system = router->system; + server->route = route; - /* detach server io from router context */ - machine_io_detach(server->io); - -on_connect: +on_attach: od_serverpool_set(&route->server_pool, server, OD_SACTIVE); od_clientpool_set(&route->client_pool, client, OD_CACTIVE); client->server = server; @@ -511,9 +510,11 @@ od_router_attach(od_client_t *client) { od_routerstatus_t status; status = od_router_do(client, OD_MROUTER_ATTACH, 1); - if (client->server) { + od_server_t *server = client->server; + if (server && server->io) { /* attach server io to clients machine context */ - machine_io_attach(client->server->io); + if (server->io) + machine_io_attach(server->io); } return status; } diff --git a/src/od_server.h b/src/od_server.h index 7f288b1a..d536a31f 100644 --- a/src/od_server.h +++ b/src/od_server.h @@ -13,7 +13,6 @@ typedef enum { OD_SUNDEF, OD_SIDLE, - OD_SCONNECT, OD_SACTIVE, OD_SEXPIRE } od_serverstate_t; diff --git a/src/od_server_pool.c b/src/od_server_pool.c index aa28b9fb..b2700d5e 100644 --- a/src/od_server_pool.c +++ b/src/od_server_pool.c @@ -32,12 +32,10 @@ void od_serverpool_init(od_serverpool_t *pool) { pool->count_idle = 0; pool->count_active = 0; - pool->count_connect = 0; pool->count_expire = 0; od_list_init(&pool->idle); od_list_init(&pool->active); od_list_init(&pool->expire); - od_list_init(&pool->connect); od_list_init(&pool->link); } @@ -49,10 +47,6 @@ void od_serverpool_free(od_serverpool_t *pool) server = od_container_of(i, od_server_t, link); od_server_free(server); } - od_list_foreach_safe(&pool->connect, i, n) { - server = od_container_of(i, od_server_t, link); - od_server_free(server); - } od_list_foreach_safe(&pool->expire, i, n) { server = od_container_of(i, od_server_t, link); od_server_free(server); @@ -77,9 +71,6 @@ void od_serverpool_set(od_serverpool_t *pool, od_server_t *server, case OD_SIDLE: pool->count_idle--; break; - case OD_SCONNECT: - pool->count_connect--; - break; case OD_SACTIVE: pool->count_active--; break; @@ -92,10 +83,6 @@ void od_serverpool_set(od_serverpool_t *pool, od_server_t *server, target = &pool->expire; pool->count_expire++; break; - case OD_SCONNECT: - target = &pool->connect; - pool->count_connect++; - break; case OD_SIDLE: target = &pool->idle; pool->count_idle++; @@ -123,8 +110,6 @@ od_serverpool_next(od_serverpool_t *pool, od_serverstate_t state) break; case OD_SACTIVE: target = &pool->active; break; - case OD_SCONNECT: target = &pool->connect; - break; case OD_SUNDEF: assert(0); break; } @@ -149,8 +134,6 @@ od_serverpool_foreach(od_serverpool_t *pool, break; case OD_SACTIVE: target = &pool->active; break; - case OD_SCONNECT: target = &pool->connect; - break; case OD_SUNDEF: assert(0); break; } diff --git a/src/od_server_pool.h b/src/od_server_pool.h index 20bcb661..c2638221 100644 --- a/src/od_server_pool.h +++ b/src/od_server_pool.h @@ -15,11 +15,9 @@ struct od_serverpool { od_list_t active; od_list_t idle; - od_list_t connect; od_list_t expire; int count_active; int count_idle; - int count_connect; int count_expire; od_list_t link; }; @@ -40,7 +38,6 @@ static inline int od_serverpool_total(od_serverpool_t *pool) { return pool->count_active + pool->count_idle + - pool->count_connect + pool->count_expire; }