mirror of https://github.com/yandex/odyssey.git
odissey: add frontend setup and auth
This commit is contained in:
parent
e818fe5211
commit
717e1ab05d
|
@ -0,0 +1,252 @@
|
|||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <machinarium.h>
|
||||
#include <soprano.h>
|
||||
|
||||
#include "od_macro.h"
|
||||
#include "od_version.h"
|
||||
#include "od_list.h"
|
||||
#include "od_pid.h"
|
||||
#include "od_syslog.h"
|
||||
#include "od_log.h"
|
||||
#include "od_daemon.h"
|
||||
#include "od_scheme.h"
|
||||
#include "od_lex.h"
|
||||
#include "od_config.h"
|
||||
#include "od_msg.h"
|
||||
#include "od_instance.h"
|
||||
|
||||
#include "od_server.h"
|
||||
#include "od_server_pool.h"
|
||||
#include "od_client.h"
|
||||
#include "od_client_pool.h"
|
||||
#include "od_route_id.h"
|
||||
#include "od_route.h"
|
||||
#include "od_io.h"
|
||||
|
||||
#include "od_pooler.h"
|
||||
#include "od_relay.h"
|
||||
#include "od_frontend.h"
|
||||
#include "od_auth.h"
|
||||
|
||||
static inline int
|
||||
od_auth_frontend_cleartext(od_client_t *client)
|
||||
{
|
||||
od_relay_t *relay = client->relay;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
/* AuthenticationCleartextPassword */
|
||||
so_stream_t *stream = &client->stream;
|
||||
so_stream_reset(stream);
|
||||
int rc;
|
||||
rc = so_bewrite_authentication_clear_text(stream);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = od_write(client->io, stream);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "C (auth): write error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* wait for password response */
|
||||
while (1) {
|
||||
so_stream_reset(stream);
|
||||
rc = od_read(client->io, stream, INT_MAX);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "C (auth): read error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
uint8_t type = *stream->s;
|
||||
od_debug(&instance->log, client->io, "C (auth): %c", *stream->s);
|
||||
/* PasswordMessage */
|
||||
if (type == 'p')
|
||||
break;
|
||||
}
|
||||
|
||||
/* read password message */
|
||||
so_password_t client_token;
|
||||
so_password_init(&client_token);
|
||||
rc = so_beread_password(&client_token, stream->s,
|
||||
so_stream_used(stream));
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io,
|
||||
"C (auth): password read error");
|
||||
so_password_free(&client_token);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* set user password */
|
||||
so_password_t client_password = {
|
||||
.password_len = client->scheme->password_len + 1,
|
||||
.password = client->scheme->password,
|
||||
};
|
||||
|
||||
/* authenticate */
|
||||
int check = so_password_compare(&client_password, &client_token);
|
||||
so_password_free(&client_token);
|
||||
if (! check) {
|
||||
od_log(&instance->log, client->io,
|
||||
"C (auth): user '%s' incorrect password",
|
||||
client->startup.user);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_auth_frontend_md5(od_client_t *client)
|
||||
{
|
||||
od_relay_t *relay = client->relay;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
/* generate salt */
|
||||
uint32_t salt = so_password_salt(&client->key);
|
||||
|
||||
/* AuthenticationMD5Password */
|
||||
so_stream_t *stream = &client->stream;
|
||||
so_stream_reset(stream);
|
||||
int rc;
|
||||
rc = so_bewrite_authentication_md5(stream, (uint8_t*)&salt);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = od_write(client->io, stream);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "C (auth): write error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* wait for password response */
|
||||
while (1) {
|
||||
int rc;
|
||||
so_stream_reset(stream);
|
||||
rc = od_read(client->io, stream, INT_MAX);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "C (auth): read error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
uint8_t type = *stream->s;
|
||||
od_debug(&instance->log, client->io, "C (auth): %c", *stream->s);
|
||||
/* PasswordMessage */
|
||||
if (type == 'p')
|
||||
break;
|
||||
}
|
||||
|
||||
/* read password message */
|
||||
so_password_t client_token;
|
||||
so_password_init(&client_token);
|
||||
rc = so_beread_password(&client_token, stream->s, so_stream_used(stream));
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io,
|
||||
"C (auth): password read error");
|
||||
so_password_free(&client_token);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* set user password */
|
||||
so_password_t client_password;
|
||||
so_password_init(&client_password);
|
||||
rc = so_password_md5(&client_password,
|
||||
so_parameter_value(client->startup.user),
|
||||
client->startup.user->value_len - 1,
|
||||
client->scheme->password,
|
||||
client->scheme->password_len,
|
||||
(uint8_t*)&salt);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, NULL, "memory allocation error");
|
||||
so_password_free(&client_password);
|
||||
so_password_free(&client_token);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* authenticate */
|
||||
int check = so_password_compare(&client_password, &client_token);
|
||||
so_password_free(&client_password);
|
||||
so_password_free(&client_token);
|
||||
if (! check) {
|
||||
od_log(&instance->log, client->io,
|
||||
"C (auth): user '%s' incorrect password",
|
||||
client->startup.user);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int od_auth_frontend(od_client_t *client)
|
||||
{
|
||||
od_relay_t *relay = client->relay;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
/* match user scheme */
|
||||
od_schemeuser_t *user_scheme =
|
||||
od_schemeuser_match(&instance->scheme,
|
||||
so_parameter_value(client->startup.user));
|
||||
if (user_scheme == NULL) {
|
||||
/* try to use default user */
|
||||
user_scheme = instance->scheme.users_default;
|
||||
if (user_scheme == NULL) {
|
||||
od_error(&instance->log, client->io,
|
||||
"C (auth): user '%s' not found",
|
||||
so_parameter_value(client->startup.user));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
client->scheme = user_scheme;
|
||||
|
||||
/* is user access denied */
|
||||
if (user_scheme->is_deny) {
|
||||
od_log(&instance->log, client->io,
|
||||
"C (auth): user '%s' access denied",
|
||||
so_parameter_value(client->startup.user));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* authentication mode */
|
||||
int rc;
|
||||
switch (user_scheme->auth_mode) {
|
||||
case OD_ACLEAR_TEXT:
|
||||
rc = od_auth_frontend_cleartext(client);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
break;
|
||||
case OD_AMD5:
|
||||
rc = od_auth_frontend_md5(client);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
break;
|
||||
case OD_ANONE:
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
|
||||
/* pass */
|
||||
so_stream_t *stream = &client->stream;
|
||||
so_stream_reset(stream);
|
||||
rc = so_bewrite_authentication_ok(stream);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = od_write(client->io, stream);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "C (auth): write error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
#ifndef OD_AUTH_H
|
||||
#define OD_AUTH_H
|
||||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
int od_auth_frontend(od_client_t*);
|
||||
|
||||
#endif /* OD_AUTH_H */
|
|
@ -21,6 +21,7 @@ struct od_client
|
|||
{
|
||||
od_clientstate_t state;
|
||||
uint64_t id;
|
||||
uint64_t coroutine_id;
|
||||
machine_io_t io;
|
||||
od_schemeuser_t *scheme;
|
||||
so_bestartup_t startup;
|
||||
|
@ -28,6 +29,7 @@ struct od_client
|
|||
so_stream_t stream;
|
||||
od_server_t *server;
|
||||
void *route;
|
||||
void *relay;
|
||||
od_list_t link_pool;
|
||||
od_list_t link;
|
||||
};
|
||||
|
@ -37,10 +39,12 @@ od_client_init(od_client_t *client)
|
|||
{
|
||||
client->state = OD_CUNDEF;
|
||||
client->id = 0;
|
||||
client->coroutine_id = 0;
|
||||
client->io = NULL;
|
||||
client->scheme = NULL;
|
||||
client->server = NULL;
|
||||
client->route = NULL;
|
||||
client->relay = NULL;
|
||||
so_bestartup_init(&client->startup);
|
||||
so_keyinit(&client->key);
|
||||
so_stream_init(&client->stream);
|
||||
|
|
|
@ -0,0 +1,327 @@
|
|||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <machinarium.h>
|
||||
#include <soprano.h>
|
||||
|
||||
#include "od_macro.h"
|
||||
#include "od_version.h"
|
||||
#include "od_list.h"
|
||||
#include "od_pid.h"
|
||||
#include "od_syslog.h"
|
||||
#include "od_log.h"
|
||||
#include "od_daemon.h"
|
||||
#include "od_scheme.h"
|
||||
#include "od_lex.h"
|
||||
#include "od_config.h"
|
||||
#include "od_msg.h"
|
||||
#include "od_instance.h"
|
||||
|
||||
#include "od_server.h"
|
||||
#include "od_server_pool.h"
|
||||
#include "od_client.h"
|
||||
#include "od_client_pool.h"
|
||||
#include "od_route_id.h"
|
||||
#include "od_route.h"
|
||||
#include "od_io.h"
|
||||
|
||||
#include "od_pooler.h"
|
||||
#include "od_relay.h"
|
||||
#include "od_frontend.h"
|
||||
#include "od_auth.h"
|
||||
|
||||
void od_frontend_close(od_client_t *client)
|
||||
{
|
||||
#if 0
|
||||
od_pooler_t *pooler = client->pooler;
|
||||
if (client->route) {
|
||||
od_route_t *route = client->route;
|
||||
od_clientpool_set(&route->client_pool, client, OD_CUNDEF);
|
||||
client->route = NULL;
|
||||
}
|
||||
#endif
|
||||
if (client->io) {
|
||||
machine_close(client->io);
|
||||
machine_io_free(client->io);
|
||||
client->io = NULL;
|
||||
}
|
||||
#if 0
|
||||
od_clientlist_unlink(&pooler->client_list, client);
|
||||
#endif
|
||||
od_client_free(client);
|
||||
}
|
||||
|
||||
static int
|
||||
od_frontend_startup_read(od_client_t *client)
|
||||
{
|
||||
od_relay_t *relay = client->relay;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
so_stream_t *stream = &client->stream;
|
||||
so_stream_reset(stream);
|
||||
for (;;) {
|
||||
uint32_t pos_size = so_stream_used(stream);
|
||||
uint8_t *pos_data = stream->s;
|
||||
uint32_t len;
|
||||
int to_read;
|
||||
to_read = so_read_startup(&len, &pos_data, &pos_size);
|
||||
if (to_read == 0)
|
||||
break;
|
||||
if (to_read == -1) {
|
||||
od_error(&instance->log, client->io,
|
||||
"C (startup): bad startup packet");
|
||||
return -1;
|
||||
}
|
||||
int rc = so_stream_ensure(stream, to_read);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = machine_read(client->io, (char*)stream->p, to_read, INT_MAX);
|
||||
if (rc < 0) {
|
||||
od_error(&instance->log, client->io,
|
||||
"C (startup): read error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
so_stream_advance(stream, to_read);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
od_frontend_startup(od_client_t *client)
|
||||
{
|
||||
int rc;
|
||||
rc = od_frontend_startup_read(client);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
so_stream_t *stream = &client->stream;
|
||||
rc = so_beread_startup(&client->startup,
|
||||
stream->s,
|
||||
so_stream_used(stream));
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
|
||||
#if 0
|
||||
/* client ssl request */
|
||||
rc = od_tlsfe_accept(pooler->env, client->io, pooler->tls,
|
||||
&client->stream,
|
||||
&pooler->od->log, "C",
|
||||
&pooler->od->scheme,
|
||||
&client->startup);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
#endif
|
||||
if (! client->startup.is_ssl_request)
|
||||
return 0;
|
||||
|
||||
/* read startup-cancel message followed after ssl
|
||||
* negotiation */
|
||||
assert(client->startup.is_ssl_request);
|
||||
rc = od_frontend_startup_read(client);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = so_beread_startup(&client->startup,
|
||||
stream->s,
|
||||
so_stream_used(stream));
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline void
|
||||
od_frontend_key(od_client_t *client)
|
||||
{
|
||||
client->key.key_pid = client->id;
|
||||
client->key.key = 1 + rand();
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_frontend_setup(od_client_t *client)
|
||||
{
|
||||
od_relay_t *relay = client->relay;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
so_stream_t *stream = &client->stream;
|
||||
so_stream_reset(stream);
|
||||
int rc;
|
||||
rc = so_bewrite_backend_key_data(stream, client->key.key_pid,
|
||||
client->key.key);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = so_bewrite_parameter_status(stream, "", 1, "", 1);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = od_write(client->io, stream);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "C (setup): write error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int
|
||||
od_frontend_ready(od_client_t *client)
|
||||
{
|
||||
od_relay_t *relay = client->relay;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
so_stream_t *stream = &client->stream;
|
||||
so_stream_reset(stream);
|
||||
int rc;
|
||||
rc = so_bewrite_ready(stream, 'I');
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
rc = od_write(client->io, stream);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "C: write error: %s",
|
||||
machine_error(client->io));
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void od_frontend_main(void *arg)
|
||||
{
|
||||
od_client_t *client = arg;
|
||||
od_relay_t *relay = client->relay;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
od_log(&instance->log, client->io, "C: new connection");
|
||||
|
||||
/* attach client io to relay machine event loop */
|
||||
int rc;
|
||||
rc = machine_io_attach(client->io);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client->io, "failed to transfer client io");
|
||||
machine_close(client->io);
|
||||
od_client_free(client);
|
||||
return;
|
||||
}
|
||||
|
||||
/* client startup */
|
||||
rc = od_frontend_startup(client);
|
||||
if (rc == -1) {
|
||||
od_frontend_close(client);
|
||||
return;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/* client cancel request */
|
||||
if (client->startup.is_cancel) {
|
||||
od_debug(&pooler->od->log, client->io, "C: cancel request");
|
||||
so_key_t key = client->startup.key;
|
||||
od_feclose(client);
|
||||
od_cancel(pooler, &key);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Generate backend key for the client.
|
||||
*
|
||||
* This key will be used to identify a server by
|
||||
* user cancel requests. The key must be regenerated
|
||||
* for each new client-server assignment, to avoid
|
||||
* possibility of cancelling requests by a previous
|
||||
* server owners.
|
||||
*/
|
||||
od_frontend_key(client);
|
||||
|
||||
/* client authentication */
|
||||
rc = od_auth_frontend(client);
|
||||
if (rc == -1) {
|
||||
od_frontend_close(client);
|
||||
return;
|
||||
}
|
||||
|
||||
/* set client backend options and the key */
|
||||
rc = od_frontend_setup(client);
|
||||
if (rc == -1) {
|
||||
od_frontend_close(client);
|
||||
return;
|
||||
}
|
||||
|
||||
/* notify client that we are ready */
|
||||
rc = od_frontend_ready(client);
|
||||
if (rc == -1) {
|
||||
od_frontend_close(client);
|
||||
return;
|
||||
}
|
||||
|
||||
/* route */
|
||||
|
||||
#if 0
|
||||
/* execute pooler method */
|
||||
od_routerstatus_t status = OD_RS_UNDEF;
|
||||
switch (pooler->od->scheme.pooling_mode) {
|
||||
case OD_PSESSION:
|
||||
status = od_router_session(client);
|
||||
break;
|
||||
case OD_PTRANSACTION:
|
||||
status = od_router_transaction(client);
|
||||
break;
|
||||
case OD_PUNDEF:
|
||||
break;
|
||||
}
|
||||
|
||||
od_server_t *server = client->server;
|
||||
switch (status) {
|
||||
case OD_RS_EROUTE:
|
||||
case OD_RS_EPOOL:
|
||||
case OD_RS_ELIMIT:
|
||||
assert(! client->server);
|
||||
od_feclose(client);
|
||||
break;
|
||||
case OD_RS_OK:
|
||||
case OD_RS_ECLIENT_READ:
|
||||
case OD_RS_ECLIENT_WRITE:
|
||||
if (status == OD_RS_OK)
|
||||
od_log(&pooler->od->log, client->io,
|
||||
"C: disconnected");
|
||||
else
|
||||
od_log(&pooler->od->log, client->io,
|
||||
"C: disconnected (read/write error): %s",
|
||||
machine_error(client->io));
|
||||
/* close client connection and reuse server
|
||||
* link in case of client errors and
|
||||
* graceful shutdown */
|
||||
od_feclose(client);
|
||||
if (server)
|
||||
od_berelease(server);
|
||||
break;
|
||||
case OD_RS_ESERVER_CONFIGURE:
|
||||
od_log(&pooler->od->log, server->io,
|
||||
"S: disconnected (server configure error): %s",
|
||||
machine_error(server->io));
|
||||
od_feclose(client);
|
||||
if (server)
|
||||
od_beclose(server);
|
||||
break;
|
||||
case OD_RS_ESERVER_READ:
|
||||
case OD_RS_ESERVER_WRITE:
|
||||
od_log(&pooler->od->log, server->io,
|
||||
"S: disconnected (read/write error): %s",
|
||||
machine_error(server->io));
|
||||
/* close client connection and close server
|
||||
* connection in case of server errors */
|
||||
od_feclose(client);
|
||||
if (server)
|
||||
od_beclose(server);
|
||||
break;
|
||||
case OD_RS_UNDEF:
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
#ifndef OD_FRONTEND_H
|
||||
#define OD_FRONTEND_H
|
||||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
void od_frontend_main(void*);
|
||||
|
||||
#endif /* OD_FRONTEND_H */
|
|
@ -27,6 +27,7 @@
|
|||
#include "od_config.h"
|
||||
#include "od_instance.h"
|
||||
#include "od_pooler.h"
|
||||
#include "od_relay.h"
|
||||
|
||||
void od_instance_init(od_instance_t *instance)
|
||||
{
|
||||
|
@ -123,11 +124,20 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
|
|||
/* create pid file */
|
||||
if (instance->scheme.pid_file)
|
||||
od_pid_create(&instance->pid, instance->scheme.pid_file);
|
||||
|
||||
/* run connection pooler */
|
||||
od_pooler_t pooler;
|
||||
od_pooler_init(&pooler, instance);
|
||||
rc = od_pooler_start(&pooler);
|
||||
if (rc == -1)
|
||||
return 1;
|
||||
|
||||
od_relay_t relay;
|
||||
od_relay_init(&relay, &pooler);
|
||||
rc = od_relay_start(&relay);
|
||||
if (rc == -1)
|
||||
return 1;
|
||||
|
||||
machine_wait(pooler.machine);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -42,13 +42,6 @@ od_pooler(void *arg)
|
|||
od_pooler_t *pooler = arg;
|
||||
od_instance_t *instance = pooler->instance;
|
||||
|
||||
/* create task queue */
|
||||
pooler->task_queue = machine_queue_create();
|
||||
if (pooler->task_queue == NULL) {
|
||||
od_error(&instance->log, NULL, "failed to create task queue");
|
||||
return;
|
||||
}
|
||||
|
||||
/* init pooler tls */
|
||||
int rc;
|
||||
#if 0
|
||||
|
@ -129,6 +122,7 @@ od_pooler(void *arg)
|
|||
if (instance->scheme.keepalive > 0)
|
||||
machine_set_keepalive(client_io, 1, instance->scheme.keepalive);
|
||||
|
||||
/*
|
||||
rc = machine_set_readahead(client_io, instance->scheme.readahead);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, NULL, "failed to set client readahead");
|
||||
|
@ -136,11 +130,22 @@ od_pooler(void *arg)
|
|||
machine_io_free(client_io);
|
||||
continue;
|
||||
}
|
||||
*/
|
||||
|
||||
rc = machine_io_detach(client_io);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->log, client_io,
|
||||
"failed to transfer client io");
|
||||
machine_close(client_io);
|
||||
machine_io_free(client_io);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* allocate new client */
|
||||
od_client_t *client = od_client_allocate();
|
||||
if (client == NULL) {
|
||||
od_error(&instance->log, NULL, "failed to allocate client object");
|
||||
od_error(&instance->log, client_io,
|
||||
"failed to allocate client object");
|
||||
machine_close(client_io);
|
||||
machine_io_free(client_io);
|
||||
continue;
|
||||
|
@ -157,13 +162,19 @@ od_pooler(void *arg)
|
|||
}
|
||||
}
|
||||
|
||||
void od_pooler_init(od_pooler_t *pooler, od_instance_t *instance)
|
||||
int od_pooler_init(od_pooler_t *pooler, od_instance_t *instance)
|
||||
{
|
||||
pooler->machine = -1;
|
||||
pooler->server = NULL;
|
||||
pooler->client_seq = 0;
|
||||
pooler->instance = instance;
|
||||
pooler->task_queue = NULL;
|
||||
pooler->task_queue = machine_queue_create();
|
||||
if (pooler->task_queue == NULL) {
|
||||
od_error(&instance->log, NULL, "failed to create task queue");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int od_pooler_start(od_pooler_t *pooler)
|
||||
|
@ -173,6 +184,5 @@ int od_pooler_start(od_pooler_t *pooler)
|
|||
od_error(&pooler->instance->log, NULL, "failed to start server");
|
||||
return 1;
|
||||
}
|
||||
machine_wait(pooler->machine);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ struct od_pooler
|
|||
machine_queue_t task_queue;
|
||||
};
|
||||
|
||||
void od_pooler_init(od_pooler_t*, od_instance_t*);
|
||||
int od_pooler_start(od_pooler_t*);
|
||||
int od_pooler_init(od_pooler_t*, od_instance_t*);
|
||||
int od_pooler_start(od_pooler_t*);
|
||||
|
||||
#endif /* OD_INSTANCE_H */
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <machinarium.h>
|
||||
#include <soprano.h>
|
||||
|
||||
#include "od_macro.h"
|
||||
#include "od_version.h"
|
||||
#include "od_list.h"
|
||||
#include "od_pid.h"
|
||||
#include "od_syslog.h"
|
||||
#include "od_log.h"
|
||||
#include "od_daemon.h"
|
||||
#include "od_scheme.h"
|
||||
#include "od_lex.h"
|
||||
#include "od_config.h"
|
||||
#include "od_msg.h"
|
||||
#include "od_instance.h"
|
||||
|
||||
#include "od_server.h"
|
||||
#include "od_server_pool.h"
|
||||
#include "od_client.h"
|
||||
#include "od_client_pool.h"
|
||||
#include "od_route_id.h"
|
||||
#include "od_route.h"
|
||||
|
||||
#include "od_pooler.h"
|
||||
#include "od_relay.h"
|
||||
#include "od_frontend.h"
|
||||
|
||||
static inline void
|
||||
od_relay(void *arg)
|
||||
{
|
||||
od_relay_t *relay = arg;
|
||||
od_instance_t *instance = relay->pooler->instance;
|
||||
|
||||
od_log(&instance->log, NULL, "relay: started");
|
||||
|
||||
for (;;) {
|
||||
machine_msg_t msg;
|
||||
msg = machine_queue_get(relay->pooler->task_queue, UINT32_MAX);
|
||||
if (msg == NULL)
|
||||
break;
|
||||
|
||||
od_msg_t msg_type;
|
||||
msg_type = machine_msg_get_type(msg);
|
||||
switch (msg_type) {
|
||||
case OD_MCLIENT_NEW:
|
||||
{
|
||||
od_client_t *client;
|
||||
client = *(od_client_t**)machine_msg_get_data(msg);
|
||||
client->relay = relay;
|
||||
int64_t coroutine_id;
|
||||
coroutine_id = machine_coroutine_create(od_frontend_main, client);
|
||||
if (coroutine_id == -1) {
|
||||
od_error(&relay->pooler->instance->log, client->io,
|
||||
"failed to create coroutine");
|
||||
machine_close(client->io);
|
||||
od_client_free(client);
|
||||
break;
|
||||
}
|
||||
client->coroutine_id = coroutine_id;
|
||||
break;
|
||||
}
|
||||
}
|
||||
machine_msg_free(msg);
|
||||
}
|
||||
|
||||
od_log(&instance->log, NULL, "relay: stopped");
|
||||
}
|
||||
|
||||
void od_relay_init(od_relay_t *relay, od_pooler_t *pooler)
|
||||
{
|
||||
relay->machine = -1;
|
||||
relay->pooler = pooler;
|
||||
}
|
||||
|
||||
int od_relay_start(od_relay_t *relay)
|
||||
{
|
||||
relay->machine = machine_create("relay", od_relay, relay);
|
||||
if (relay->machine == -1) {
|
||||
od_error(&relay->pooler->instance->log, NULL, "failed to start relay");
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
#ifndef OD_RELAY_H
|
||||
#define OD_RELAY_H
|
||||
|
||||
/*
|
||||
* odissey.
|
||||
*
|
||||
* PostgreSQL connection pooler and request router.
|
||||
*/
|
||||
|
||||
typedef struct od_relay od_relay_t;
|
||||
|
||||
struct od_relay
|
||||
{
|
||||
int64_t machine;
|
||||
od_pooler_t *pooler;
|
||||
};
|
||||
|
||||
void od_relay_init(od_relay_t*, od_pooler_t*);
|
||||
int od_relay_start(od_relay_t*);
|
||||
|
||||
#endif /* OD_RELAY_H */
|
Loading…
Reference in New Issue