diff --git a/sources/CMakeLists.txt b/sources/CMakeLists.txt index 25c76336..ba9e385b 100644 --- a/sources/CMakeLists.txt +++ b/sources/CMakeLists.txt @@ -15,8 +15,8 @@ set(od_src pooler.c router.c console.c - relay.c - relay_pool.c + worker.c + worker_pool.c frontend.c backend.c reset.c diff --git a/sources/auth.c b/sources/auth.c index 28d7b35d..c374fc76 100644 --- a/sources/auth.c +++ b/sources/auth.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/auth.h" diff --git a/sources/auth_query.c b/sources/auth_query.c index d6b6c753..7d8bdd42 100644 --- a/sources/auth_query.c +++ b/sources/auth_query.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/reset.h" diff --git a/sources/backend.c b/sources/backend.c index e3ab346b..961ea6ac 100644 --- a/sources/backend.c +++ b/sources/backend.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/auth.h" diff --git a/sources/cancel.c b/sources/cancel.c index a1a2b5d7..8c9b9cac 100644 --- a/sources/cancel.c +++ b/sources/cancel.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/auth.h" diff --git a/sources/console.c b/sources/console.c index 7222330b..38f35176 100644 --- a/sources/console.c +++ b/sources/console.c @@ -44,7 +44,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/console.h" diff --git a/sources/deploy.c b/sources/deploy.c index aa99131a..c2739fe4 100644 --- a/sources/deploy.c +++ b/sources/deploy.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/deploy.h" diff --git a/sources/frontend.c b/sources/frontend.c index ff9aa151..857b8b37 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/cancel.h" @@ -935,7 +935,7 @@ void od_frontend(void *arg) peer); } - /* attach client io to relay machine event loop */ + /* attach client io to worker machine event loop */ int rc; rc = machine_io_attach(client->io); if (rc == -1) { diff --git a/sources/instance.c b/sources/instance.c index 514f38fa..a99ba35f 100644 --- a/sources/instance.c +++ b/sources/instance.c @@ -45,8 +45,8 @@ #include "sources/console.h" #include "sources/pooler.h" #include "sources/periodic.h" -#include "sources/relay.h" -#include "sources/relay_pool.h" +#include "sources/worker.h" +#include "sources/worker_pool.h" #include "sources/frontend.h" void od_instance_init(od_instance_t *instance) @@ -199,21 +199,21 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv) od_router_t router; od_console_t console; od_periodic_t periodic; - od_relaypool_t relay_pool; + od_workerpool_t worker_pool; od_system_t *system; system = &pooler.system; - system->instance = instance; - system->pooler = &pooler; - system->router = &router; - system->console = &console; - system->periodic = &periodic; - system->relay_pool = &relay_pool; + system->instance = instance; + system->pooler = &pooler; + system->router = &router; + system->console = &console; + system->periodic = &periodic; + system->worker_pool = &worker_pool; od_router_init(&router, system); od_console_init(&console, system); od_periodic_init(&periodic, system); - od_relaypool_init(&relay_pool); + od_workerpool_init(&worker_pool); /* start pooler machine thread */ rc = od_pooler_start(&pooler); diff --git a/sources/periodic.c b/sources/periodic.c index dce92b78..a555e450 100644 --- a/sources/periodic.c +++ b/sources/periodic.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/periodic.h" diff --git a/sources/pooler.c b/sources/pooler.c index 4872b5a1..3bfd1010 100644 --- a/sources/pooler.c +++ b/sources/pooler.c @@ -45,8 +45,8 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/console.h" -#include "sources/relay.h" -#include "sources/relay_pool.h" +#include "sources/worker.h" +#include "sources/worker_pool.h" #include "sources/pooler.h" #include "sources/periodic.h" #include "sources/tls.h" @@ -56,7 +56,6 @@ od_pooler_server(void *arg) { od_poolerserver_t *server = arg; od_instance_t *instance = server->system->instance; - od_relaypool_t *relay_pool = server->system->relay_pool; for (;;) { @@ -109,7 +108,8 @@ od_pooler_server(void *arg) msg = machine_msg_create(OD_MCLIENT_NEW, sizeof(od_client_t*)); char *msg_data = machine_msg_get_data(msg); memcpy(msg_data, &client, sizeof(od_client_t*)); - od_relaypool_feed(relay_pool, msg); + od_workerpool_t *worker_pool = server->system->worker_pool; + od_workerpool_feed(worker_pool, msg); } } @@ -360,8 +360,8 @@ od_pooler(void *arg) return; /* start worker threads */ - od_relaypool_t *relay_pool = pooler->system.relay_pool; - rc = od_relaypool_start(relay_pool, &pooler->system, instance->scheme.workers); + od_workerpool_t *worker_pool = pooler->system.worker_pool; + rc = od_workerpool_start(worker_pool, &pooler->system, instance->scheme.workers); if (rc == -1) return; diff --git a/sources/relay.h b/sources/relay.h deleted file mode 100644 index 53c7126b..00000000 --- a/sources/relay.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef OD_RELAY_H -#define OD_RELAY_H - -/* - * Odissey. - * - * Advanced PostgreSQL connection pooler. -*/ - -typedef struct od_relay od_relay_t; - -struct od_relay -{ - int64_t machine; - int id; - machine_channel_t *task_channel; - od_system_t *system; -}; - -void od_relay_init(od_relay_t*, od_system_t*, int); -int od_relay_start(od_relay_t*); - -#endif /* OD_RELAY_H */ diff --git a/sources/relay_pool.h b/sources/relay_pool.h deleted file mode 100644 index 9478e55b..00000000 --- a/sources/relay_pool.h +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef OD_RELAY_POOL_H -#define OD_RELAY_POOL_H - -/* - * Odissey. - * - * Advanced PostgreSQL connection pooler. -*/ - -typedef struct od_relaypool od_relaypool_t; - -struct od_relaypool -{ - od_relay_t *pool; - int round_robin; - int count; -}; - -void od_relaypool_init(od_relaypool_t*); -int od_relaypool_start(od_relaypool_t*, od_system_t*, int); -void od_relaypool_feed(od_relaypool_t*, machine_msg_t*); - -#endif /* OD_RELAY_POOL_H */ diff --git a/sources/reset.c b/sources/reset.c index adbc8aa3..219b1415 100644 --- a/sources/reset.c +++ b/sources/reset.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/reset.h" diff --git a/sources/router.c b/sources/router.c index e4604e3a..bbb03487 100644 --- a/sources/router.c +++ b/sources/router.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" #include "sources/backend.h" #include "sources/cancel.h" diff --git a/sources/system.h b/sources/system.h index f0bcef4f..fd5dd3e3 100644 --- a/sources/system.h +++ b/sources/system.h @@ -16,7 +16,7 @@ struct od_system void *router; void *console; void *periodic; - void *relay_pool; + void *worker_pool; }; #endif /* OD_SYSTEM_H */ diff --git a/sources/tls.c b/sources/tls.c index 699c8692..e35a3aed 100644 --- a/sources/tls.c +++ b/sources/tls.c @@ -43,7 +43,7 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/tls.h" #include "sources/frontend.h" diff --git a/sources/relay.c b/sources/worker.c similarity index 60% rename from sources/relay.c rename to sources/worker.c index e4a59ac4..c62618fe 100644 --- a/sources/relay.c +++ b/sources/worker.c @@ -42,19 +42,19 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" +#include "sources/worker.h" #include "sources/frontend.h" static inline void -od_relay(void *arg) +od_worker(void *arg) { - od_relay_t *relay = arg; - od_instance_t *instance = relay->system->instance; + od_worker_t *worker = arg; + od_instance_t *instance = worker->system->instance; for (;;) { machine_msg_t *msg; - msg = machine_channel_read(relay->task_channel, UINT32_MAX); + msg = machine_channel_read(worker->task_channel, UINT32_MAX); if (msg == NULL) break; @@ -65,11 +65,11 @@ od_relay(void *arg) { od_client_t *client; client = *(od_client_t**)machine_msg_get_data(msg); - client->system = relay->system; + client->system = worker->system; int64_t coroutine_id; coroutine_id = machine_coroutine_create(od_frontend, client); if (coroutine_id == -1) { - od_error(&instance->logger, "relay", client, NULL, + od_error(&instance->logger, "worker", client, NULL, "failed to create coroutine"); machine_close(client->io); od_client_free(client); @@ -86,43 +86,43 @@ od_relay(void *arg) machine_msg_free(msg); } - od_log(&instance->logger, "relay", NULL, NULL, "stopped"); + od_log(&instance->logger, "worker", NULL, NULL, "stopped"); } -void od_relay_init(od_relay_t *relay, od_system_t *system, int id) +void od_worker_init(od_worker_t *worker, od_system_t *system, int id) { - relay->machine = -1; - relay->id = id; - relay->system = system; + worker->machine = -1; + worker->id = id; + worker->system = system; } -int od_relay_start(od_relay_t *relay) +int od_worker_start(od_worker_t *worker) { - od_instance_t *instance = relay->system->instance; + od_instance_t *instance = worker->system->instance; - relay->task_channel = machine_channel_create(instance->is_shared); - if (relay->task_channel == NULL) { - od_error(&instance->logger, "relay", NULL, NULL, + worker->task_channel = machine_channel_create(instance->is_shared); + if (worker->task_channel == NULL) { + od_error(&instance->logger, "worker", NULL, NULL, "failed to create task channel"); return -1; } if (instance->is_shared) { char name[32]; - od_snprintf(name, sizeof(name), "relay: %d", relay->id); - relay->machine = machine_create(name, od_relay, relay); - if (relay->machine == -1) { - machine_channel_free(relay->task_channel); - od_error(&instance->logger, "relay", NULL, NULL, - "failed to start relay"); + od_snprintf(name, sizeof(name), "worker: %d", worker->id); + worker->machine = machine_create(name, od_worker, worker); + if (worker->machine == -1) { + machine_channel_free(worker->task_channel); + od_error(&instance->logger, "worker", NULL, NULL, + "failed to start worker"); return -1; } } else { int64_t coroutine_id; - coroutine_id = machine_coroutine_create(od_relay, relay); + coroutine_id = machine_coroutine_create(od_worker, worker); if (coroutine_id == -1) { - od_error(&instance->logger, "relay", NULL, NULL, - "failed to create relay coroutine"); - machine_channel_free(relay->task_channel); + od_error(&instance->logger, "worker", NULL, NULL, + "failed to create worker coroutine"); + machine_channel_free(worker->task_channel); return -1; } } diff --git a/sources/worker.h b/sources/worker.h new file mode 100644 index 00000000..15243546 --- /dev/null +++ b/sources/worker.h @@ -0,0 +1,23 @@ +#ifndef OD_WORKER_H +#define OD_WORKER_H + +/* + * Odissey. + * + * Advanced PostgreSQL connection pooler. +*/ + +typedef struct od_worker od_worker_t; + +struct od_worker +{ + int64_t machine; + int id; + machine_channel_t *task_channel; + od_system_t *system; +}; + +void od_worker_init(od_worker_t*, od_system_t*, int); +int od_worker_start(od_worker_t*); + +#endif /* OD_WORKER_H */ diff --git a/sources/relay_pool.c b/sources/worker_pool.c similarity index 73% rename from sources/relay_pool.c rename to sources/worker_pool.c index a7d85371..5b9eca96 100644 --- a/sources/relay_pool.c +++ b/sources/worker_pool.c @@ -42,36 +42,36 @@ #include "sources/router_cancel.h" #include "sources/router.h" #include "sources/pooler.h" -#include "sources/relay.h" -#include "sources/relay_pool.h" +#include "sources/worker.h" +#include "sources/worker_pool.h" #include "sources/frontend.h" -void od_relaypool_init(od_relaypool_t *pool) +void od_workerpool_init(od_workerpool_t *pool) { pool->count = 0; pool->round_robin = 0; pool->pool = NULL; } -int od_relaypool_start(od_relaypool_t *pool, od_system_t *system, int count) +int od_workerpool_start(od_workerpool_t *pool, od_system_t *system, int count) { - pool->pool = malloc(sizeof(od_relay_t) * count); + pool->pool = malloc(sizeof(od_worker_t) * count); if (pool->pool == NULL) return -1; pool->count = count; int i; for (i = 0; i < count; i++) { - od_relay_t *relay = &pool->pool[i]; - od_relay_init(relay, system, i); + od_worker_t *worker = &pool->pool[i]; + od_worker_init(worker, system, i); int rc; - rc = od_relay_start(relay); + rc = od_worker_start(worker); if (rc == -1) return -1; } return 0; } -void od_relaypool_feed(od_relaypool_t *pool, machine_msg_t *msg) +void od_workerpool_feed(od_workerpool_t *pool, machine_msg_t *msg) { int next = pool->round_robin; if (pool->round_robin >= pool->count) { @@ -80,7 +80,7 @@ void od_relaypool_feed(od_relaypool_t *pool, machine_msg_t *msg) } pool->round_robin++; - od_relay_t *relay; - relay = &pool->pool[next]; - machine_channel_write(relay->task_channel, msg); + od_worker_t *worker; + worker = &pool->pool[next]; + machine_channel_write(worker->task_channel, msg); } diff --git a/sources/worker_pool.h b/sources/worker_pool.h new file mode 100644 index 00000000..50a33aaa --- /dev/null +++ b/sources/worker_pool.h @@ -0,0 +1,23 @@ +#ifndef OD_WORKER_POOL_H +#define OD_WORKER_POOL_H + +/* + * Odissey. + * + * Advanced PostgreSQL connection pooler. +*/ + +typedef struct od_workerpool od_workerpool_t; + +struct od_workerpool +{ + od_worker_t *pool; + int round_robin; + int count; +}; + +void od_workerpool_init(od_workerpool_t*); +int od_workerpool_start(od_workerpool_t*, od_system_t*, int); +void od_workerpool_feed(od_workerpool_t*, machine_msg_t*); + +#endif /* OD_WORKER_POOL_H */