odissey: make periodic coroutine a part of pooler

This commit is contained in:
Dmitry Simonenko 2017-06-05 17:05:39 +03:00
parent 1415ecc03f
commit b50197408c
6 changed files with 74 additions and 31 deletions

View File

@ -41,6 +41,7 @@
#include "od_router.h"
#include "od_pooler.h"
#include "od_periodic.h"
#include "od_relay.h"
#include "od_relay_pool.h"
#include "od_frontend.h"
@ -142,13 +143,14 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
od_pid_create(&instance->pid, instance->scheme.pid_file);
/* run system services */
od_pooler_t pooler;
od_router_t router;
od_periodic_t periodic;
od_pooler_t pooler;
od_relaypool_t relay_pool;
od_system_t system = {
.pooler = &pooler,
.router = &router,
.periodic = &periodic,
.relay_pool = &relay_pool,
.instance = instance
};
@ -157,17 +159,18 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
od_error(&instance->log, "failed to create task queue");
return 1;
}
od_router_init(&router, &system);
od_periodic_init(&periodic, &system);
od_pooler_init(&pooler, &system);
rc = od_pooler_start(&pooler);
if (rc == -1)
return 1;
rc = od_relaypool_init(&relay_pool, &system, instance->scheme.workers);
if (rc == -1)
return 1;
/* start pooler machine thread */
rc = od_pooler_start(&pooler);
if (rc == -1)
return 1;
/* start workers */
rc = od_relaypool_start(&relay_pool);
if (rc == -1)
return 1;

View File

@ -99,10 +99,12 @@ od_expire_mark(od_server_t *server, void *arg)
return 0;
}
void od_periodic(void *arg)
static void
od_periodic(void *arg)
{
od_router_t *router = arg;
od_instance_t *instance = router->system->instance;
od_periodic_t *periodic = arg;
od_router_t *router = periodic->system->router;
od_instance_t *instance = periodic->system->instance;
int tick = 0;
for (;;)
@ -177,3 +179,21 @@ void od_periodic(void *arg)
machine_sleep(1000);
}
}
int od_periodic_init(od_periodic_t *periodic, od_system_t *system)
{
periodic->system = system;
return 0;
}
int od_periodic_start(od_periodic_t *periodic)
{
od_instance_t *instance = periodic->system->instance;
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_periodic, periodic);
if (coroutine_id == -1) {
od_error(&instance->log, "failed to start router");
return 1;
}
return 0;
}

View File

@ -7,6 +7,13 @@
* PostgreSQL connection pooler and request router.
*/
void od_periodic(void*);
typedef struct od_periodic_t od_periodic_t;
struct od_periodic_t {
od_system_t *system;
};
int od_periodic_init(od_periodic_t*, od_system_t*);
int od_periodic_start(od_periodic_t*);
#endif /* OD_PERIODIC_H */

View File

@ -38,23 +38,14 @@
#include "od_route_pool.h"
#include "od_router.h"
#include "od_pooler.h"
#include "od_periodic.h"
#include "od_tls.h"
static inline void
od_pooler(void *arg)
od_pooler_main(od_pooler_t *pooler)
{
od_pooler_t *pooler = arg;
od_router_t *router = pooler->system->router;
od_instance_t *instance = pooler->system->instance;
od_log(&instance->log, "pooler: started");
/* start router coroutine */
int rc;
rc = od_router_start(router);
if (rc == -1)
return;
/* init pooler tls */
pooler->tls = NULL;
od_scheme_t *scheme = &instance->scheme;
@ -85,6 +76,7 @@ od_pooler(void *arg)
char port[16];
snprintf(port, sizeof(port), "%d", instance->scheme.port);
struct addrinfo *ai = NULL;
int rc;
rc = machine_getaddrinfo(host, port, hints_ptr, &ai, UINT32_MAX);
if (rc < 0) {
od_error(&instance->log, "failed to resolve %s:%d",
@ -170,6 +162,33 @@ od_pooler(void *arg)
}
}
static inline void
od_pooler(void *arg)
{
od_pooler_t *pooler = arg;
od_instance_t *instance = pooler->system->instance;
od_log(&instance->log, "pooler: started");
/* start router coroutine */
int rc;
od_router_t *router;
router = pooler->system->router;
rc = od_router_start(router);
if (rc == -1)
return;
/* start periodic coroutine */
od_periodic_t *periodic;
periodic = pooler->system->periodic;
rc = od_periodic_start(periodic);
if (rc == -1)
return;
/* start pooler server */
od_pooler_main(pooler);
}
int od_pooler_init(od_pooler_t *pooler, od_system_t *system)
{
pooler->machine = -1;

View File

@ -160,14 +160,6 @@ od_router(void *arg)
od_log(&instance->log, "router: started");
/* start periodic task coroutine */
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_periodic, router);
if (coroutine_id == -1) {
od_error(&instance->log, "failed to create periodic coroutine");
return;
}
for (;;)
{
machine_msg_t msg;
@ -240,6 +232,7 @@ od_router(void *arg)
od_msgrouter_t *msg_attach;
msg_attach = machine_msg_get_data(msg);
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_router_attacher, msg);
if (coroutine_id == -1) {
msg_attach->status = OD_RERROR;

View File

@ -14,6 +14,7 @@ struct od_system
void *instance;
void *pooler;
void *router;
void *periodic;
void *relay_pool;
machine_queue_t task_queue;
};