odissey: relay and relay_pool is worker and worker_pool now

This commit is contained in:
Dmitry Simonenko 2018-03-02 13:00:52 +03:00
parent c90932b3d5
commit d48cd092a0
21 changed files with 116 additions and 116 deletions

View File

@ -15,8 +15,8 @@ set(od_src
pooler.c
router.c
console.c
relay.c
relay_pool.c
worker.c
worker_pool.c
frontend.c
backend.c
reset.c

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/auth.h"

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/reset.h"

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/auth.h"

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/auth.h"

View File

@ -44,7 +44,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/console.h"

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/deploy.h"

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/cancel.h"
@ -935,7 +935,7 @@ void od_frontend(void *arg)
peer);
}
/* attach client io to relay machine event loop */
/* attach client io to worker machine event loop */
int rc;
rc = machine_io_attach(client->io);
if (rc == -1) {

View File

@ -45,8 +45,8 @@
#include "sources/console.h"
#include "sources/pooler.h"
#include "sources/periodic.h"
#include "sources/relay.h"
#include "sources/relay_pool.h"
#include "sources/worker.h"
#include "sources/worker_pool.h"
#include "sources/frontend.h"
void od_instance_init(od_instance_t *instance)
@ -199,21 +199,21 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
od_router_t router;
od_console_t console;
od_periodic_t periodic;
od_relaypool_t relay_pool;
od_workerpool_t worker_pool;
od_system_t *system;
system = &pooler.system;
system->instance = instance;
system->pooler = &pooler;
system->router = &router;
system->console = &console;
system->periodic = &periodic;
system->relay_pool = &relay_pool;
system->instance = instance;
system->pooler = &pooler;
system->router = &router;
system->console = &console;
system->periodic = &periodic;
system->worker_pool = &worker_pool;
od_router_init(&router, system);
od_console_init(&console, system);
od_periodic_init(&periodic, system);
od_relaypool_init(&relay_pool);
od_workerpool_init(&worker_pool);
/* start pooler machine thread */
rc = od_pooler_start(&pooler);

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/periodic.h"

View File

@ -45,8 +45,8 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/console.h"
#include "sources/relay.h"
#include "sources/relay_pool.h"
#include "sources/worker.h"
#include "sources/worker_pool.h"
#include "sources/pooler.h"
#include "sources/periodic.h"
#include "sources/tls.h"
@ -56,7 +56,6 @@ od_pooler_server(void *arg)
{
od_poolerserver_t *server = arg;
od_instance_t *instance = server->system->instance;
od_relaypool_t *relay_pool = server->system->relay_pool;
for (;;)
{
@ -109,7 +108,8 @@ od_pooler_server(void *arg)
msg = machine_msg_create(OD_MCLIENT_NEW, sizeof(od_client_t*));
char *msg_data = machine_msg_get_data(msg);
memcpy(msg_data, &client, sizeof(od_client_t*));
od_relaypool_feed(relay_pool, msg);
od_workerpool_t *worker_pool = server->system->worker_pool;
od_workerpool_feed(worker_pool, msg);
}
}
@ -360,8 +360,8 @@ od_pooler(void *arg)
return;
/* start worker threads */
od_relaypool_t *relay_pool = pooler->system.relay_pool;
rc = od_relaypool_start(relay_pool, &pooler->system, instance->scheme.workers);
od_workerpool_t *worker_pool = pooler->system.worker_pool;
rc = od_workerpool_start(worker_pool, &pooler->system, instance->scheme.workers);
if (rc == -1)
return;

View File

@ -1,23 +0,0 @@
#ifndef OD_RELAY_H
#define OD_RELAY_H
/*
* Odissey.
*
* Advanced PostgreSQL connection pooler.
*/
typedef struct od_relay od_relay_t;
struct od_relay
{
int64_t machine;
int id;
machine_channel_t *task_channel;
od_system_t *system;
};
void od_relay_init(od_relay_t*, od_system_t*, int);
int od_relay_start(od_relay_t*);
#endif /* OD_RELAY_H */

View File

@ -1,23 +0,0 @@
#ifndef OD_RELAY_POOL_H
#define OD_RELAY_POOL_H
/*
* Odissey.
*
* Advanced PostgreSQL connection pooler.
*/
typedef struct od_relaypool od_relaypool_t;
struct od_relaypool
{
od_relay_t *pool;
int round_robin;
int count;
};
void od_relaypool_init(od_relaypool_t*);
int od_relaypool_start(od_relaypool_t*, od_system_t*, int);
void od_relaypool_feed(od_relaypool_t*, machine_msg_t*);
#endif /* OD_RELAY_POOL_H */

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/reset.h"

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
#include "sources/backend.h"
#include "sources/cancel.h"

View File

@ -16,7 +16,7 @@ struct od_system
void *router;
void *console;
void *periodic;
void *relay_pool;
void *worker_pool;
};
#endif /* OD_SYSTEM_H */

View File

@ -43,7 +43,7 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/tls.h"
#include "sources/frontend.h"

View File

