diff --git a/src/od_instance.c b/src/od_instance.c index c3e71667..8e3256fa 100644 --- a/src/od_instance.c +++ b/src/od_instance.c @@ -158,11 +158,6 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv) .relay_pool = &relay_pool, .instance = instance }; - system.task_queue = machine_queue_create(); - if (system.task_queue == NULL) { - od_error(&instance->log, NULL, "failed to create task queue"); - return 1; - } od_router_init(&router, &system); od_periodic_init(&periodic, &system); od_pooler_init(&pooler, &system); diff --git a/src/od_periodic.c b/src/od_periodic.c index 6ad99944..a604be3c 100644 --- a/src/od_periodic.c +++ b/src/od_periodic.c @@ -193,7 +193,7 @@ int od_periodic_start(od_periodic_t *periodic) coroutine_id = machine_coroutine_create(od_periodic, periodic); if (coroutine_id == -1) { od_error(&instance->log, "periodic", "failed to start periodic coroutine"); - return 1; + return -1; } return 0; } diff --git a/src/od_pooler.c b/src/od_pooler.c index 1f9e6328..33faaffe 100644 --- a/src/od_pooler.c +++ b/src/od_pooler.c @@ -38,6 +38,9 @@ #include "od_route.h" #include "od_route_pool.h" #include "od_router.h" +#include "od_router.h" +#include "od_relay.h" +#include "od_relay_pool.h" #include "od_pooler.h" #include "od_periodic.h" #include "od_tls.h" @@ -46,6 +49,7 @@ static inline void od_pooler_main(od_pooler_t *pooler) { od_instance_t *instance = pooler->system->instance; + od_relaypool_t *relay_pool = pooler->system->relay_pool; /* listen '*' */ struct addrinfo *hints_ptr = NULL; @@ -149,12 +153,13 @@ od_pooler_main(od_pooler_t *pooler) client->id = pooler->client_seq++; client->io = client_io; - /* create new client event */ + /* create new client connect event and pass it + * to worker pool */ machine_msg_t *msg; msg = machine_msg_create(OD_MCLIENT_NEW, sizeof(od_client_t*)); char *msg_data = machine_msg_get_data(msg); memcpy(msg_data, &client, sizeof(od_client_t*)); - machine_queue_put(pooler->system->task_queue, msg); + od_relaypool_feed(relay_pool, msg); } } @@ -225,7 +230,7 @@ od_pooler(void *arg) int od_pooler_init(od_pooler_t *pooler, od_system_t *system) { - od_instance_t *instance = pooler->system->instance; + od_instance_t *instance = system->instance; pooler->machine = -1; pooler->client_seq = 0; @@ -248,7 +253,7 @@ int od_pooler_start(od_pooler_t *pooler) pooler->machine = machine_create("pooler", od_pooler, pooler); if (pooler->machine == -1) { od_error(&instance->log, "pooler", "failed to create pooler thread"); - return 1; + return -1; } return 0; } diff --git a/src/od_relay.c b/src/od_relay.c index 69a50175..9881f442 100644 --- a/src/od_relay.c +++ b/src/od_relay.c @@ -54,7 +54,7 @@ od_relay(void *arg) for (;;) { machine_msg_t *msg; - msg = machine_queue_get(relay->system->task_queue, UINT32_MAX); + msg = machine_queue_get(relay->task_queue, UINT32_MAX); if (msg == NULL) break; @@ -99,12 +99,19 @@ void od_relay_init(od_relay_t *relay, od_system_t *system, int id) int od_relay_start(od_relay_t *relay) { od_instance_t *instance = relay->system->instance; + + relay->task_queue = machine_queue_create(); + if (relay->task_queue == NULL) { + od_error(&instance->log, "relay", "failed to create task queue"); + return -1; + } char name[32]; snprintf(name, sizeof(name), "relay: %d", relay->id); relay->machine = machine_create(name, od_relay, relay); if (relay->machine == -1) { + machine_queue_free(relay->task_queue); od_error(&instance->log, "relay", "failed to start relay"); - return 1; + return -1; } return 0; } diff --git a/src/od_relay.h b/src/od_relay.h index 57713bc4..2ec40e1f 100644 --- a/src/od_relay.h +++ b/src/od_relay.h @@ -11,9 +11,10 @@ typedef struct od_relay od_relay_t; struct od_relay { - int64_t machine; - int id; - od_system_t *system; + int64_t machine; + int id; + machine_queue_t *task_queue; + od_system_t *system; }; void od_relay_init(od_relay_t*, od_system_t*, int); diff --git a/src/od_relay_pool.c b/src/od_relay_pool.c index 3ef2f3d0..752cdc1e 100644 --- a/src/od_relay_pool.c +++ b/src/od_relay_pool.c @@ -46,6 +46,7 @@ int od_relaypool_init(od_relaypool_t *pool, od_system_t *system, int count) { + pool->round_robin = 0; pool->count = count; pool->pool = malloc(sizeof(od_relay_t) * count); if (pool->pool == NULL) @@ -70,3 +71,17 @@ int od_relaypool_start(od_relaypool_t *pool) } return 0; } + +void od_relaypool_feed(od_relaypool_t *pool, machine_msg_t *msg) +{ + int next = pool->round_robin; + if (pool->round_robin < pool->count) { + pool->round_robin++; + } else { + pool->round_robin = 0; + next = 0; + } + od_relay_t *relay; + relay = &pool->pool[next]; + machine_queue_put(relay->task_queue, msg); +} diff --git a/src/od_relay_pool.h b/src/od_relay_pool.h index 95d8ac97..6e89daf9 100644 --- a/src/od_relay_pool.h +++ b/src/od_relay_pool.h @@ -13,9 +13,11 @@ struct od_relaypool { od_relay_t *pool; int count; + int round_robin; }; -int od_relaypool_init(od_relaypool_t*, od_system_t*, int); -int od_relaypool_start(od_relaypool_t*); +int od_relaypool_init(od_relaypool_t*, od_system_t*, int); +int od_relaypool_start(od_relaypool_t*); +void od_relaypool_feed(od_relaypool_t*, machine_msg_t*); #endif /* OD_RELAY_POOL_H */ diff --git a/src/od_router.c b/src/od_router.c index 7cb29dbb..9f5d4cd5 100644 --- a/src/od_router.c +++ b/src/od_router.c @@ -442,7 +442,7 @@ int od_router_start(od_router_t *router) coroutine_id = machine_coroutine_create(od_router, router); if (coroutine_id == -1) { od_error(&instance->log, "router", "failed to start router"); - return 1; + return -1; } return 0; } diff --git a/src/od_system.h b/src/od_system.h index 18f835c6..ef09299b 100644 --- a/src/od_system.h +++ b/src/od_system.h @@ -16,7 +16,6 @@ struct od_system void *router; void *periodic; void *relay_pool; - machine_queue_t *task_queue; }; #endif /* OD_SYSTEM_H */