odissey: rework internal services init logic

This commit is contained in:
Dmitry Simonenko 2018-02-02 15:50:23 +03:00
parent 57faa0f7c4
commit 17673c123a
11 changed files with 94 additions and 101 deletions

View File

@ -733,22 +733,21 @@ od_console(void *arg)
}
}
int od_console_init(od_console_t *console, od_system_t *system)
void od_console_init(od_console_t *console, od_system_t *system)
{
od_instance_t *instance = system->instance;
console->system = system;
console->system = system;
console->channel = NULL;
}
int od_console_start(od_console_t *console)
{
od_instance_t *instance = console->system->instance;
console->channel = machine_channel_create(1);
if (console->channel == NULL) {
od_error(&instance->logger, "console", NULL, NULL,
"failed to create channel");
return -1;
}
return 0;
}
int od_console_start(od_console_t *console)
{
od_instance_t *instance = console->system->instance;
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_console, console);
if (coroutine_id == -1) {

View File

@ -20,8 +20,8 @@ struct od_console_t {
machine_channel_t *channel;
};
int od_console_init(od_console_t*, od_system_t*);
int od_console_start(od_console_t*);
void od_console_init(od_console_t*, od_system_t*);
int od_console_start(od_console_t*);
od_consolestatus_t
od_console_request(od_client_t*, char*, int);

View File

@ -178,33 +178,33 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
/* seed id manager */
od_idmgr_seed(&instance->id_mgr);
/* run system services */
/* prepare system services */
od_pooler_t pooler;
od_pooler_init(&pooler, instance);
od_router_t router;
od_console_t console;
od_periodic_t periodic;
od_pooler_t pooler;
od_relaypool_t relay_pool;
od_system_t system = {
.pooler = &pooler,
.router = &router,
.console = &console,
.periodic = &periodic,
.relay_pool = &relay_pool,
.instance = instance
};
od_router_init(&router, &system);
od_console_init(&console, &system);
od_periodic_init(&periodic, &system);
od_pooler_init(&pooler, &system);
rc = od_relaypool_init(&relay_pool, &system, instance->scheme.workers);
if (rc == -1)
return -1;
od_system_t *system;
system = &pooler.system;
system->instance = instance;
system->pooler = &pooler;
system->router = &router;
system->console = &console;
system->periodic = &periodic;
system->relay_pool = &relay_pool;
od_router_init(&router, system);
od_console_init(&console, system);
od_periodic_init(&periodic, system);
od_relaypool_init(&relay_pool);
/* start pooler machine thread */
rc = od_pooler_start(&pooler);
if (rc == -1)
return -1;
machine_wait(pooler.machine);
return 0;
}

View File

@ -300,10 +300,9 @@ od_periodic(void *arg)
}
}
int od_periodic_init(od_periodic_t *periodic, od_system_t *system)
void od_periodic_init(od_periodic_t *periodic, od_system_t *system)
{
periodic->system = system;
return 0;
}
int od_periodic_start(od_periodic_t *periodic)

View File

@ -13,7 +13,7 @@ struct od_periodic_t {
od_system_t *system;
};
int od_periodic_init(od_periodic_t*, od_system_t*);
int od_periodic_start(od_periodic_t*);
void od_periodic_init(od_periodic_t*, od_system_t*);
int od_periodic_start(od_periodic_t*);
#endif /* OD_PERIODIC_H */

View File

