odyssey: rework pool wait logic and refactor client notification

This commit is contained in:
Dmitry Simonenko 2018-12-07 15:12:06 +03:00
parent 2e03af9c88
commit 0504a34b52
9 changed files with 153 additions and 57 deletions

View File

@ -40,6 +40,18 @@ od_atomic_u32_sub(od_atomic_u32_t *atomic, uint32_t value)
__sync_sub_and_fetch(atomic, value);
}
static inline void
od_atomic_u32_or(od_atomic_u32_t *atomic, uint32_t value)
{
__sync_or_and_fetch(atomic, value);
}
static inline void
od_atomic_u32_xor(od_atomic_u32_t *atomic, uint32_t value)
{
__sync_xor_and_fetch(atomic, value);
}
static inline uint32_t
od_atomic_u64_of(od_atomic_u64_t *atomic)
{

View File

@ -20,13 +20,13 @@ typedef enum
typedef enum
{
OD_CLIENT_OP_NONE,
OD_CLIENT_OP_KILL
OD_CLIENT_OP_NONE = 0,
OD_CLIENT_OP_KILL = 1
} od_clientop_t;
struct od_client_ctl
{
volatile od_clientop_t op;
od_atomic_u32_t op;
};
struct od_client
@ -112,11 +112,28 @@ od_client_notify_read(od_client_t *client)
machine_msg_free(msg);
}
static inline uint32_t
od_client_ctl_of(od_client_t *client)
{
return od_atomic_u32_of(&client->ctl.op);
}
static inline void
od_client_ctl_set(od_client_t *client, uint32_t op)
{
od_atomic_u32_or(&client->ctl.op, op);
}
static inline void
od_client_ctl_unset(od_client_t *client, uint32_t op)
{
od_atomic_u32_xor(&client->ctl.op, op);
}
static inline void
od_client_kill(od_client_t *client)
{
/* TODO */
client->ctl.op = OD_CLIENT_OP_KILL;
od_client_ctl_set(client, OD_CLIENT_OP_KILL);
od_client_notify(client);
}

View File

