From e32faef69c237f8ff4175a087e34de4b36ffd05f Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Sat, 27 May 2017 16:14:39 +0300 Subject: [PATCH] machinarium: implement router push/pop requests --- src/od_frontend.c | 1 - src/od_msg.h | 4 +- src/od_router.c | 135 ++++++++++++++++++++++++++++++++++++++-------- src/od_router.h | 6 +++ 4 files changed, 121 insertions(+), 25 deletions(-) diff --git a/src/od_frontend.c b/src/od_frontend.c index 38804a94..65d72242 100644 --- a/src/od_frontend.c +++ b/src/od_frontend.c @@ -292,6 +292,5 @@ void od_frontend(void *arg) } /* main */ - od_frontend_close(client); } diff --git a/src/od_msg.h b/src/od_msg.h index 5e9540d5..38e8b878 100644 --- a/src/od_msg.h +++ b/src/od_msg.h @@ -3,7 +3,9 @@ typedef enum { OD_MCLIENT_NEW, - OD_MROUTE + OD_MROUTER_ATTACH, + OD_MROUTER_POP, + OD_MROUTER_PUSH } od_msg_t; #endif /* OD_MSG_H */ diff --git a/src/od_router.c b/src/od_router.c index 505d3e0d..5282b37f 100644 --- a/src/od_router.c +++ b/src/od_router.c @@ -43,11 +43,18 @@ #include "od_frontend.h" #include "od_router.h" -typedef struct { +typedef struct +{ od_routerstatus_t status; od_client_t *client; machine_queue_t response; -} od_msgroute_t; +} od_msgrouter_t; + +typedef struct +{ + od_route_t *route; + od_server_t *server; +} od_msgrouter_push_t; static od_route_t* od_router_fwd(od_router_t *router, so_bestartup_t *startup) @@ -116,16 +123,17 @@ od_router(void *arg) od_msg_t msg_type; msg_type = machine_msg_get_type(msg); switch (msg_type) { - case OD_MROUTE: + case OD_MROUTER_ATTACH: { - od_msgroute_t *msg_route; - msg_route = machine_msg_get_data(msg); + /* attach client to route */ + od_msgrouter_t *msg_attach; + msg_attach = machine_msg_get_data(msg); od_route_t *route; - route = od_router_fwd(router, &msg_route->client->startup); + route = od_router_fwd(router, &msg_attach->client->startup); if (route == NULL) { - msg_route->status = OD_RERROR_NOT_FOUND; - machine_queue_put(msg_route->response, msg); + msg_attach->status = OD_RERROR_NOT_FOUND; + machine_queue_put(msg_attach->response, msg); continue; } @@ -137,17 +145,43 @@ od_router(void *arg) "router: route '%s' client_max reached (%d)", route->scheme->target, route->scheme->client_max); - msg_route->status = OD_RERROR_LIMIT; - machine_queue_put(msg_route->response, msg); + msg_attach->status = OD_RERROR_LIMIT; + machine_queue_put(msg_attach->response, msg); continue; } - /*od_clientpool_set(&route->client_pool, msg_route->client, OD_CPENDING);*/ - msg_route->client->route = route; - msg_route->status = OD_ROK; - machine_queue_put(msg_route->response, msg); + /*od_clientpool_set(&route->client_pool, msg_attach->client, OD_CPENDING);*/ + msg_attach->client->route = route; + msg_attach->status = OD_ROK; + machine_queue_put(msg_attach->response, msg); continue; } + + case OD_MROUTER_POP: + { + /* get client server from route server pool */ + od_msgrouter_t *msg_pop; + msg_pop = machine_msg_get_data(msg); + + od_route_t *route; + route = msg_pop->client->route; + assert(route != NULL); + + break; + } + + case OD_MROUTER_PUSH: + { + /* push client server back to route server pool */ + od_msgrouter_push_t *msg_push; + msg_push = machine_msg_get_data(msg); + + od_serverpool_set(&msg_push->route->server_pool, + msg_push->server, + OD_SIDLE); + /* todo: wakeup waiters */ + break; + } default: assert(0); break; @@ -192,18 +226,18 @@ od_route(od_router_t *router, od_client_t *client) if (response == NULL) return OD_RERROR; - /* send route request to router */ + /* send attach request to router */ machine_msg_t msg; - msg = machine_msg_create(OD_MROUTE, sizeof(od_msgroute_t)); + msg = machine_msg_create(OD_MROUTER_ATTACH, sizeof(od_msgrouter_t)); if (msg == NULL) { machine_queue_free(response); return OD_RERROR; } - od_msgroute_t *msg_route; - msg_route = machine_msg_get_data(msg); - msg_route->status = OD_RERROR; - msg_route->client = client; - msg_route->response = response; + od_msgrouter_t *msg_attach; + msg_attach = machine_msg_get_data(msg); + msg_attach->status = OD_RERROR; + msg_attach->client = client; + msg_attach->response = response; machine_queue_put(router->queue, msg); /* wait for reply */ @@ -213,10 +247,65 @@ od_route(od_router_t *router, od_client_t *client) machine_queue_free(response); return OD_RERROR; } - msg_route = machine_msg_get_data(msg); + msg_attach = machine_msg_get_data(msg); od_routerstatus_t status; - status = msg_route->status; + status = msg_attach->status; machine_queue_free(response); machine_msg_free(msg); return status; } + +od_routerstatus_t +od_route_pop(od_router_t *router, od_client_t *client) +{ + /* create response queue */ + machine_queue_t response; + response = machine_queue_create(); + if (response == NULL) + return OD_RERROR; + + /* send pop request to router */ + machine_msg_t msg; + msg = machine_msg_create(OD_MROUTER_POP, sizeof(od_msgrouter_t)); + if (msg == NULL) { + machine_queue_free(response); + return OD_RERROR; + } + od_msgrouter_t *msg_pop; + msg_pop = machine_msg_get_data(msg); + msg_pop->status = OD_RERROR; + msg_pop->client = client; + msg_pop->response = response; + machine_queue_put(router->queue, msg); + + /* wait for reply */ + msg = machine_queue_get(response, UINT32_MAX); + if (msg == NULL) { + /* todo: */ + machine_queue_free(response); + return OD_RERROR; + } + msg_pop = machine_msg_get_data(msg); + od_routerstatus_t status; + status = msg_pop->status; + machine_queue_free(response); + machine_msg_free(msg); + return status; +} + +void +od_route_push(od_router_t *router, od_client_t *client) +{ + /* send server push request to router */ + machine_msg_t msg; + msg = machine_msg_create(OD_MROUTER_PUSH, sizeof(od_msgrouter_push_t)); + if (msg == NULL) + return; + od_msgrouter_push_t *msg_push; + msg_push = machine_msg_get_data(msg); + msg_push->route = client->route; + msg_push->server = client->server; + machine_queue_put(router->queue, msg); + + client->server = NULL; +} diff --git a/src/od_router.h b/src/od_router.h index 4297a2a5..c537f664 100644 --- a/src/od_router.h +++ b/src/od_router.h @@ -31,4 +31,10 @@ int od_router_start(od_router_t*); od_routerstatus_t od_route(od_router_t*, od_client_t*); +od_routerstatus_t +od_router_pop(od_router_t*, od_client_t*); + +void +od_router_push(od_router_t*, od_client_t*); + #endif /* OD_ROUTER_H */