odissey: better name for server attach/detach

This commit is contained in:
Dmitry Simonenko 2017-05-29 15:39:18 +03:00
parent 7da27518d6
commit 7bedf59191
4 changed files with 91 additions and 79 deletions

View File

@ -265,17 +265,11 @@ od_backend_new(od_router_t *router, od_route_t *route)
}
#endif
/* place server to connection pool */
od_serverpool_set(&route->server_pool, server, OD_SCONNECT);
int rc;
rc = od_backend_connect(server);
if (rc == -1) {
od_backend_close(server);
return NULL;
}
/* server is ready to use */
od_serverpool_set(&route->server_pool, server, OD_SIDLE);
return server;
}

View File

@ -3,9 +3,9 @@
typedef enum {
OD_MCLIENT_NEW,
OD_MROUTER_ROUTE,
OD_MROUTER_ATTACH,
OD_MROUTER_POP,
OD_MROUTER_PUSH
OD_MROUTER_DETACH
} od_msg_t;
#endif /* OD_MSG_H */

View File

@ -105,6 +105,17 @@ od_router_fwd(od_router_t *router, so_bestartup_t *startup)
return route;
}
static inline void
od_router_attacher(void *arg)
{
od_msgrouter_t *msg_attach;
msg_attach = arg;
od_route_t *route;
route = msg_attach->client->route;
assert(route != NULL);
}
static inline void
od_router(void *arg)
{
@ -123,17 +134,17 @@ od_router(void *arg)
od_msg_t msg_type;
msg_type = machine_msg_get_type(msg);
switch (msg_type) {
case OD_MROUTER_ATTACH:
case OD_MROUTER_ROUTE:
{
/* attach client to route */
od_msgrouter_t *msg_attach;
msg_attach = machine_msg_get_data(msg);
od_msgrouter_t *msg_route;
msg_route = machine_msg_get_data(msg);
od_route_t *route;
route = od_router_fwd(router, &msg_attach->client->startup);
route = od_router_fwd(router, &msg_route->client->startup);
if (route == NULL) {
msg_attach->status = OD_RERROR_NOT_FOUND;
machine_queue_put(msg_attach->response, msg);
msg_route->status = OD_RERROR_NOT_FOUND;
machine_queue_put(msg_route->response, msg);
continue;
}
@ -145,41 +156,45 @@ od_router(void *arg)
"router: route '%s' client_max reached (%d)",
route->scheme->target,
route->scheme->client_max);
msg_attach->status = OD_RERROR_LIMIT;
machine_queue_put(msg_attach->response, msg);
msg_route->status = OD_RERROR_LIMIT;
machine_queue_put(msg_route->response, msg);
continue;
}
/*od_clientpool_set(&route->client_pool, msg_attach->client, OD_CPENDING);*/
msg_attach->client->route = route;
msg_attach->status = OD_ROK;
machine_queue_put(msg_attach->response, msg);
/*od_clientpool_set(&route->client_pool, msg_route->client, OD_CPENDING);*/
msg_route->client->route = route;
msg_route->status = OD_ROK;
machine_queue_put(msg_route->response, msg);
continue;
}
case OD_MROUTER_POP:
case OD_MROUTER_ATTACH:
{
/* get client server from route server pool */
od_msgrouter_t *msg_pop;
msg_pop = machine_msg_get_data(msg);
od_msgrouter_t *msg_attach;
msg_attach = machine_msg_get_data(msg);
od_route_t *route;
route = msg_pop->client->route;
assert(route != NULL);
break;
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_router_attacher, msg);
if (coroutine_id == -1) {
msg_attach->status = OD_RERROR;
machine_queue_put(msg_attach->response, msg);
break;
}
continue;
}
case OD_MROUTER_PUSH:
case OD_MROUTER_DETACH:
{
/* push client server back to route server pool */
od_msgrouter_push_t *msg_push;
msg_push = machine_msg_get_data(msg);
od_msgrouter_push_t *msg_detach;
msg_detach = machine_msg_get_data(msg);
od_serverpool_set(&msg_push->route->server_pool,
msg_push->server,
od_serverpool_set(&msg_detach->route->server_pool,
msg_detach->server,
OD_SIDLE);
/* todo: wakeup waiters */
/* todo: wakeup attachers */
break;
}
default:
@ -219,6 +234,44 @@ int od_router_start(od_router_t *router)
od_routerstatus_t
od_route(od_router_t *router, od_client_t *client)
{
/* create response queue */
machine_queue_t response;
response = machine_queue_create();
if (response == NULL)
return OD_RERROR;
/* send attach request to router */
machine_msg_t msg;
msg = machine_msg_create(OD_MROUTER_ROUTE, sizeof(od_msgrouter_t));
if (msg == NULL) {
machine_queue_free(response);
return OD_RERROR;
}
od_msgrouter_t *msg_route;
msg_route = machine_msg_get_data(msg);
msg_route->status = OD_RERROR;
msg_route->client = client;
msg_route->response = response;
machine_queue_put(router->queue, msg);
/* wait for reply */
msg = machine_queue_get(response, UINT32_MAX);
if (msg == NULL) {
/* todo: */
machine_queue_free(response);
return OD_RERROR;
}
msg_route = machine_msg_get_data(msg);
od_routerstatus_t status;
status = msg_route->status;
machine_queue_free(response);
machine_msg_free(msg);
return status;
}
od_routerstatus_t
od_route_attach(od_router_t *router, od_client_t *client)
{
/* create response queue */
machine_queue_t response;
@ -252,60 +305,25 @@ od_route(od_router_t *router, od_client_t *client)
status = msg_attach->status;
machine_queue_free(response);
machine_msg_free(msg);
return status;
}
od_routerstatus_t
od_route_pop(od_router_t *router, od_client_t *client)
{
/* create response queue */
machine_queue_t response;
response = machine_queue_create();
if (response == NULL)
return OD_RERROR;
/* send pop request to router */
machine_msg_t msg;
msg = machine_msg_create(OD_MROUTER_POP, sizeof(od_msgrouter_t));
if (msg == NULL) {
machine_queue_free(response);
return OD_RERROR;
}
od_msgrouter_t *msg_pop;
msg_pop = machine_msg_get_data(msg);
msg_pop->status = OD_RERROR;
msg_pop->client = client;
msg_pop->response = response;
machine_queue_put(router->queue, msg);
/* wait for reply */
msg = machine_queue_get(response, UINT32_MAX);
if (msg == NULL) {
/* todo: */
machine_queue_free(response);
return OD_RERROR;
}
msg_pop = machine_msg_get_data(msg);
od_routerstatus_t status;
status = msg_pop->status;
machine_queue_free(response);
machine_msg_free(msg);
/* TODO: machine_attach(client->server->io) */
return status;
}
void
od_route_push(od_router_t *router, od_client_t *client)
od_route_detach(od_router_t *router, od_client_t *client)
{
/* TODO: machine_detach(client->server->io) */
/* send server push request to router */
machine_msg_t msg;
msg = machine_msg_create(OD_MROUTER_PUSH, sizeof(od_msgrouter_push_t));
msg = machine_msg_create(OD_MROUTER_DETACH, sizeof(od_msgrouter_push_t));
if (msg == NULL)
return;
od_msgrouter_push_t *msg_push;
msg_push = machine_msg_get_data(msg);
msg_push->route = client->route;
msg_push->server = client->server;
od_msgrouter_push_t *msg_detach;
msg_detach = machine_msg_get_data(msg);
msg_detach->route = client->route;
msg_detach->server = client->server;
machine_queue_put(router->queue, msg);
client->server = NULL;
}

View File

@ -32,7 +32,7 @@ od_routerstatus_t
od_route(od_router_t*, od_client_t*);
od_routerstatus_t
od_router_pop(od_router_t*, od_client_t*);
od_router_attach(od_router_t*, od_client_t*);
void
od_router_push(od_router_t*, od_client_t*);