@ -42,19 +42,19 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/worker.h"
#include "sources/frontend.h"
static inline void
od_relay(void *arg)
od_worker(void *arg)
{
od_relay_t *relay = arg;
od_instance_t *instance = relay->system->instance;
od_worker_t *worker = arg;
od_instance_t *instance = worker->system->instance;
for (;;)
{
machine_msg_t *msg;
msg = machine_channel_read(relay->task_channel, UINT32_MAX);
msg = machine_channel_read(worker->task_channel, UINT32_MAX);
if (msg == NULL)
break;
@ -65,11 +65,11 @@ od_relay(void *arg)
{
od_client_t *client;
client = *(od_client_t**)machine_msg_get_data(msg);
client->system = relay->system;
client->system = worker->system;
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_frontend, client);
if (coroutine_id == -1) {
od_error(&instance->logger, "relay", client, NULL,
od_error(&instance->logger, "worker", client, NULL,
"failed to create coroutine");
machine_close(client->io);
od_client_free(client);
@ -86,43 +86,43 @@ od_relay(void *arg)
machine_msg_free(msg);
}
od_log(&instance->logger, "relay", NULL, NULL, "stopped");
od_log(&instance->logger, "worker", NULL, NULL, "stopped");
}
void od_relay_init(od_relay_t *relay, od_system_t *system, int id)
void od_worker_init(od_worker_t *worker, od_system_t *system, int id)
{
relay->machine = -1;
relay->id = id;
relay->system = system;
worker->machine = -1;
worker->id = id;
worker->system = system;
}
int od_relay_start(od_relay_t *relay)
int od_worker_start(od_worker_t *worker)
{
od_instance_t *instance = relay->system->instance;
od_instance_t *instance = worker->system->instance;
relay->task_channel = machine_channel_create(instance->is_shared);
if (relay->task_channel == NULL) {
od_error(&instance->logger, "relay", NULL, NULL,
worker->task_channel = machine_channel_create(instance->is_shared);
if (worker->task_channel == NULL) {
od_error(&instance->logger, "worker", NULL, NULL,
"failed to create task channel");
return -1;
}
if (instance->is_shared) {
char name[32];
od_snprintf(name, sizeof(name), "relay: %d", relay->id);
relay->machine = machine_create(name, od_relay, relay);
if (relay->machine == -1) {
machine_channel_free(relay->task_channel);
od_error(&instance->logger, "relay", NULL, NULL,
"failed to start relay");
od_snprintf(name, sizeof(name), "worker: %d", worker->id);
worker->machine = machine_create(name, od_worker, worker);
if (worker->machine == -1) {
machine_channel_free(worker->task_channel);
od_error(&instance->logger, "worker", NULL, NULL,
"failed to start worker");
return -1;
}
} else {
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_relay, relay);
coroutine_id = machine_coroutine_create(od_worker, worker);
if (coroutine_id == -1) {
od_error(&instance->logger, "relay", NULL, NULL,
"failed to create relay coroutine");
machine_channel_free(relay->task_channel);
od_error(&instance->logger, "worker", NULL, NULL,
"failed to create worker coroutine");
machine_channel_free(worker->task_channel);
return -1;
}
}

23
sources/worker.h Normal file
View File

@ -0,0 +1,23 @@
#ifndef OD_WORKER_H
#define OD_WORKER_H
/*
* Odissey.
*
* Advanced PostgreSQL connection pooler.
*/
typedef struct od_worker od_worker_t;
struct od_worker
{
int64_t machine;
int id;
machine_channel_t *task_channel;
od_system_t *system;
};
void od_worker_init(od_worker_t*, od_system_t*, int);
int od_worker_start(od_worker_t*);
#endif /* OD_WORKER_H */

View File

@ -42,36 +42,36 @@
#include "sources/router_cancel.h"
#include "sources/router.h"
#include "sources/pooler.h"
#include "sources/relay.h"
#include "sources/relay_pool.h"
#include "sources/worker.h"
#include "sources/worker_pool.h"
#include "sources/frontend.h"
void od_relaypool_init(od_relaypool_t *pool)
void od_workerpool_init(od_workerpool_t *pool)
{
pool->count = 0;
pool->round_robin = 0;
pool->pool = NULL;
}
int od_relaypool_start(od_relaypool_t *pool, od_system_t *system, int count)
int od_workerpool_start(od_workerpool_t *pool, od_system_t *system, int count)
{
pool->pool = malloc(sizeof(od_relay_t) * count);
pool->pool = malloc(sizeof(od_worker_t) * count);
if (pool->pool == NULL)
return -1;
pool->count = count;
int i;
for (i = 0; i < count; i++) {
od_relay_t *relay = &pool->pool[i];
od_relay_init(relay, system, i);
od_worker_t *worker = &pool->pool[i];
od_worker_init(worker, system, i);
int rc;
rc = od_relay_start(relay);
rc = od_worker_start(worker);
if (rc == -1)
return -1;
}
return 0;
}
void od_relaypool_feed(od_relaypool_t *pool, machine_msg_t *msg)
void od_workerpool_feed(od_workerpool_t *pool, machine_msg_t *msg)
{
int next = pool->round_robin;
if (pool->round_robin >= pool->count) {
@ -80,7 +80,7 @@ void od_relaypool_feed(od_relaypool_t *pool, machine_msg_t *msg)
}
pool->round_robin++;
od_relay_t *relay;
relay = &pool->pool[next];
machine_channel_write(relay->task_channel, msg);
od_worker_t *worker;
worker = &pool->pool[next];
machine_channel_write(worker->task_channel, msg);
}

23
sources/worker_pool.h Normal file
View File

@ -0,0 +1,23 @@
#ifndef OD_WORKER_POOL_H
#define OD_WORKER_POOL_H
/*
* Odissey.
*
* Advanced PostgreSQL connection pooler.
*/
typedef struct od_workerpool od_workerpool_t;
struct od_workerpool
{
od_worker_t *pool;
int round_robin;
int count;
};
void od_workerpool_init(od_workerpool_t*);
int od_workerpool_start(od_workerpool_t*, od_system_t*, int);
void od_workerpool_feed(od_workerpool_t*, machine_msg_t*);
#endif /* OD_WORKER_POOL_H */