machinarium: implement router push/pop requests

This commit is contained in:
Dmitry Simonenko 2017-05-27 16:14:39 +03:00
parent 0723417d9c
commit e32faef69c
4 changed files with 121 additions and 25 deletions

View File

@ -292,6 +292,5 @@ void od_frontend(void *arg)
}
/* main */
od_frontend_close(client);
}

View File

@ -3,7 +3,9 @@
typedef enum {
OD_MCLIENT_NEW,
OD_MROUTE
OD_MROUTER_ATTACH,
OD_MROUTER_POP,
OD_MROUTER_PUSH
} od_msg_t;
#endif /* OD_MSG_H */

View File

@ -43,11 +43,18 @@
#include "od_frontend.h"
#include "od_router.h"
typedef struct {
typedef struct
{
od_routerstatus_t status;
od_client_t *client;
machine_queue_t response;
} od_msgroute_t;
} od_msgrouter_t;
typedef struct
{
od_route_t *route;
od_server_t *server;
} od_msgrouter_push_t;
static od_route_t*
od_router_fwd(od_router_t *router, so_bestartup_t *startup)
@ -116,16 +123,17 @@ od_router(void *arg)
od_msg_t msg_type;
msg_type = machine_msg_get_type(msg);
switch (msg_type) {
case OD_MROUTE:
case OD_MROUTER_ATTACH:
{
od_msgroute_t *msg_route;
msg_route = machine_msg_get_data(msg);
/* attach client to route */
od_msgrouter_t *msg_attach;
msg_attach = machine_msg_get_data(msg);
od_route_t *route;
route = od_router_fwd(router, &msg_route->client->startup);
route = od_router_fwd(router, &msg_attach->client->startup);
if (route == NULL) {
msg_route->status = OD_RERROR_NOT_FOUND;
machine_queue_put(msg_route->response, msg);
msg_attach->status = OD_RERROR_NOT_FOUND;
machine_queue_put(msg_attach->response, msg);
continue;
}
@ -137,17 +145,43 @@ od_router(void *arg)
"router: route '%s' client_max reached (%d)",
route->scheme->target,
route->scheme->client_max);
msg_route->status = OD_RERROR_LIMIT;
machine_queue_put(msg_route->response, msg);
msg_attach->status = OD_RERROR_LIMIT;
machine_queue_put(msg_attach->response, msg);
continue;
}
/*od_clientpool_set(&route->client_pool, msg_route->client, OD_CPENDING);*/
msg_route->client->route = route;
msg_route->status = OD_ROK;
machine_queue_put(msg_route->response, msg);
/*od_clientpool_set(&route->client_pool, msg_attach->client, OD_CPENDING);*/
msg_attach->client->route = route;
msg_attach->status = OD_ROK;
machine_queue_put(msg_attach->response, msg);
continue;
}
case OD_MROUTER_POP:
{
/* get client server from route server pool */
od_msgrouter_t *msg_pop;
msg_pop = machine_msg_get_data(msg);
od_route_t *route;
route = msg_pop->client->route;
assert(route != NULL);
break;
}
case OD_MROUTER_PUSH:
{
/* push client server back to route server pool */
od_msgrouter_push_t *msg_push;
msg_push = machine_msg_get_data(msg);
od_serverpool_set(&msg_push->route->server_pool,
msg_push->server,
OD_SIDLE);
/* todo: wakeup waiters */
break;
}
default:
assert(0);
break;
@ -192,18 +226,18 @@ od_route(od_router_t *router, od_client_t *client)
if (response == NULL)
return OD_RERROR;
/* send route request to router */
/* send attach request to router */
machine_msg_t msg;
msg = machine_msg_create(OD_MROUTE, sizeof(od_msgroute_t));
msg = machine_msg_create(OD_MROUTER_ATTACH, sizeof(od_msgrouter_t));
if (msg == NULL) {
machine_queue_free(response);
return OD_RERROR;
}
od_msgroute_t *msg_route;
msg_route = machine_msg_get_data(msg);
msg_route->status = OD_RERROR;
msg_route->client = client;
msg_route->response = response;
od_msgrouter_t *msg_attach;
msg_attach = machine_msg_get_data(msg);
msg_attach->status = OD_RERROR;
msg_attach->client = client;
msg_attach->response = response;
machine_queue_put(router->queue, msg);
/* wait for reply */
@ -213,10 +247,65 @@ od_route(od_router_t *router, od_client_t *client)
machine_queue_free(response);
return OD_RERROR;
}
msg_route = machine_msg_get_data(msg);
msg_attach = machine_msg_get_data(msg);
od_routerstatus_t status;
status = msg_route->status;
status = msg_attach->status;
machine_queue_free(response);
machine_msg_free(msg);
return status;
}
od_routerstatus_t
od_route_pop(od_router_t *router, od_client_t *client)
{
/* create response queue */
machine_queue_t response;
response = machine_queue_create();
if (response == NULL)
return OD_RERROR;
/* send pop request to router */
machine_msg_t msg;
msg = machine_msg_create(OD_MROUTER_POP, sizeof(od_msgrouter_t));
if (msg == NULL) {
machine_queue_free(response);
return OD_RERROR;
}
od_msgrouter_t *msg_pop;
msg_pop = machine_msg_get_data(msg);
msg_pop->status = OD_RERROR;
msg_pop->client = client;
msg_pop->response = response;
machine_queue_put(router->queue, msg);
/* wait for reply */
msg = machine_queue_get(response, UINT32_MAX);
if (msg == NULL) {
/* todo: */
machine_queue_free(response);
return OD_RERROR;
}
msg_pop = machine_msg_get_data(msg);
od_routerstatus_t status;
status = msg_pop->status;
machine_queue_free(response);
machine_msg_free(msg);
return status;
}
void
od_route_push(od_router_t *router, od_client_t *client)
{
/* send server push request to router */
machine_msg_t msg;
msg = machine_msg_create(OD_MROUTER_PUSH, sizeof(od_msgrouter_push_t));
if (msg == NULL)
return;
od_msgrouter_push_t *msg_push;
msg_push = machine_msg_get_data(msg);
msg_push->route = client->route;
msg_push->server = client->server;
machine_queue_put(router->queue, msg);
client->server = NULL;
}

View File

@ -31,4 +31,10 @@ int od_router_start(od_router_t*);
od_routerstatus_t
od_route(od_router_t*, od_client_t*);
od_routerstatus_t
od_router_pop(od_router_t*, od_client_t*);
void
od_router_push(od_router_t*, od_client_t*);
#endif /* OD_ROUTER_H */