odissey: make fair client scheduling per relay thread

This commit is contained in:
Dmitry Simonenko 2017-06-19 13:30:56 +03:00
parent 8ea82abbba
commit 7262b6cbd9
9 changed files with 43 additions and 19 deletions

View File

@ -158,11 +158,6 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
.relay_pool = &relay_pool,
.instance = instance
};
system.task_queue = machine_queue_create();
if (system.task_queue == NULL) {
od_error(&instance->log, NULL, "failed to create task queue");
return 1;
}
od_router_init(&router, &system);
od_periodic_init(&periodic, &system);
od_pooler_init(&pooler, &system);

View File

@ -193,7 +193,7 @@ int od_periodic_start(od_periodic_t *periodic)
coroutine_id = machine_coroutine_create(od_periodic, periodic);
if (coroutine_id == -1) {
od_error(&instance->log, "periodic", "failed to start periodic coroutine");
return 1;
return -1;
}
return 0;
}

View File

@ -38,6 +38,9 @@
#include "od_route.h"
#include "od_route_pool.h"
#include "od_router.h"
#include "od_router.h"
#include "od_relay.h"
#include "od_relay_pool.h"
#include "od_pooler.h"
#include "od_periodic.h"
#include "od_tls.h"
@ -46,6 +49,7 @@ static inline void
od_pooler_main(od_pooler_t *pooler)
{
od_instance_t *instance = pooler->system->instance;
od_relaypool_t *relay_pool = pooler->system->relay_pool;
/* listen '*' */
struct addrinfo *hints_ptr = NULL;
@ -149,12 +153,13 @@ od_pooler_main(od_pooler_t *pooler)
client->id = pooler->client_seq++;
client->io = client_io;
/* create new client event */
/* create new client connect event and pass it
* to worker pool */
machine_msg_t *msg;
msg = machine_msg_create(OD_MCLIENT_NEW, sizeof(od_client_t*));
char *msg_data = machine_msg_get_data(msg);
memcpy(msg_data, &client, sizeof(od_client_t*));
machine_queue_put(pooler->system->task_queue, msg);
od_relaypool_feed(relay_pool, msg);
}
}
@ -225,7 +230,7 @@ od_pooler(void *arg)
int od_pooler_init(od_pooler_t *pooler, od_system_t *system)
{
od_instance_t *instance = pooler->system->instance;
od_instance_t *instance = system->instance;
pooler->machine = -1;
pooler->client_seq = 0;
@ -248,7 +253,7 @@ int od_pooler_start(od_pooler_t *pooler)
pooler->machine = machine_create("pooler", od_pooler, pooler);
if (pooler->machine == -1) {
od_error(&instance->log, "pooler", "failed to create pooler thread");
return 1;
return -1;
}
return 0;
}

View File

@ -54,7 +54,7 @@ od_relay(void *arg)
for (;;)
{
machine_msg_t *msg;
msg = machine_queue_get(relay->system->task_queue, UINT32_MAX);
msg = machine_queue_get(relay->task_queue, UINT32_MAX);
if (msg == NULL)
break;
@ -99,12 +99,19 @@ void od_relay_init(od_relay_t *relay, od_system_t *system, int id)
int od_relay_start(od_relay_t *relay)
{
od_instance_t *instance = relay->system->instance;
relay->task_queue = machine_queue_create();
if (relay->task_queue == NULL) {
od_error(&instance->log, "relay", "failed to create task queue");
return -1;
}
char name[32];
snprintf(name, sizeof(name), "relay: %d", relay->id);
relay->machine = machine_create(name, od_relay, relay);
if (relay->machine == -1) {
machine_queue_free(relay->task_queue);
od_error(&instance->log, "relay", "failed to start relay");
return 1;
return -1;
}
return 0;
}

View File

@ -11,9 +11,10 @@ typedef struct od_relay od_relay_t;
struct od_relay
{
int64_t machine;
int id;
od_system_t *system;
int64_t machine;
int id;
machine_queue_t *task_queue;
od_system_t *system;
};
void od_relay_init(od_relay_t*, od_system_t*, int);

View File

@ -46,6 +46,7 @@
int od_relaypool_init(od_relaypool_t *pool, od_system_t *system, int count)
{
pool->round_robin = 0;
pool->count = count;
pool->pool = malloc(sizeof(od_relay_t) * count);
if (pool->pool == NULL)
@ -70,3 +71,17 @@ int od_relaypool_start(od_relaypool_t *pool)
}
return 0;
}
void od_relaypool_feed(od_relaypool_t *pool, machine_msg_t *msg)
{
int next = pool->round_robin;
if (pool->round_robin < pool->count) {
pool->round_robin++;
} else {
pool->round_robin = 0;
next = 0;
}
od_relay_t *relay;
relay = &pool->pool[next];
machine_queue_put(relay->task_queue, msg);
}

View File

@ -13,9 +13,11 @@ struct od_relaypool
{
od_relay_t *pool;
int count;
int round_robin;
};
int od_relaypool_init(od_relaypool_t*, od_system_t*, int);
int od_relaypool_start(od_relaypool_t*);
int od_relaypool_init(od_relaypool_t*, od_system_t*, int);
int od_relaypool_start(od_relaypool_t*);
void od_relaypool_feed(od_relaypool_t*, machine_msg_t*);
#endif /* OD_RELAY_POOL_H */

View File

@ -442,7 +442,7 @@ int od_router_start(od_router_t *router)
coroutine_id = machine_coroutine_create(od_router, router);
if (coroutine_id == -1) {
od_error(&instance->log, "router", "failed to start router");
return 1;
return -1;
}
return 0;
}

View File

@ -16,7 +16,6 @@ struct od_system
void *router;
void *periodic;
void *relay_pool;
machine_queue_t *task_queue;
};
#endif /* OD_SYSTEM_H */