diff --git a/src/od_instance.c b/src/od_instance.c index 101f6611..29fbae24 100644 --- a/src/od_instance.c +++ b/src/od_instance.c @@ -26,19 +26,7 @@ #include "od_lex.h" #include "od_config.h" #include "od_instance.h" - -/* -#include "od_stat.h" -#include "od_server.h" -#include "od_server_pool.h" -#include "od_client.h" -#include "od_client_list.h" -#include "od_client_pool.h" -#include "od_route_id.h" -#include "od_route.h" -#include "od_route_pool.h" #include "od_pooler.h" -*/ void od_instance_init(od_instance_t *instance) { @@ -135,17 +123,11 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv) /* create pid file */ if (instance->scheme.pid_file) od_pid_create(&instance->pid, instance->scheme.pid_file); - /* run connection pooler */ - /* od_pooler_t pooler; - rc = od_pooler_init(&pooler, od); - if (rc == -1) - return 1; + od_pooler_init(&pooler, instance); rc = od_pooler_start(&pooler); if (rc == -1) return 1; - */ - return 0; } diff --git a/src/od_instance.h b/src/od_instance.h index cce20573..5d54b031 100644 --- a/src/od_instance.h +++ b/src/od_instance.h @@ -1,5 +1,5 @@ -#ifndef OD_INTANCE_H_ -#define OD_INTANCE_H_ +#ifndef OD_INSTANCE_H_ +#define OD_INSTANCE_H_ /* * odissey. @@ -7,9 +7,9 @@ * PostgreSQL connection pooler and request router. */ -typedef struct od_intance od_instance_t; +typedef struct od_instance od_instance_t; -struct od_intance +struct od_instance { od_pid_t pid; od_syslog_t syslog; @@ -22,4 +22,4 @@ void od_instance_init(od_instance_t*); void od_instance_free(od_instance_t*); int od_instance_main(od_instance_t*, int, char**); -#endif /* OD_H */ +#endif /* OD_INSTANCE_H */ diff --git a/src/od_io.c b/src/od_io.c new file mode 100644 index 00000000..cf446066 --- /dev/null +++ b/src/od_io.c @@ -0,0 +1,88 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_io.h" + +int od_read(machine_io_t io, so_stream_t *stream, int time_ms) +{ + uint32_t request_start = so_stream_used(stream); + uint32_t request_size = 0; + for (;;) + { + uint8_t *request_data = stream->s + request_start; + uint32_t len; + int to_read; + to_read = so_read(&len, &request_data, &request_size); + if (to_read == 0) + break; + if (to_read == -1) + return -1; + int rc = so_stream_ensure(stream, to_read); + if (rc == -1) + return -1; + rc = machine_read(io, (char*)stream->p, to_read, time_ms); + if (rc < 0) + return -1; + so_stream_advance(stream, to_read); + request_size += to_read; + } + return request_start; +} + +int od_write(machine_io_t io, so_stream_t *stream) +{ + int rc; + rc = machine_write(io, (char*)stream->s, + so_stream_used(stream), + UINT32_MAX); + if (rc < 0) + return -1; + return 0; +} + +int od_getpeername(machine_io_t io, char *buf, int size) +{ + char addr[128]; + struct sockaddr_storage sa; + int salen = sizeof(sa); + int rc = machine_getpeername(io, (struct sockaddr*)&sa, &salen); + if (rc < 0) + goto error; + + if (sa.ss_family == AF_INET) { + struct sockaddr_in *sin = (struct sockaddr_in*)&sa; + inet_ntop(sa.ss_family, &sin->sin_addr, addr, sizeof(addr)); + snprintf(buf, size, "%s:%d", addr, ntohs(sin->sin_port)); + return 0; + } + if (sa.ss_family == AF_INET6) { + struct sockaddr_in6 *sin = (struct sockaddr_in6*)&sa; + inet_ntop(sa.ss_family, &sin->sin6_addr, addr, sizeof(addr)); + snprintf(buf, size, "[%s]:%d", addr, ntohs(sin->sin6_port)); + return 0; + } + +error: + snprintf(buf, size, "unknown"); + return -1; +} diff --git a/src/od_io.h b/src/od_io.h new file mode 100644 index 00000000..ed3c49f3 --- /dev/null +++ b/src/od_io.h @@ -0,0 +1,14 @@ +#ifndef OD_IO_H +#define OD_IO_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +int od_read(machine_io_t, so_stream_t*, int); +int od_write(machine_io_t, so_stream_t*); +int od_getpeername(machine_io_t, char*, int); + +#endif /* OD_IO_H */ diff --git a/src/od_log.c b/src/od_log.c index 4e934612..7025e909 100644 --- a/src/od_log.c +++ b/src/od_log.c @@ -25,7 +25,7 @@ #include "od_pid.h" #include "od_syslog.h" #include "od_log.h" -/*#include "od_io.h"*/ +#include "od_io.h" int od_log_init(od_log_t *l, od_pid_t *pid, od_syslog_t *syslog) { @@ -73,14 +73,11 @@ int od_logv(od_log_t *l, od_syslogprio_t prio, if (ident) len += snprintf(buffer + len, sizeof(buffer) - len, "%s ", ident); /* peer */ - // XXX - (void)peer; - /* if (peer) { - char *peer_name = od_getpeername(peer); + char peer_name[128]; + od_getpeername(peer, peer_name, sizeof(peer_name)); len += snprintf(buffer + len, sizeof(buffer) - len, "%s ", peer_name); } - */ /* message */ len += vsnprintf(buffer + len, sizeof(buffer) - len, fmt, args); va_end(args); diff --git a/src/od_msg.h b/src/od_msg.h new file mode 100644 index 00000000..a3c38478 --- /dev/null +++ b/src/od_msg.h @@ -0,0 +1,8 @@ +#ifndef OD_MSG_H +#define OD_MSG_H + +typedef enum { + OD_MCLIENT_NEW +} od_msg_t; + +#endif /* OD_MSG_H */ diff --git a/src/od_pooler.c b/src/od_pooler.c new file mode 100644 index 00000000..55b37f0c --- /dev/null +++ b/src/od_pooler.c @@ -0,0 +1,178 @@ + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "od_macro.h" +#include "od_version.h" +#include "od_list.h" +#include "od_pid.h" +#include "od_syslog.h" +#include "od_log.h" +#include "od_daemon.h" +#include "od_scheme.h" +#include "od_lex.h" +#include "od_config.h" +#include "od_msg.h" +#include "od_instance.h" +#include "od_pooler.h" + +#include "od_server.h" +#include "od_server_pool.h" +#include "od_client.h" +#include "od_client_pool.h" +#include "od_route_id.h" +#include "od_route.h" + +static inline void +od_pooler(void *arg) +{ + od_pooler_t *pooler = arg; + od_instance_t *instance = pooler->instance; + + /* create task queue */ + pooler->task_queue = machine_queue_create(); + if (pooler->task_queue == NULL) { + od_error(&instance->log, NULL, "failed to create task queue"); + return; + } + + /* init pooler tls */ + int rc; +#if 0 + pooler->tls = NULL; + od_scheme_t *scheme = &pooler->instance->scheme; + if (scheme->tls_verify != OD_TDISABLE) { + pooler->tls = od_tlsfe(pooler->env, scheme); + if (pooler->tls == NULL) + return; + } +#endif + + /* listen '*' */ + struct addrinfo *hints_ptr = NULL; + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + hints.ai_canonname = NULL; + hints.ai_addr = NULL; + hints.ai_next = NULL; + char *host = instance->scheme.host; + if (strcmp(instance->scheme.host, "*") == 0) { + hints_ptr = &hints; + host = NULL; + } + + /* resolve listen address and port */ + char port[16]; + snprintf(port, sizeof(port), "%d", instance->scheme.port); + struct addrinfo *ai = NULL; + rc = machine_getaddrinfo(host, port, hints_ptr, &ai, UINT32_MAX); + if (rc < 0) { + od_error(&instance->log, NULL, "failed to resolve %s:%d", + instance->scheme.host, + instance->scheme.port); + return; + } + assert(ai != NULL); + + /* io */ + pooler->server = machine_io_create(); + if (pooler->server == NULL) { + od_error(&instance->log, NULL, "failed to create pooler io"); + return; + } + + /* bind to listen address and port */ + rc = machine_bind(pooler->server, ai->ai_addr); + freeaddrinfo(ai); + if (rc < 0) { + od_error(&instance->log, NULL, "bind %s:%d failed", + instance->scheme.host, + instance->scheme.port); + return; + } + + od_log(&instance->log, NULL, "listening on %s:%d", + instance->scheme.host, + instance->scheme.port); + od_log(&instance->log, NULL, ""); + + /* main loop */ + while (machine_active()) + { + machine_io_t client_io; + rc = machine_accept(pooler->server, &client_io, + instance->scheme.backlog, UINT32_MAX); + if (rc < 0) { + od_error(&instance->log, NULL, "accept failed"); + continue; + } + /* todo: client_max limit */ + + machine_set_nodelay(client_io, instance->scheme.nodelay); + if (instance->scheme.keepalive > 0) + machine_set_keepalive(client_io, 1, instance->scheme.keepalive); + + rc = machine_set_readahead(client_io, instance->scheme.readahead); + if (rc == -1) { + od_error(&instance->log, NULL, "failed to set client readahead"); + machine_close(client_io); + machine_io_free(client_io); + continue; + } + + /* allocate new client */ + od_client_t *client = od_client_allocate(); + if (client == NULL) { + od_error(&instance->log, NULL, "failed to allocate client object"); + machine_close(client_io); + machine_io_free(client_io); + continue; + } + client->id = pooler->client_seq++; + client->io = client_io; + + /* create new client event */ + machine_msg_t msg; + msg = machine_msg_create(OD_MCLIENT_NEW, sizeof(od_client_t*)); + char *msg_data = machine_msg_get_data(msg); + memcpy(msg_data, &client, sizeof(od_client_t*)); + machine_queue_put(pooler->task_queue, msg); + } +} + +void od_pooler_init(od_pooler_t *pooler, od_instance_t *instance) +{ + pooler->machine = -1; + pooler->server = NULL; + pooler->client_seq = 0; + pooler->instance = instance; + pooler->task_queue = NULL; +} + +int od_pooler_start(od_pooler_t *pooler) +{ + pooler->machine = machine_create("pooler", od_pooler, pooler); + if (pooler->machine == -1) { + od_error(&pooler->instance->log, NULL, "failed to start server"); + return 1; + } + machine_wait(pooler->machine); + return 0; +} diff --git a/src/od_pooler.h b/src/od_pooler.h new file mode 100644 index 00000000..e735e7c3 --- /dev/null +++ b/src/od_pooler.h @@ -0,0 +1,25 @@ +#ifndef OD_POOLER_H +#define OD_POOLER_H + +/* + * odissey. + * + * PostgreSQL connection pooler and request router. +*/ + +typedef struct od_pooler od_pooler_t; + +struct od_pooler +{ + int64_t machine; + machine_io_t server; + machine_tls_t tls; + uint64_t client_seq; + od_instance_t *instance; + machine_queue_t task_queue; +}; + +void od_pooler_init(od_pooler_t*, od_instance_t*); +int od_pooler_start(od_pooler_t*); + +#endif /* OD_INSTANCE_H */