odissey: place new clients info into shared task_queue

This commit is contained in:
Dmitry Simonenko 2017-05-25 15:01:56 +03:00
parent 86005e82e1
commit f20cb8305b
8 changed files with 322 additions and 30 deletions

View File

@ -26,19 +26,7 @@
#include "od_lex.h"
#include "od_config.h"
#include "od_instance.h"
/*
#include "od_stat.h"
#include "od_server.h"
#include "od_server_pool.h"
#include "od_client.h"
#include "od_client_list.h"
#include "od_client_pool.h"
#include "od_route_id.h"
#include "od_route.h"
#include "od_route_pool.h"
#include "od_pooler.h"
*/
void od_instance_init(od_instance_t *instance)
{
@ -135,17 +123,11 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
/* create pid file */
if (instance->scheme.pid_file)
od_pid_create(&instance->pid, instance->scheme.pid_file);
/* run connection pooler */
/*
od_pooler_t pooler;
rc = od_pooler_init(&pooler, od);
if (rc == -1)
return 1;
od_pooler_init(&pooler, instance);
rc = od_pooler_start(&pooler);
if (rc == -1)
return 1;
*/
return 0;
}

View File

@ -1,5 +1,5 @@
#ifndef OD_INTANCE_H_
#define OD_INTANCE_H_
#ifndef OD_INSTANCE_H_
#define OD_INSTANCE_H_
/*
* odissey.
@ -7,9 +7,9 @@
* PostgreSQL connection pooler and request router.
*/
typedef struct od_intance od_instance_t;
typedef struct od_instance od_instance_t;
struct od_intance
struct od_instance
{
od_pid_t pid;
od_syslog_t syslog;
@ -22,4 +22,4 @@ void od_instance_init(od_instance_t*);
void od_instance_free(od_instance_t*);
int od_instance_main(od_instance_t*, int, char**);
#endif /* OD_H */
#endif /* OD_INSTANCE_H */

88
src/od_io.c Normal file
View File

@ -0,0 +1,88 @@
/*
* odissey.
*
* PostgreSQL connection pooler and request router.
*/
#include <stdlib.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <machinarium.h>
#include <soprano.h>
#include "od_macro.h"
#include "od_pid.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_io.h"
int od_read(machine_io_t io, so_stream_t *stream, int time_ms)
{
uint32_t request_start = so_stream_used(stream);
uint32_t request_size = 0;
for (;;)
{
uint8_t *request_data = stream->s + request_start;
uint32_t len;
int to_read;
to_read = so_read(&len, &request_data, &request_size);
if (to_read == 0)
break;
if (to_read == -1)
return -1;
int rc = so_stream_ensure(stream, to_read);
if (rc == -1)
return -1;
rc = machine_read(io, (char*)stream->p, to_read, time_ms);
if (rc < 0)
return -1;
so_stream_advance(stream, to_read);
request_size += to_read;
}
return request_start;
}
int od_write(machine_io_t io, so_stream_t *stream)
{
int rc;
rc = machine_write(io, (char*)stream->s,
so_stream_used(stream),
UINT32_MAX);
if (rc < 0)
return -1;
return 0;
}
int od_getpeername(machine_io_t io, char *buf, int size)
{
char addr[128];
struct sockaddr_storage sa;
int salen = sizeof(sa);
int rc = machine_getpeername(io, (struct sockaddr*)&sa, &salen);
if (rc < 0)
goto error;
if (sa.ss_family == AF_INET) {
struct sockaddr_in *sin = (struct sockaddr_in*)&sa;
inet_ntop(sa.ss_family, &sin->sin_addr, addr, sizeof(addr));
snprintf(buf, size, "%s:%d", addr, ntohs(sin->sin_port));
return 0;
}
if (sa.ss_family == AF_INET6) {
struct sockaddr_in6 *sin = (struct sockaddr_in6*)&sa;
inet_ntop(sa.ss_family, &sin->sin6_addr, addr, sizeof(addr));
snprintf(buf, size, "[%s]:%d", addr, ntohs(sin->sin6_port));
return 0;
}
error:
snprintf(buf, size, "unknown");
return -1;
}

14
src/od_io.h Normal file
View File

@ -0,0 +1,14 @@
#ifndef OD_IO_H
#define OD_IO_H
/*
* odissey.
*
* PostgreSQL connection pooler and request router.
*/
int od_read(machine_io_t, so_stream_t*, int);
int od_write(machine_io_t, so_stream_t*);
int od_getpeername(machine_io_t, char*, int);
#endif /* OD_IO_H */

View File

@ -25,7 +25,7 @@
#include "od_pid.h"
#include "od_syslog.h"
#include "od_log.h"
/*#include "od_io.h"*/
#include "od_io.h"
int od_log_init(od_log_t *l, od_pid_t *pid, od_syslog_t *syslog)
{
@ -73,14 +73,11 @@ int od_logv(od_log_t *l, od_syslogprio_t prio,
if (ident)
len += snprintf(buffer + len, sizeof(buffer) - len, "%s ", ident);
/* peer */
// XXX
(void)peer;
/*
if (peer) {
char *peer_name = od_getpeername(peer);
char peer_name[128];
od_getpeername(peer, peer_name, sizeof(peer_name));
len += snprintf(buffer + len, sizeof(buffer) - len, "%s ", peer_name);
}
*/
/* message */
len += vsnprintf(buffer + len, sizeof(buffer) - len, fmt, args);
va_end(args);

8
src/od_msg.h Normal file
View File

@ -0,0 +1,8 @@
#ifndef OD_MSG_H
#define OD_MSG_H
typedef enum {
OD_MCLIENT_NEW
} od_msg_t;
#endif /* OD_MSG_H */

178
src/od_pooler.c Normal file
View File

@ -0,0 +1,178 @@
/*
* odissey.
*
* PostgreSQL connection pooler and request router.
*/
#include <stdlib.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <machinarium.h>
#include <soprano.h>
#include "od_macro.h"
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_msg.h"
#include "od_instance.h"
#include "od_pooler.h"
#include "od_server.h"
#include "od_server_pool.h"
#include "od_client.h"
#include "od_client_pool.h"
#include "od_route_id.h"
#include "od_route.h"
static inline void
od_pooler(void *arg)
{
od_pooler_t *pooler = arg;
od_instance_t *instance = pooler->instance;
/* create task queue */
pooler->task_queue = machine_queue_create();
if (pooler->task_queue == NULL) {
od_error(&instance->log, NULL, "failed to create task queue");
return;
}
/* init pooler tls */
int rc;
#if 0
pooler->tls = NULL;
od_scheme_t *scheme = &pooler->instance->scheme;
if (scheme->tls_verify != OD_TDISABLE) {
pooler->tls = od_tlsfe(pooler->env, scheme);
if (pooler->tls == NULL)
return;
}
#endif
/* listen '*' */
struct addrinfo *hints_ptr = NULL;
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
hints.ai_canonname = NULL;
hints.ai_addr = NULL;
hints.ai_next = NULL;
char *host = instance->scheme.host;
if (strcmp(instance->scheme.host, "*") == 0) {
hints_ptr = &hints;
host = NULL;
}
/* resolve listen address and port */
char port[16];
snprintf(port, sizeof(port), "%d", instance->scheme.port);
struct addrinfo *ai = NULL;
rc = machine_getaddrinfo(host, port, hints_ptr, &ai, UINT32_MAX);
if (rc < 0) {
od_error(&instance->log, NULL, "failed to resolve %s:%d",
instance->scheme.host,
instance->scheme.port);
return;
}
assert(ai != NULL);
/* io */
pooler->server = machine_io_create();
if (pooler->server == NULL) {
od_error(&instance->log, NULL, "failed to create pooler io");
return;
}
/* bind to listen address and port */
rc = machine_bind(pooler->server, ai->ai_addr);
freeaddrinfo(ai);
if (rc < 0) {
od_error(&instance->log, NULL, "bind %s:%d failed",
instance->scheme.host,
instance->scheme.port);
return;
}
od_log(&instance->log, NULL, "listening on %s:%d",
instance->scheme.host,
instance->scheme.port);
od_log(&instance->log, NULL, "");
/* main loop */
while (machine_active())
{
machine_io_t client_io;
rc = machine_accept(pooler->server, &client_io,
instance->scheme.backlog, UINT32_MAX);
if (rc < 0) {
od_error(&instance->log, NULL, "accept failed");
continue;
}
/* todo: client_max limit */
machine_set_nodelay(client_io, instance->scheme.nodelay);
if (instance->scheme.keepalive > 0)
machine_set_keepalive(client_io, 1, instance->scheme.keepalive);
rc = machine_set_readahead(client_io, instance->scheme.readahead);
if (rc == -1) {
od_error(&instance->log, NULL, "failed to set client readahead");
machine_close(client_io);
machine_io_free(client_io);
continue;
}
/* allocate new client */
od_client_t *client = od_client_allocate();
if (client == NULL) {
od_error(&instance->log, NULL, "failed to allocate client object");
machine_close(client_io);
machine_io_free(client_io);
continue;
}
client->id = pooler->client_seq++;
client->io = client_io;
/* create new client event */
machine_msg_t msg;
msg = machine_msg_create(OD_MCLIENT_NEW, sizeof(od_client_t*));
char *msg_data = machine_msg_get_data(msg);
memcpy(msg_data, &client, sizeof(od_client_t*));
machine_queue_put(pooler->task_queue, msg);
}
}
void od_pooler_init(od_pooler_t *pooler, od_instance_t *instance)
{
pooler->machine = -1;
pooler->server = NULL;
pooler->client_seq = 0;
pooler->instance = instance;
pooler->task_queue = NULL;
}
int od_pooler_start(od_pooler_t *pooler)
{
pooler->machine = machine_create("pooler", od_pooler, pooler);
if (pooler->machine == -1) {
od_error(&pooler->instance->log, NULL, "failed to start server");
return 1;
}
machine_wait(pooler->machine);
return 0;
}

25
src/od_pooler.h Normal file
View File

@ -0,0 +1,25 @@
#ifndef OD_POOLER_H
#define OD_POOLER_H
/*
* odissey.
*
* PostgreSQL connection pooler and request router.
*/
typedef struct od_pooler od_pooler_t;
struct od_pooler
{
int64_t machine;
machine_io_t server;
machine_tls_t tls;
uint64_t client_seq;
od_instance_t *instance;
machine_queue_t task_queue;
};
void od_pooler_init(od_pooler_t*, od_instance_t*);
int od_pooler_start(od_pooler_t*);
#endif /* OD_INSTANCE_H */