@ -165,7 +165,7 @@ od_pooler_server_start(od_pooler_t *pooler,
od_schemelisten_t *scheme,
struct addrinfo *addr)
{
od_instance_t *instance = pooler->system->instance;
od_instance_t *instance = pooler->system.instance;
od_poolerserver_t *server;
server = malloc(sizeof(od_poolerserver_t));
if (server == NULL) {
@ -175,7 +175,7 @@ od_pooler_server_start(od_pooler_t *pooler,
}
server->scheme = scheme;
server->addr = addr;
server->system = pooler->system;
server->system = &pooler->system;
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_pooler_server, server);
if (coroutine_id == -1) {
@ -190,7 +190,7 @@ od_pooler_server_start(od_pooler_t *pooler,
static inline void
od_pooler_main(od_pooler_t *pooler)
{
od_instance_t *instance = pooler->system->instance;
od_instance_t *instance = pooler->system.instance;
od_list_t *i;
od_list_foreach(&instance->scheme.listen, i)
{
@ -241,7 +241,7 @@ od_pooler_main(od_pooler_t *pooler)
static inline void
od_pooler_config_import(od_pooler_t *pooler)
{
od_instance_t *instance = pooler->system->instance;
od_instance_t *instance = pooler->system.instance;
od_log(&instance->logger, "config", NULL, NULL, "importing changes from '%s'",
instance->config_file);
@ -287,7 +287,7 @@ static inline void
od_pooler_signal_handler(void *arg)
{
od_pooler_t *pooler = arg;
od_instance_t *instance = pooler->system->instance;
od_instance_t *instance = pooler->system.instance;
sigset_t mask;
sigemptyset(&mask);
@ -325,7 +325,32 @@ static inline void
od_pooler(void *arg)
{
od_pooler_t *pooler = arg;
od_instance_t *instance = pooler->system->instance;
od_instance_t *instance = pooler->instance;
/* start router coroutine */
int rc;
od_router_t *router = pooler->system.router;
rc = od_router_start(router);
if (rc == -1)
return;
/* start console coroutine */
od_console_t *console = pooler->system.console;
rc = od_console_start(console);
if (rc == -1)
return;
/* start periodic coroutine */
od_periodic_t *periodic = pooler->system.periodic;
rc = od_periodic_start(periodic);
if (rc == -1)
return;
/* start worker threads */
od_relaypool_t *relay_pool = pooler->system.relay_pool;
rc = od_relaypool_start(relay_pool, &pooler->system, instance->scheme.workers);
if (rc == -1)
return;
/* start signal handler coroutine */
int64_t coroutine_id;
@ -336,49 +361,22 @@ od_pooler(void *arg)
return;
}
/* start router coroutine */
int rc;
od_router_t *router;
router = pooler->system->router;
rc = od_router_start(router);
if (rc == -1)
return;
/* start console coroutine */
od_console_t *console;
console = pooler->system->console;
rc = od_console_start(console);
if (rc == -1)
return;
/* start periodic coroutine */
od_periodic_t *periodic;
periodic = pooler->system->periodic;
rc = od_periodic_start(periodic);
if (rc == -1)
return;
/* start worker threads */
od_relaypool_t *relay_pool = pooler->system->relay_pool;
rc = od_relaypool_start(relay_pool);
if (rc == -1)
return;
/* start pooler servers */
od_pooler_main(pooler);
}
int od_pooler_init(od_pooler_t *pooler, od_system_t *system)
int od_pooler_init(od_pooler_t *pooler, od_instance_t *instance)
{
pooler->machine = -1;
pooler->system = system;
pooler->addr = NULL;
pooler->machine = -1;
pooler->instance = instance;
pooler->addr = NULL;
memset(&pooler->system, 0, sizeof(pooler->system));
return 0;
}
int od_pooler_start(od_pooler_t *pooler)
{
od_instance_t *instance = pooler->system->instance;
od_instance_t *instance = pooler->system.instance;
pooler->machine = machine_create("pooler", od_pooler, pooler);
if (pooler->machine == -1) {
od_error(&instance->logger, "pooler", NULL, NULL,

View File

@ -22,10 +22,11 @@ struct od_pooler
{
int64_t machine;
struct addrinfo *addr;
od_system_t *system;
od_system_t system;
od_instance_t *instance;
};
int od_pooler_init(od_pooler_t*, od_system_t*);
int od_pooler_init(od_pooler_t*, od_instance_t*);
int od_pooler_start(od_pooler_t*);
#endif /* OD_INSTANCE_H */

View File

@ -45,26 +45,23 @@
#include "sources/relay_pool.h"
#include "sources/frontend.h"
int od_relaypool_init(od_relaypool_t *pool, od_system_t *system, int count)
void od_relaypool_init(od_relaypool_t *pool)
{
pool->count = 0;
pool->round_robin = 0;
pool->count = count;
pool->pool = NULL;
}
int od_relaypool_start(od_relaypool_t *pool, od_system_t *system, int count)
{
pool->pool = malloc(sizeof(od_relay_t) * count);
if (pool->pool == NULL)
return -1;
pool->count = count;
int i;
for (i = 0; i < count; i++) {
od_relay_t *relay = &pool->pool[i];
od_relay_init(relay, system, i);
}
return 0;
}
int od_relaypool_start(od_relaypool_t *pool)
{
int i;
for (i = 0; i < pool->count; i++) {
od_relay_t *relay = &pool->pool[i];
int rc;
rc = od_relay_start(relay);
if (rc == -1)

View File

@ -12,12 +12,12 @@ typedef struct od_relaypool od_relaypool_t;
struct od_relaypool
{
od_relay_t *pool;
int count;
int round_robin;
int round_robin;
int count;
};
int od_relaypool_init(od_relaypool_t*, od_system_t*, int);
int od_relaypool_start(od_relaypool_t*);
void od_relaypool_init(od_relaypool_t*);
int od_relaypool_start(od_relaypool_t*, od_system_t*, int);
void od_relaypool_feed(od_relaypool_t*, machine_msg_t*);
#endif /* OD_RELAY_POOL_H */

View File

@ -467,24 +467,23 @@ od_router(void *arg)
}
}
int od_router_init(od_router_t *router, od_system_t *system)
void od_router_init(od_router_t *router, od_system_t *system)
{
od_instance_t *instance = system->instance;
od_routepool_init(&router->route_pool);
router->system = system;
router->system = system;
router->clients = 0;
router->channel = NULL;
}
int od_router_start(od_router_t *router)
{
od_instance_t *instance = router->system->instance;
router->channel = machine_channel_create(1);
if (router->channel == NULL) {
od_error(&instance->logger, "router", NULL, NULL,
"failed to create router channel");
return -1;
}
return 0;
}
int od_router_start(od_router_t *router)
{
od_instance_t *instance = router->system->instance;
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_router, router);
if (coroutine_id == -1) {

View File

@ -23,11 +23,11 @@ struct od_router
od_routepool_t route_pool;
machine_channel_t *channel;
int clients;
od_system_t *system;
od_system_t *system;
};
int od_router_init(od_router_t*, od_system_t*);
int od_router_start(od_router_t*);
void od_router_init(od_router_t*, od_system_t*);
int od_router_start(od_router_t*);
od_routerstatus_t
od_route(od_client_t*);