@ -164,7 +164,12 @@ od_frontend_attach(od_client_t *client, char *context)
{
status = od_router_attach(router, &instance->config, &instance->id_mgr, client);
if (status != OD_ROUTER_OK)
{
if (status == OD_ROUTER_ERROR_TIMEDOUT)
od_error(&instance->logger, "router", client, NULL,
"server pool wait timedout, closing");
return OD_FE_EATTACH;
}
server = client->server;
if (server->io && !machine_connected(server->io)) {
@ -608,10 +613,14 @@ static od_frontend_rc_t
od_frontend_ctl(od_client_t *client)
{
od_client_notify_read(client);
if (client->ctl.op == OD_CLIENT_OP_KILL) {
uint32_t op = od_client_ctl_of(client);
if (op & OD_CLIENT_OP_KILL)
{
od_client_ctl_unset(client, OD_CLIENT_OP_KILL);
machine_msg_t *msg;
msg = od_frontend_errorf(client, KIWI_OPERATOR_INTERVENTION,
"client connection dropped");
"client connection dropped");
if (msg == NULL)
return OD_FE_KILL;
machine_write(client->io, msg);

View File

@ -11,16 +11,17 @@ typedef struct od_route od_route_t;
struct od_route
{
od_rule_t *rule;
od_route_id_t id;
od_stat_t stats;
od_stat_t stats_prev;
int stats_mark;
od_server_pool_t server_pool;
od_client_pool_t client_pool;
kiwi_params_lock_t params;
pthread_mutex_t lock;
od_list_t link;
od_rule_t *rule;
od_route_id_t id;
od_stat_t stats;
od_stat_t stats_prev;
int stats_mark;
od_server_pool_t server_pool;
od_client_pool_t client_pool;
kiwi_params_lock_t params;
machine_channel_t *wait_bus;
pthread_mutex_t lock;
od_list_t link;
};
static inline void
@ -35,29 +36,49 @@ od_route_init(od_route_t *route)
od_stat_init(&route->stats_prev);
kiwi_params_lock_init(&route->params);
od_list_init(&route->link);
route->wait_bus = NULL;
pthread_mutex_init(&route->lock, NULL);
}
static inline od_route_t*
od_route_allocate(void)
{
od_route_t *route = malloc(sizeof(*route));
if (route == NULL)
return NULL;
od_route_init(route);
return route;
}
static inline void
od_route_free(od_route_t *route)
{
od_route_id_free(&route->id);
od_server_pool_free(&route->server_pool);
kiwi_params_lock_free(&route->params);
if (route->wait_bus)
machine_channel_free(route->wait_bus);
pthread_mutex_destroy(&route->lock);
free(route);
}
static inline od_route_t*
od_route_allocate(int is_shared)
{
od_route_t *route = malloc(sizeof(*route));
if (route == NULL)
return NULL;
od_route_init(route);
route->wait_bus = machine_channel_create(is_shared);
if (route->wait_bus == NULL) {
od_route_free(route);
return NULL;
}
return route;
}
static inline void
od_route_lock(od_route_t *route)
{
pthread_mutex_lock(&route->lock);
}
static inline void
od_route_unlock(od_route_t *route)
{
pthread_mutex_unlock(&route->lock);
}
static inline int
od_route_is_dynamic(od_route_t *route)
{
@ -122,16 +143,27 @@ od_route_kill_client_pool(od_route_t *route)
od_route_kill_cb, NULL);
}
static inline void
od_route_lock(od_route_t *route)
static inline int
od_route_wait(od_route_t *route, uint32_t time_ms)
{
pthread_mutex_lock(&route->lock);
machine_msg_t *msg;
msg = machine_channel_read(route->wait_bus, time_ms);
if (msg) {
machine_msg_free(msg);
return 0;
}
return -1;
}
static inline void
od_route_unlock(od_route_t *route)
static inline int
od_route_signal(od_route_t *route)
{
pthread_mutex_unlock(&route->lock);
machine_msg_t *msg;
msg = machine_msg_create(0);
if (msg == NULL)
return -1;
machine_channel_write(route->wait_bus, msg);
return 0;
}
#endif /* ODYSSEY_ROUTE_H */

View File

@ -47,10 +47,10 @@ od_route_pool_free(od_route_pool_t *pool)
}
static inline od_route_t*
od_route_pool_new(od_route_pool_t *pool, od_route_id_t *id,
od_route_pool_new(od_route_pool_t *pool, int is_shared, od_route_id_t *id,
od_rule_t *rule)
{
od_route_t *route = od_route_allocate();
od_route_t *route = od_route_allocate(is_shared);
if (route == NULL)
return NULL;
int rc;
@ -169,7 +169,7 @@ static inline int
od_route_pool_stat_database(od_route_pool_t *pool,
od_route_pool_stat_database_cb_t callback,
uint64_t prev_time_us,
void *arg)
void **argv)
{
od_route_t *route;
od_list_t *i;
@ -196,7 +196,7 @@ od_route_pool_stat_database(od_route_pool_t *pool,
int rc;
rc = callback(route->id.database, route->id.database_len - 1,
&current, &avg, arg);
&current, &avg, argv);
if (rc == -1) {
od_route_pool_stat_unmark(pool);
return -1;

View File

@ -79,10 +79,16 @@ od_router_reconfigure(od_router_t *router, od_rules_t *rules)
static inline int
od_router_expire_server_cb(od_server_t *server, void **argv)
{
od_route_t *route = server->route;
od_list_t *expire_list = argv[0];
int *count = argv[1];
/* remove server for server pool */
od_server_pool_set(&route->server_pool, server, OD_SERVER_UNDEF);
od_list_append(expire_list, &server->link);
(*count)++;
return 0;
}
@ -245,7 +251,9 @@ od_router_route(od_router_t *router, od_config_t *config, od_client_t *client)
od_route_t *route;
route = od_route_pool_match(&router->route_pool, &id, rule);
if (route == NULL) {
route = od_route_pool_new(&router->route_pool, &id, rule);
int is_shared;
is_shared = od_config_is_multi_workers(config);
route = od_route_pool_new(&router->route_pool, is_shared, &id, rule);
if (route == NULL) {
od_router_unlock(router);
return OD_ROUTER_ERROR;
@ -306,6 +314,9 @@ od_router_attach(od_router_t *router, od_config_t *config, od_id_mgr_t *id_mgr,
od_route_lock(route);
/* enqueue client (pending -> queue) */
od_client_pool_set(&route->client_pool, client, OD_CLIENT_QUEUE);
/* get client server from route server pool */
od_server_t *server;
for (;;)
@ -318,11 +329,29 @@ od_router_attach(od_router_t *router, od_config_t *config, od_id_mgr_t *id_mgr,
if (route->rule->pool_size == 0)
break;
/* TODO: wait for free server */
#if 0
/* enqueue client */
od_client_pool_set(&route->client_pool, client, OD_CLIENT_QUEUE);
#endif
/* maybe start new connection */
if (od_server_pool_total(&route->server_pool) < route->rule->pool_size)
break;
od_route_unlock(route);
/* pool_size limit implementation.
*
* If the limit reached, wait wakeup condition for
* pool_timeout milliseconds.
*
* The condition triggered when a server connection
* put into idle state by DETACH events.
*/
uint32_t timeout = route->rule->pool_timeout;
if (timeout == 0)
timeout = UINT32_MAX;
int rc;
rc = od_route_wait(route, timeout);
if (rc == -1)
return OD_ROUTER_ERROR_TIMEDOUT;
od_route_lock(route);
}
od_route_unlock(route);
@ -337,20 +366,19 @@ od_router_attach(od_router_t *router, od_config_t *config, od_id_mgr_t *id_mgr,
od_packet_set_chunk(&server->packet_reader, config->packet_read_size);
od_route_lock(route);
/* TODO */
/* recheck for free server again? */
/* xxx: maybe retry check for free server again */
attach:
od_server_pool_set(&route->server_pool, server, OD_SERVER_ACTIVE);
od_client_pool_set(&route->client_pool, client, OD_CLIENT_ACTIVE);
od_route_unlock(route);
client->server = server;
server->client = client;
server->idle_time = 0;
server->key_client = client->key;
od_route_unlock(route);
/* attach server io to clients machine context */
if (server->io && od_config_is_multi_workers(config))
machine_io_attach(server->io);
@ -378,13 +406,9 @@ od_router_detach(od_router_t *router, od_config_t *config, od_client_t *client)
od_server_pool_set(&route->server_pool, server, OD_SERVER_IDLE);
od_client_pool_set(&route->client_pool, client, OD_CLIENT_PENDING);
/* TODO: wakeup waiters */
/*
if (route->client_pool.count_queue > 0) {
od_client_t *waiter;
waiter = od_client_pool_next(&route->client_pool, OD_CLIENT_QUEUE);
}
*/
/* notify waiters */
if (route->client_pool.count_queue > 0)
od_route_signal(route);
od_route_unlock(route);
}

View File

@ -25,6 +25,9 @@ od_rules_init(od_rules_t *rules)
od_list_init(&rules->rules);
}
static inline void
od_rules_rule_free(od_rule_t*);
void
od_rules_free(od_rules_t *rules)
{
@ -205,7 +208,7 @@ od_rules_add(od_rules_t *rules)
return rule;
}
void
static inline void
od_rules_rule_free(od_rule_t *rule)
{
if (rule->db_name)

View File

@ -129,7 +129,6 @@ void od_rules_print(od_rules_t*, od_logger_t*);
/* rule */
od_rule_t*
od_rules_add(od_rules_t*);
void od_rules_rule_free(od_rule_t*);
void od_rules_ref(od_rule_t*);
void od_rules_unref(od_rule_t*);
int od_rules_compare(od_rule_t*, od_rule_t*);

View File

@ -114,7 +114,7 @@ od_stat_sum(od_stat_t *sum, od_stat_t *stat)
static inline void
od_stat_update_of(od_atomic_u64_t *prev, od_atomic_u64_t *current)
{
/* todo: this should be made more optiomal */
/* todo: this could be made more optimal */
/* prev <= current */
uint64_t diff;
diff = od_atomic_u64_of(current) - od_atomic_u64_of(prev);