From 6b696ccdef49df142edea5967279d3140fd3e4af Mon Sep 17 00:00:00 2001 From: Dmitry Simonenko Date: Tue, 30 May 2017 14:34:08 +0300 Subject: [PATCH] odissey: implement router cancel logic --- src/od_cancel.c | 24 ++++++++++++++++++-- src/od_cancel.h | 1 + src/od_frontend.c | 7 +----- src/od_msg.h | 3 ++- src/od_router.c | 58 +++++++++++++++++++++++++++++++++++++++++++++++ src/od_router.h | 3 +++ 6 files changed, 87 insertions(+), 9 deletions(-) diff --git a/src/od_cancel.c b/src/od_cancel.c index 913f7ca1..38aeb044 100644 --- a/src/od_cancel.c +++ b/src/od_cancel.c @@ -84,13 +84,11 @@ int od_cancel(od_instance_t *instance, machine_io_free(io); return -1; } -#if 0 rc = machine_set_readahead(io, instance->scheme.readahead); if (rc == -1) { od_error(&instance->log, NULL, "(cancel) failed to set readahead"); return -1; } -#endif so_stream_t stream; so_stream_init(&stream); @@ -146,3 +144,25 @@ int od_cancel(od_instance_t *instance, so_stream_free(&stream); return 0; } + +static inline int +od_cancel_cmp(od_server_t *server, void *arg) +{ + so_key_t *key = arg; + return so_keycmp(&server->key_client, key); +} + +int od_cancel_match(od_instance_t *instance, + od_routepool_t *route_pool, + so_key_t *key) +{ + /* match server by client key (forge) */ + od_server_t *server; + server = od_routepool_foreach(route_pool, OD_SACTIVE, od_cancel_cmp, key); + if (server == NULL) + return -1; + od_route_t *route = server->route; + od_schemeserver_t *server_scheme = route->scheme->server; + so_key_t cancel_key = server->key; + return od_cancel(instance, server_scheme, &cancel_key); +} diff --git a/src/od_cancel.h b/src/od_cancel.h index 46231f5a..a66880d3 100644 --- a/src/od_cancel.h +++ b/src/od_cancel.h @@ -8,5 +8,6 @@ */ int od_cancel(od_instance_t*, od_schemeserver_t*, so_key_t*); +int od_cancel_match(od_instance_t*, od_routepool_t*, so_key_t*); #endif diff --git a/src/od_frontend.c b/src/od_frontend.c index 07644282..ed9dbd6c 100644 --- a/src/od_frontend.c +++ b/src/od_frontend.c @@ -381,12 +381,7 @@ void od_frontend(void *arg) /* client cancel request */ if (client->startup.is_cancel) { od_debug(&instance->log, client->io, "C: cancel request"); -#if 0 - od_relay_t *relay = client->system->relay; - so_key_t key = client->startup.key; - od_frontend_close(client); - od_cancel(relay, &key); -#endif + od_router_cancel(client); od_frontend_close(client); return; } diff --git a/src/od_msg.h b/src/od_msg.h index ad644af6..b51ada53 100644 --- a/src/od_msg.h +++ b/src/od_msg.h @@ -5,7 +5,8 @@ typedef enum { OD_MCLIENT_NEW, OD_MROUTER_ROUTE, OD_MROUTER_ATTACH, - OD_MROUTER_DETACH + OD_MROUTER_DETACH, + OD_MROUTER_CANCEL } od_msg_t; #endif /* OD_MSG_H */ diff --git a/src/od_router.c b/src/od_router.c index 2b402b7d..6ff1982b 100644 --- a/src/od_router.c +++ b/src/od_router.c @@ -43,6 +43,7 @@ #include "od_router.h" #include "od_frontend.h" #include "od_backend.h" +#include "od_cancel.h" typedef struct { @@ -227,6 +228,23 @@ od_router(void *arg) /* todo: wakeup attachers */ break; } + + case OD_MROUTER_CANCEL: + { + /* match server by client key and initiate + * cancel request connection */ + od_msgrouter_t *msg_cancel; + msg_cancel = machine_msg_get_data(msg); + int rc; + rc = od_cancel_match(instance, &router->route_pool, + &msg_cancel->client->startup.key); + if (rc == -1) + msg_cancel->status = OD_RERROR; + else + msg_cancel->status = OD_ROK; + machine_queue_put(msg_cancel->response, msg); + continue; + } default: assert(0); break; @@ -365,3 +383,43 @@ od_router_detach(od_server_t *server) msg_detach->server = server; machine_queue_put(router->queue, msg); } + +od_routerstatus_t +od_router_cancel(od_client_t *client) +{ + od_router_t *router = client->system->router; + + /* create response queue */ + machine_queue_t response; + response = machine_queue_create(); + if (response == NULL) + return OD_RERROR; + + /* send cancel request to router */ + machine_msg_t msg; + msg = machine_msg_create(OD_MROUTER_CANCEL, sizeof(od_msgrouter_t)); + if (msg == NULL) { + machine_queue_free(response); + return OD_RERROR; + } + od_msgrouter_t *msg_cancel; + msg_cancel = machine_msg_get_data(msg); + msg_cancel->status = OD_RERROR; + msg_cancel->client = client; + msg_cancel->response = response; + machine_queue_put(router->queue, msg); + + /* wait for reply */ + msg = machine_queue_get(response, UINT32_MAX); + if (msg == NULL) { + /* todo: */ + machine_queue_free(response); + return OD_RERROR; + } + msg_cancel = machine_msg_get_data(msg); + od_routerstatus_t status; + status = msg_cancel->status; + machine_queue_free(response); + machine_msg_free(msg); + return status; +} diff --git a/src/od_router.h b/src/od_router.h index 8c999dd8..9d3ef260 100644 --- a/src/od_router.h +++ b/src/od_router.h @@ -37,4 +37,7 @@ od_router_attach(od_client_t*); void od_router_detach(od_server_t*); +od_routerstatus_t +od_router_cancel(od_client_t*); + #endif /* OD_ROUTER_H */