mirror of https://github.com/yandex/odyssey.git
odissey: implement router cancel logic
This commit is contained in:
parent
cf823e87ee
commit
6b696ccdef
|
@ -84,13 +84,11 @@ int od_cancel(od_instance_t *instance,
|
|||
machine_io_free(io);
|
||||
return -1;
|
||||
}
|
||||
#if 0
|
||||
rc = machine_set_readahead(io, instance->scheme.readahead);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, NULL, "(cancel) failed to set readahead");
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
so_stream_t stream;
|
||||
so_stream_init(&stream);
|
||||
|
@ -146,3 +144,25 @@ int od_cancel(od_instance_t *instance,
|
|||
so_stream_free(&stream);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_cancel_cmp(od_server_t *server, void *arg)
|
||||
{
|
||||
so_key_t *key = arg;
|
||||
return so_keycmp(&server->key_client, key);
|
||||
}
|
||||
|
||||
int od_cancel_match(od_instance_t *instance,
|
||||
od_routepool_t *route_pool,
|
||||
so_key_t *key)
|
||||
{
|
||||
/* match server by client key (forge) */
|
||||
od_server_t *server;
|
||||
server = od_routepool_foreach(route_pool, OD_SACTIVE, od_cancel_cmp, key);
|
||||
if (server == NULL)
|
||||
return -1;
|
||||
od_route_t *route = server->route;
|
||||
od_schemeserver_t *server_scheme = route->scheme->server;
|
||||
so_key_t cancel_key = server->key;
|
||||
return od_cancel(instance, server_scheme, &cancel_key);
|
||||
}
|
||||
|
|
|
@ -8,5 +8,6 @@
|
|||
*/
|
||||
|
||||
int od_cancel(od_instance_t*, od_schemeserver_t*, so_key_t*);
|
||||
int od_cancel_match(od_instance_t*, od_routepool_t*, so_key_t*);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -381,12 +381,7 @@ void od_frontend(void *arg)
|
|||
/* client cancel request */
|
||||
if (client->startup.is_cancel) {
|
||||
od_debug(&instance->log, client->io, "C: cancel request");
|
||||
#if 0
|
||||
od_relay_t *relay = client->system->relay;
|
||||
so_key_t key = client->startup.key;
|
||||
od_frontend_close(client);
|
||||
od_cancel(relay, &key);
|
||||
#endif
|
||||
od_router_cancel(client);
|
||||
od_frontend_close(client);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,8 @@ typedef enum {
|
|||
OD_MCLIENT_NEW,
|
||||
OD_MROUTER_ROUTE,
|
||||
OD_MROUTER_ATTACH,
|
||||
OD_MROUTER_DETACH
|
||||
OD_MROUTER_DETACH,
|
||||
OD_MROUTER_CANCEL
|
||||
} od_msg_t;
|
||||
|
||||
#endif /* OD_MSG_H */
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
#include "od_router.h"
|
||||
#include "od_frontend.h"
|
||||
#include "od_backend.h"
|
||||
#include "od_cancel.h"
|
||||
|
||||
typedef struct
|
||||
{
|
||||
|
@ -227,6 +228,23 @@ od_router(void *arg)
|
|||
/* todo: wakeup attachers */
|
||||
break;
|
||||
}
|
||||
|
||||
case OD_MROUTER_CANCEL:
|
||||
{
|
||||
/* match server by client key and initiate
|
||||
* cancel request connection */
|
||||
od_msgrouter_t *msg_cancel;
|
||||
msg_cancel = machine_msg_get_data(msg);
|
||||
int rc;
|
||||
rc = od_cancel_match(instance, &router->route_pool,
|
||||
&msg_cancel->client->startup.key);
|
||||
if (rc == -1)
|
||||
msg_cancel->status = OD_RERROR;
|
||||
else
|
||||
msg_cancel->status = OD_ROK;
|
||||
machine_queue_put(msg_cancel->response, msg);
|
||||
continue;
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
|
@ -365,3 +383,43 @@ od_router_detach(od_server_t *server)
|
|||
msg_detach->server = server;
|
||||
machine_queue_put(router->queue, msg);
|
||||
}
|
||||
|
||||
od_routerstatus_t
|
||||
od_router_cancel(od_client_t *client)
|
||||
{
|
||||
od_router_t *router = client->system->router;
|
||||
|
||||
/* create response queue */
|
||||
machine_queue_t response;
|
||||
response = machine_queue_create();
|
||||
if (response == NULL)
|
||||
return OD_RERROR;
|
||||
|
||||
/* send cancel request to router */
|
||||
machine_msg_t msg;
|
||||
msg = machine_msg_create(OD_MROUTER_CANCEL, sizeof(od_msgrouter_t));
|
||||
if (msg == NULL) {
|
||||
machine_queue_free(response);
|
||||
return OD_RERROR;
|
||||
}
|
||||
od_msgrouter_t *msg_cancel;
|
||||
msg_cancel = machine_msg_get_data(msg);
|
||||
msg_cancel->status = OD_RERROR;
|
||||
msg_cancel->client = client;
|
||||
msg_cancel->response = response;
|
||||
machine_queue_put(router->queue, msg);
|
||||
|
||||
/* wait for reply */
|
||||
msg = machine_queue_get(response, UINT32_MAX);
|
||||
if (msg == NULL) {
|
||||
/* todo: */
|
||||
machine_queue_free(response);
|
||||
return OD_RERROR;
|
||||
}
|
||||
msg_cancel = machine_msg_get_data(msg);
|
||||
od_routerstatus_t status;
|
||||
status = msg_cancel->status;
|
||||
machine_queue_free(response);
|
||||
machine_msg_free(msg);
|
||||
return status;
|
||||
}
|
||||
|
|
|
@ -37,4 +37,7 @@ od_router_attach(od_client_t*);
|
|||
void
|
||||
od_router_detach(od_server_t*);
|
||||
|
||||
od_routerstatus_t
|
||||
od_router_cancel(od_client_t*);
|
||||
|
||||
#endif /* OD_ROUTER_H */
|
||||
|
|
Loading…
Reference in New Issue