odissey: major rework of server/client ids

This commit is contained in:
Dmitry Simonenko 2017-06-20 18:39:52 +03:00
parent cad4a813cf
commit a94acec1c8
25 changed files with 175 additions and 171 deletions

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -61,7 +61,7 @@ od_auth_frontend_cleartext(od_client_t *client)
return -1;
rc = od_write(client->io, stream);
if (rc == -1) {
od_error_client(&instance->log, client->id,
od_error_client(&instance->log, &client->id,
"auth", "write error: %s",
machine_error(client->io));
return -1;
@ -72,14 +72,14 @@ od_auth_frontend_cleartext(od_client_t *client)
so_stream_reset(stream);
rc = od_read(client->io, stream, UINT32_MAX);
if (rc == -1) {
od_error_client(&instance->log, client->id, "auth",
od_error_client(&instance->log, &client->id, "auth",
"read error: %s",
machine_error(client->io));
return -1;
}
uint8_t type = *stream->s;
od_debug_client(&instance->log,
client->id, "auth", "%c", *stream->s);
od_debug_client(&instance->log, &client->id, "auth",
"%c", *stream->s);
/* PasswordMessage */
if (type == 'p')
break;
@ -91,7 +91,7 @@ od_auth_frontend_cleartext(od_client_t *client)
rc = so_beread_password(&client_token, stream->s,
so_stream_used(stream));
if (rc == -1) {
od_error_client(&instance->log, client->id, "auth",
od_error_client(&instance->log, &client->id, "auth",
"password read error");
od_frontend_error(client, SO_ERROR_PROTOCOL_VIOLATION,
"bad password message");
@ -109,7 +109,7 @@ od_auth_frontend_cleartext(od_client_t *client)
int check = so_password_compare(&client_password, &client_token);
so_password_free(&client_token);
if (! check) {
od_log_client(&instance->log, client->id, "auth",
od_log_client(&instance->log, &client->id, "auth",
"user '%s' incorrect password",
client->startup.user);
od_frontend_error(client, SO_ERROR_INVALID_PASSWORD,
@ -136,7 +136,7 @@ od_auth_frontend_md5(od_client_t *client)
return -1;
rc = od_write(client->io, stream);
if (rc == -1) {
od_error_client(&instance->log, client->id, "auth",
od_error_client(&instance->log, &client->id, "auth",
"write error: %s",
machine_error(client->io));
return -1;
@ -148,13 +148,13 @@ od_auth_frontend_md5(od_client_t *client)
so_stream_reset(stream);
rc = od_read(client->io, stream, UINT32_MAX);
if (rc == -1) {
od_error_client(&instance->log, client->id, "auth",
od_error_client(&instance->log, &client->id, "auth",
"read error: %s",
machine_error(client->io));
return -1;
}
uint8_t type = *stream->s;
od_debug_client(&instance->log, client->id, "auth",
od_debug_client(&instance->log, &client->id, "auth",
"%c", *stream->s);
/* PasswordMessage */
if (type == 'p')
@ -166,7 +166,7 @@ od_auth_frontend_md5(od_client_t *client)
so_password_init(&client_token);
rc = so_beread_password(&client_token, stream->s, so_stream_used(stream));
if (rc == -1) {
od_error_client(&instance->log, client->id, "auth",
od_error_client(&instance->log, &client->id, "auth",
"password read error");
od_frontend_error(client, SO_ERROR_PROTOCOL_VIOLATION,
"bad password message");
@ -184,7 +184,7 @@ od_auth_frontend_md5(od_client_t *client)
client->scheme->password_len,
(uint8_t*)&salt);
if (rc == -1) {
od_error_client(&instance->log, client->id, "auth",
od_error_client(&instance->log, &client->id, "auth",
"memory allocation error");
so_password_free(&client_password);
so_password_free(&client_token);
@ -196,7 +196,7 @@ od_auth_frontend_md5(od_client_t *client)
so_password_free(&client_password);
so_password_free(&client_token);
if (! check) {
od_log_client(&instance->log, client->id, "auth",
od_log_client(&instance->log, &client->id, "auth",
"user '%s' incorrect password",
client->startup.user);
od_frontend_error(client, SO_ERROR_INVALID_PASSWORD,
@ -218,7 +218,7 @@ int od_auth_frontend(od_client_t *client)
/* try to use default user */
user_scheme = instance->scheme.users_default;
if (user_scheme == NULL) {
od_error_client(&instance->log, client->id, "auth"
od_error_client(&instance->log, &client->id, "auth"
"user '%s' not found",
so_parameter_value(client->startup.user));
od_frontend_error(client, SO_ERROR_INVALID_AUTHORIZATION_SPECIFICATION,
@ -230,7 +230,7 @@ int od_auth_frontend(od_client_t *client)
/* is user access denied */
if (user_scheme->is_deny) {
od_log_client(&instance->log, client->id, "auth",
od_log_client(&instance->log, &client->id, "auth",
"user '%s' access denied",
so_parameter_value(client->startup.user));
od_frontend_error(client, SO_ERROR_INVALID_AUTHORIZATION_SPECIFICATION,
@ -266,7 +266,7 @@ int od_auth_frontend(od_client_t *client)
return -1;
rc = od_write(client->io, stream);
if (rc == -1) {
od_error_client(&instance->log, client->id, "auth",
od_error_client(&instance->log, &client->id, "auth",
"write error: %s",
machine_error(client->io));
return -1;
@ -281,12 +281,12 @@ od_auth_backend_cleartext(od_server_t *server)
od_route_t *route = server->route;
assert(route != NULL);
od_debug_server(&instance->log, server->id, "auth",
od_debug_server(&instance->log, &server->id, "auth",
"requested clear-text authentication");
/* validate route scheme */
if (route->scheme->password == NULL) {
od_error_server(&instance->log, server->id, "auth"
od_error_server(&instance->log, &server->id, "auth"
"password required for route '%s'",
route->scheme->target);
return -1;
@ -300,13 +300,13 @@ od_auth_backend_cleartext(od_server_t *server)
route->scheme->password,
route->scheme->password_len + 1);
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"memory allocation error");
return -1;
}
rc = od_write(server->io, stream);
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"write error: %s",
machine_error(server->io));
return -1;
@ -321,13 +321,13 @@ od_auth_backend_md5(od_server_t *server, uint8_t salt[4])
od_route_t *route = server->route;
assert(route != NULL);
od_debug_server(&instance->log, server->id, "auth",
od_debug_server(&instance->log, &server->id, "auth",
"requested md5 authentication");
/* validate route scheme */
if (route->scheme->user == NULL ||
route->scheme->password == NULL) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"user and password required for route '%s'",
route->scheme->target);
return -1;
@ -344,7 +344,7 @@ od_auth_backend_md5(od_server_t *server, uint8_t salt[4])
route->scheme->password_len,
(uint8_t*)salt);
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"memory allocation error");
so_password_free(&client_password);
return -1;
@ -358,13 +358,13 @@ od_auth_backend_md5(od_server_t *server, uint8_t salt[4])
client_password.password_len);
so_password_free(&client_password);
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"memory allocation error");
return -1;
}
rc = od_write(server->io, stream);
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"write error: %s",
machine_error(server->io));
return -1;
@ -385,7 +385,7 @@ int od_auth_backend(od_server_t *server)
rc = so_feread_auth(&auth_type, salt, stream->s,
so_stream_used(stream));
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"failed to parse authentication message");
return -1;
}
@ -407,7 +407,7 @@ int od_auth_backend(od_server_t *server)
break;
/* unsupported */
default:
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"unsupported authentication method");
return -1;
}
@ -418,25 +418,25 @@ int od_auth_backend(od_server_t *server)
so_stream_reset(stream);
rc = od_read(server->io, &server->stream, UINT32_MAX);
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"read error: %s",
machine_error(server->io));
return -1;
}
char type = *server->stream.s;
od_debug_server(&instance->log, server->id, "auth",
od_debug_server(&instance->log, &server->id, "auth",
"%c", type);
switch (type) {
case 'R':
rc = so_feread_auth(&auth_type, salt, stream->s,
so_stream_used(stream));
if (rc == -1) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"failed to parse authentication message");
return -1;
}
if (auth_type != 0) {
od_error_server(&instance->log, server->id, "auth",
od_error_server(&instance->log, &server->id, "auth",
"incorrect authentication flow");
return 0;
}

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -90,11 +90,11 @@ void od_backend_error(od_server_t *server, char *state, uint8_t *data, int size)
int rc;
rc = so_feread_error(&error, data, size);
if (rc == -1) {
od_error_server(&instance->log, server->id, state,
od_error_server(&instance->log, &server->id, state,
"failed to parse error message from server");
return;
}
od_error_server(&instance->log, server->id, state,
od_error_server(&instance->log, &server->id, state,
"%s %s %s",
error.severity,
error.code,
@ -134,14 +134,14 @@ od_backend_ready_wait(od_server_t *server, char *context, int time_ms)
rc = od_read(server->io, stream, time_ms);
if (rc == -1) {
if (! machine_timedout()) {
od_error_server(&instance->log, server->id, context,
od_error_server(&instance->log, &server->id, context,
"read error: %s",
machine_error(server->io));
}
return -1;
}
uint8_t type = stream->s[rc];
od_debug_server(&instance->log, server->id, context,
od_debug_server(&instance->log, &server->id, context,
"%c", type);
/* ErrorResponse */
if (type == 'E') {
@ -176,7 +176,7 @@ od_backend_startup(od_server_t *server)
return -1;
rc = od_write(server->io, stream);
if (rc == -1) {
od_error_server(&instance->log, server->id, "startup",
od_error_server(&instance->log, &server->id, "startup",
"write error: %s",
machine_error(server->io));
return -1;
@ -188,13 +188,13 @@ od_backend_startup(od_server_t *server)
int rc;
rc = od_read(server->io, &server->stream, UINT32_MAX);
if (rc == -1) {
od_error_server(&instance->log, server->id, "startup",
od_error_server(&instance->log, &server->id, "startup",
"read error: %s",
machine_error(server->io));
return -1;
}
uint8_t type = *server->stream.s;
od_debug_server(&instance->log, server->id, "startup",
od_debug_server(&instance->log, &server->id, "startup",
"%c", type);
switch (type) {
@ -213,7 +213,7 @@ od_backend_startup(od_server_t *server)
rc = so_feread_key(&server->key,
stream->s, so_stream_used(stream));
if (rc == -1) {
od_error_server(&instance->log, server->id, "startup",
od_error_server(&instance->log, &server->id, "startup",
"failed to parse BackendKeyData message");
return -1;
}
@ -230,7 +230,7 @@ od_backend_startup(od_server_t *server)
so_stream_used(stream));
return -1;
default:
od_debug_server(&instance->log, server->id, "startup",
od_debug_server(&instance->log, &server->id, "startup",
"unknown packet: %c", type);
return -1;
}
@ -258,7 +258,7 @@ od_backend_connect_to(od_server_t *server,
int rc;
rc = machine_set_readahead(server->io, instance->scheme.readahead);
if (rc == -1) {
od_error_server(&instance->log, server->id, context,
od_error_server(&instance->log, &server->id, context,
"failed to set readahead");
return -1;
}
@ -276,7 +276,7 @@ od_backend_connect_to(od_server_t *server,
struct addrinfo *ai = NULL;
rc = machine_getaddrinfo(server_scheme->host, port, NULL, &ai, 0);
if (rc != 0) {
od_error_server(&instance->log, server->id, context,
od_error_server(&instance->log, &server->id, context,
"failed to resolve %s:%d",
server_scheme->host,
server_scheme->port);
@ -288,7 +288,7 @@ od_backend_connect_to(od_server_t *server,
rc = machine_connect(server->io, ai->ai_addr, UINT32_MAX);
freeaddrinfo(ai);
if (rc == -1) {
od_error_server(&instance->log, server->id, context,
od_error_server(&instance->log, &server->id, context,
"failed to connect to %s:%d",
server_scheme->host,
server_scheme->port);
@ -320,7 +320,7 @@ int od_backend_connect(od_server_t *server)
if (rc == -1)
return -1;
od_log_server(&instance->log, server->id, NULL,
od_log_server(&instance->log, &server->id, NULL,
"new server connection %s:%d",
server_scheme->host,
server_scheme->port);
@ -367,7 +367,7 @@ od_backend_query(od_server_t *server, char *context, char *query, int len)
return -1;
rc = od_write(server->io, stream);
if (rc == -1) {
od_error_server(&instance->log, server->id, context,
od_error_server(&instance->log, &server->id, context,
"write error: %s",
machine_error(server->io));
return -1;
@ -401,7 +401,7 @@ int od_backend_configure(od_server_t *server, so_bestartup_t *startup)
}
if (size == 0)
return 0;
od_debug_server(&instance->log, server->id, "configure",
od_debug_server(&instance->log, &server->id, "configure",
"%s", query_configure);
int rc;
rc = od_backend_query(server, "configure", query_configure,
@ -416,7 +416,7 @@ int od_backend_reset(od_server_t *server)
/* server left in copy mode */
if (server->is_copy) {
od_debug_server(&instance->log, server->id, "reset",
od_debug_server(&instance->log, &server->id, "reset",
"in copy, closing");
goto drop;
}
@ -424,7 +424,7 @@ int od_backend_reset(od_server_t *server)
/* support route rollback off */
if (! route->scheme->rollback) {
if (server->is_transaction) {
od_debug_server(&instance->log, server->id, "reset",
od_debug_server(&instance->log, &server->id, "reset",
"in active transaction, closing");
goto drop;
}
@ -433,7 +433,7 @@ int od_backend_reset(od_server_t *server)
/* support route cancel off */
if (! route->scheme->cancel) {
if (! od_server_is_sync(server)) {
od_debug_server(&instance->log, server->id, "reset",
od_debug_server(&instance->log, &server->id, "reset",
"not synchronized, closing");
goto drop;
}
@ -464,7 +464,7 @@ int od_backend_reset(od_server_t *server)
int rc = 0;
for (;;) {
while (! od_server_is_sync(server)) {
od_debug_server(&instance->log, server->id, "reset",
od_debug_server(&instance->log, &server->id, "reset",
"not synchronized, wait for %d msec (#%d)",
wait_timeout,
wait_try);
@ -477,17 +477,17 @@ int od_backend_reset(od_server_t *server)
if (! machine_timedout())
goto error;
if (wait_try_cancel == wait_cancel_limit) {
od_debug_server(&instance->log, server->id, "reset",
od_debug_server(&instance->log, &server->id, "reset",
"server cancel limit reached, closing");
goto error;
}
od_debug_server(&instance->log, server->id, "reset",
od_debug_server(&instance->log, &server->id, "reset",
"not responded, cancel (#%d)",
wait_try_cancel);
wait_try_cancel++;
rc = od_cancel(server->system,
route->scheme->server, &server->key,
server->id);
&server->id);
if (rc == -1)
goto error;
continue;
@ -495,7 +495,7 @@ int od_backend_reset(od_server_t *server)
assert(od_server_is_sync(server));
break;
}
od_debug_server(&instance->log, server->id, "reset",
od_debug_server(&instance->log, &server->id, "reset",
"synchronized");
/* send rollback in case server has an active
@ -523,7 +523,7 @@ int od_backend_discard(od_server_t *server)
{
od_instance_t *instance = server->system->instance;
char query_discard[] = "DISCARD ALL";
od_debug_server(&instance->log, server->id, "discard",
od_debug_server(&instance->log, &server->id, "discard",
"%s", query_discard);
return od_backend_query(server, "reset", query_discard,
sizeof(query_discard));

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -52,11 +52,12 @@
int od_cancel(od_system_t *system,
od_schemeserver_t *server_scheme,
so_key_t *key,
uint64_t server_id)
od_id_t *server_id)
{
od_instance_t *instance = system->instance;
od_log_server(&instance->log, 0, "cancel",
"new cancel for S%" PRIu64, server_id);
"new cancel for s%.*s", sizeof(server_id->id),
server_id->id);
od_server_t server;
od_server_init(&server);
server.system = system;
@ -84,5 +85,5 @@ int od_cancel_match(od_system_t *system,
od_route_t *route = server->route;
od_schemeserver_t *server_scheme = route->scheme->server;
so_key_t cancel_key = server->key;
return od_cancel(system, server_scheme, &cancel_key, server->id);
return od_cancel(system, server_scheme, &cancel_key, &server->id);
}

View File

@ -7,7 +7,7 @@
* PostgreSQL connection pooler and request router.
*/
int od_cancel(od_system_t*, od_schemeserver_t*, so_key_t*, uint64_t);
int od_cancel(od_system_t*, od_schemeserver_t*, so_key_t*, od_id_t*);
int od_cancel_match(od_system_t*, od_routepool_t*, so_key_t*);
#endif

View File

@ -20,7 +20,7 @@ typedef enum
struct od_client
{
od_clientstate_t state;
uint64_t id;
od_id_t id;
uint64_t coroutine_id;
uint64_t coroutine_attacher_id;
machine_io_t *io;
@ -39,7 +39,6 @@ static inline void
od_client_init(od_client_t *client)
{
client->state = OD_CUNDEF;
client->id = 0;
client->coroutine_id = 0;
client->coroutine_attacher_id = 0;
client->io = NULL;

View File

@ -19,12 +19,12 @@
#include "od_macro.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_system.h"
#include "od_server.h"
#include "od_server_pool.h"

View File

@ -22,6 +22,7 @@
#include "od_macro.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_scheme.h"

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -95,7 +95,7 @@ od_frontend_startup_read(od_client_t *client)
if (to_read == 0)
break;
if (to_read == -1) {
od_error_client(&instance->log, client->id, "startup",
od_error_client(&instance->log, &client->id, "startup",
"failed to read startup packet, closing");
return -1;
}
@ -104,7 +104,7 @@ od_frontend_startup_read(od_client_t *client)
return -1;
rc = machine_read(client->io, (char*)stream->p, to_read, UINT32_MAX);
if (rc == -1) {
od_error_client(&instance->log, client->id, "startup",
od_error_client(&instance->log, &client->id, "startup",
"read error: %s",
machine_error(client->io));
return -1;
@ -155,7 +155,7 @@ od_frontend_startup(od_client_t *client)
return 0;
error:
od_error_client(&instance->log, client->id, "startup",
od_error_client(&instance->log, &client->id, "startup",
"incorrect startup packet");
od_frontend_error(client, SO_ERROR_PROTOCOL_VIOLATION,
"bad startup packet");
@ -165,7 +165,7 @@ error:
static inline void
od_frontend_key(od_client_t *client)
{
client->key.key_pid = client->id;
client->key.key_pid = *(uint32_t*)client->id.id;
client->key.key = 1 + rand();
}
@ -186,7 +186,7 @@ od_frontend_setup(od_client_t *client)
return -1;
rc = od_write(client->io, stream);
if (rc == -1) {
od_error_client(&instance->log, client->id, "setup",
od_error_client(&instance->log, &client->id, "setup",
"write error: %s",
machine_error(client->io));
return -1;
@ -207,7 +207,7 @@ od_frontend_ready(od_client_t *client)
return -1;
rc = od_write(client->io, stream);
if (rc == -1) {
od_error_client(&instance->log, client->id,
od_error_client(&instance->log, &client->id,
"write error: %s",
machine_error(client->io));
return -1;
@ -244,7 +244,7 @@ od_frontend_copy_in(od_client_t *client)
if (rc == -1)
return OD_RS_ECLIENT_READ;
type = *stream->s;
od_debug_client(&instance->log, client->id, "copy",
od_debug_client(&instance->log, &client->id, "copy",
"%c", *stream->s);
rc = od_write(server->io, stream);
if (rc == -1)
@ -275,7 +275,7 @@ od_frontend_main(od_client_t *client)
if (rc == -1)
return OD_RS_ECLIENT_READ;
int type = stream->s[rc];
od_debug_client(&instance->log, client->id, NULL,
od_debug_client(&instance->log, &client->id, NULL,
"%c", type);
/* Terminate (client graceful shutdown) */
@ -290,17 +290,19 @@ od_frontend_main(od_client_t *client)
if (status != OD_ROK)
return OD_RS_EATTACH;
server = client->server;
od_debug_client(&instance->log, client->id, NULL,
"attached to S%" PRIu64,
server->id);
od_debug_client(&instance->log, &client->id, NULL,
"attached to s%.*s",
sizeof(server->id.id),
server->id.id);
/* configure server using client startup parameters,
* if it has not been configured before. */
if (server->last_client_id == client->id) {
if (od_idmgr_cmp(&server->last_client_id, &client->id)) {
assert(server->io != NULL);
od_debug_client(&instance->log, client->id, NULL,
"previously owned, no need to reconfigure S%" PRIu64,
server->id);
od_debug_client(&instance->log, &client->id, NULL,
"previously owned, no need to reconfigure s%.*s",
sizeof(server->id.id),
server->id.id);
} else {
od_route_t *route = client->route;
@ -310,8 +312,6 @@ od_frontend_main(od_client_t *client)
if (rc == -1)
return OD_RS_ESERVER_CONNECT;
} else {
assert(server->last_client_id != UINT64_MAX);
/* discard last server configuration */
if (route->scheme->discard) {
rc = od_backend_discard(client->server);
@ -348,12 +348,12 @@ od_frontend_main(od_client_t *client)
return OD_RS_ESERVER_READ;
if (machine_connected(client->io))
continue;
od_debug_server(&instance->log, server->id, "watchdog",
od_debug_server(&instance->log, &server->id, "watchdog",
"client disconnected");
return OD_RS_ECLIENT_READ;
}
type = stream->s[rc];
od_debug_server(&instance->log, server->id, NULL,
od_debug_server(&instance->log, &server->id, NULL,
"%c", type);
/* ErrorResponse */
@ -432,7 +432,7 @@ void od_frontend(void *arg)
char peer[128];
od_getpeername(client->io, peer, sizeof(peer));
od_log_client(&instance->log, client->id, NULL,
od_log_client(&instance->log, &client->id, NULL,
"new client connection %s",
peer);
@ -440,7 +440,7 @@ void od_frontend(void *arg)
int rc;
rc = machine_io_attach(client->io);
if (rc == -1) {
od_error_client(&instance->log, client->id, NULL,
od_error_client(&instance->log, &client->id, NULL,
"failed to transfer client io");
machine_close(client->io);
od_client_free(client);
@ -456,7 +456,7 @@ void od_frontend(void *arg)
/* client cancel request */
if (client->startup.is_cancel) {
od_debug_client(&instance->log, client->id, NULL,
od_debug_client(&instance->log, &client->id, NULL,
"cancel request");
od_router_cancel(client);
od_frontend_close(client);
@ -499,14 +499,14 @@ void od_frontend(void *arg)
status = od_route(client);
switch (status) {
case OD_RERROR:
od_error_client(&instance->log, client->id, NULL,
od_error_client(&instance->log, &client->id, NULL,
"routing failed, closing");
od_frontend_error(client, SO_ERROR_SYSTEM_ERROR,
"client routing failed");
od_frontend_close(client);
return;
case OD_RERROR_NOT_FOUND:
od_error_client(&instance->log, client->id, NULL,
od_error_client(&instance->log, &client->id, NULL,
"database route '%s' is not declared, closing",
so_parameter_value(client->startup.database));
od_frontend_error(client, SO_ERROR_UNDEFINED_DATABASE,
@ -514,14 +514,14 @@ void od_frontend(void *arg)
od_frontend_close(client);
return;
case OD_RERROR_TIMEDOUT:
od_error_client(&instance->log, client->id, NULL,
od_error_client(&instance->log, &client->id, NULL,
"route connection timedout, closing");
od_frontend_error(client, SO_ERROR_TOO_MANY_CONNECTIONS,
"connection timedout");
od_frontend_close(client);
return;
case OD_RERROR_LIMIT:
od_error_client(&instance->log, client->id, NULL,
od_error_client(&instance->log, &client->id, NULL,
"route connection limit reached, closing");
od_frontend_error(client, SO_ERROR_TOO_MANY_CONNECTIONS,
"too many connections");
@ -529,7 +529,7 @@ void od_frontend(void *arg)
return;
case OD_ROK:;
od_route_t *route = client->route;
od_debug_client(&instance->log, client->id, NULL,
od_debug_client(&instance->log, &client->id, NULL,
"route to '%s' (using '%s' server)",
route->scheme->target,
route->scheme->server->name);
@ -546,15 +546,16 @@ void od_frontend(void *arg)
assert(server == NULL);
assert(client->route != NULL);
od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE,
"C%d: failed to get remote server connection",
client->id);
"c%.*s: failed to get remote server connection",
sizeof(client->id.id),
client->id.id);
/* detach client from route */
od_unroute(client);
break;
case OD_RS_OK:
/* graceful disconnect */
od_log_client(&instance->log, client->id, NULL,
od_log_client(&instance->log, &client->id, NULL,
"disconnected");
if (! client->server) {
od_unroute(client);
@ -574,7 +575,7 @@ void od_frontend(void *arg)
case OD_RS_ECLIENT_WRITE:
/* close client connection and reuse server
* link in case of client errors */
od_log_client(&instance->log, client->id, NULL,
od_log_client(&instance->log, &client->id, NULL,
"disconnected (read/write error): %s",
machine_error(client->io));
if (! client->server) {
@ -594,18 +595,20 @@ void od_frontend(void *arg)
case OD_RS_ESERVER_CONNECT:
/* server attached to client and connection failed */
od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE,
"S%d: failed to connect to remote server",
server->id);
"s%.*s: failed to connect to remote server",
sizeof(server->id.id),
server->id.id);
/* close backend connection */
od_router_close_and_unroute(client);
break;
case OD_RS_ESERVER_CONFIGURE:
od_log_server(&instance->log, server->id, NULL,
od_log_server(&instance->log, &server->id, NULL,
"disconnected (server configure error)");
od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE,
"S%d: failed to configure remote server",
server->id);
"s%.*s: failed to configure remote server",
sizeof(server->id.id),
server->id.id);
/* close backend connection */
od_router_close_and_unroute(client);
break;
@ -614,12 +617,13 @@ void od_frontend(void *arg)
case OD_RS_ESERVER_WRITE:
/* close client connection and close server
* connection in case of server errors */
od_log_server(&instance->log, server->id, NULL,
od_log_server(&instance->log, &server->id, NULL,
"disconnected (read/write error): %s",
machine_error(server->io));
od_frontend_error(client, SO_ERROR_CONNECTION_FAILURE,
"S%d: remote server read/write error",
server->id);
"s%.*s: remote server read/write error",
sizeof(server->id.id),
server->id.id);
/* close backend connection */
od_router_close_and_unroute(client);
break;

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -120,7 +120,7 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
if (instance->scheme.log_file) {
rc = od_log_open(&instance->log, instance->scheme.log_file);
if (rc == -1) {
od_error(&instance->log, "failed to open log file '%s'",
od_error(&instance->log, NULL, "failed to open log file '%s'",
instance->scheme.log_file);
return 1;
}
@ -148,7 +148,10 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
if (instance->scheme.pid_file)
od_pid_create(&instance->pid, instance->scheme.pid_file);
/* seed id manager */
od_idmgr_seed(&instance->id_mgr, &instance->log);
rc = od_idmgr_seed(&instance->id_mgr);
if (rc == -1)
od_error(&instance->log, NULL, "failed to open random source device");
/* run system services */
od_router_t router;
od_periodic_t periodic;
@ -167,7 +170,6 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
rc = od_relaypool_init(&relay_pool, &system, instance->scheme.workers);
if (rc == -1)
return 1;
/* start pooler machine thread */
rc = od_pooler_start(&pooler);
if (rc == -1)

View File

@ -20,6 +20,7 @@
#include "od_macro.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_io.h"

View File

@ -24,6 +24,7 @@
#include "od_macro.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
@ -57,7 +58,7 @@ int od_log_close(od_log_t *log)
int od_logv(od_log_t *log, od_syslogprio_t prio,
char *ident,
char *object,
uint64_t object_id,
od_id_t *id,
char *context,
char *fmt, va_list args)
{
@ -79,8 +80,8 @@ int od_logv(od_log_t *log, od_syslogprio_t prio,
/* object and id */
if (object) {
len += snprintf(buffer + len, sizeof(buffer) - len, "%s%" PRIu64": ",
object, object_id);
len += snprintf(buffer + len, sizeof(buffer) - len, "%s%.*s: ",
object, (int)sizeof(id->id), id->id);
}
/* context */

View File

@ -21,7 +21,7 @@ int od_log_init(od_log_t*, od_pid_t*, od_syslog_t*);
int od_log_open(od_log_t*, char*);
int od_log_close(od_log_t*);
int od_logv(od_log_t*, od_syslogprio_t, char*,
char*, uint64_t, char*,
char*, od_id_t*, char*,
char*, va_list);
static inline void
@ -53,33 +53,33 @@ od_error(od_log_t *log, char *context, char *fmt, ...)
/* client */
static inline int
od_log_client(od_log_t *log, uint64_t id, char *context, char *fmt, ...)
od_log_client(od_log_t *log, od_id_t *id, char *context, char *fmt, ...)
{
va_list args;
va_start(args, fmt);
int rc = od_logv(log, OD_SYSLOG_INFO, NULL, "C", id, context, fmt, args);
int rc = od_logv(log, OD_SYSLOG_INFO, NULL, "c", id, context, fmt, args);
va_end(args);
return rc;
}
static inline int
od_debug_client(od_log_t *log, uint64_t id, char *context, char *fmt, ...)
od_debug_client(od_log_t *log, od_id_t *id, char *context, char *fmt, ...)
{
if (! log->debug)
return 0;
va_list args;
va_start(args, fmt);
int rc = od_logv(log, OD_SYSLOG_INFO, "debug:", "C", id, context, fmt, args);
int rc = od_logv(log, OD_SYSLOG_INFO, "debug:", "c", id, context, fmt, args);
va_end(args);
return rc;
}
static inline int
od_error_client(od_log_t *log, uint64_t id, char *context, char *fmt, ...)
od_error_client(od_log_t *log, od_id_t *id, char *context, char *fmt, ...)
{
va_list args;
va_start(args, fmt);
int rc = od_logv(log, OD_SYSLOG_ERROR, "error:", "C", id, context, fmt, args);
int rc = od_logv(log, OD_SYSLOG_ERROR, "error:", "c", id, context, fmt, args);
va_end(args);
return rc;
}
@ -87,33 +87,33 @@ od_error_client(od_log_t *log, uint64_t id, char *context, char *fmt, ...)
/* server */
static inline int
od_log_server(od_log_t *log, uint64_t id, char *context, char *fmt, ...)
od_log_server(od_log_t *log, od_id_t *id, char *context, char *fmt, ...)
{
va_list args;
va_start(args, fmt);
int rc = od_logv(log, OD_SYSLOG_INFO, NULL, "S", id, context, fmt, args);
int rc = od_logv(log, OD_SYSLOG_INFO, NULL, "s", id, context, fmt, args);
va_end(args);
return rc;
}
static inline int
od_debug_server(od_log_t *log, uint64_t id, char *context, char *fmt, ...)
od_debug_server(od_log_t *log, od_id_t *id, char *context, char *fmt, ...)
{
if (! log->debug)
return 0;
va_list args;
va_start(args, fmt);
int rc = od_logv(log, OD_SYSLOG_INFO, "debug:", "S", id, context, fmt, args);
int rc = od_logv(log, OD_SYSLOG_INFO, "debug:", "s", id, context, fmt, args);
va_end(args);
return rc;
}
static inline int
od_error_server(od_log_t *log, uint64_t id, char *context, char *fmt, ...)
od_error_server(od_log_t *log, od_id_t *id, char *context, char *fmt, ...)
{
va_list args;
va_start(args, fmt);
int rc = od_logv(log, OD_SYSLOG_ERROR, "error:", "S", id, context, fmt, args);
int rc = od_logv(log, OD_SYSLOG_ERROR, "error:", "s", id, context, fmt, args);
va_end(args);
return rc;
}

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -88,7 +88,7 @@ od_expire_mark(od_server_t *server, void *arg)
*/
if (! route->scheme->ttl)
return 0;
od_debug_server(&instance->log, server->id, "expire",
od_debug_server(&instance->log, &server->id, "expire",
"idle time: %d",
server->idle_time);
if (server->idle_time < route->scheme->ttl) {
@ -149,7 +149,7 @@ od_periodic(void *arg)
server = od_routepool_next(&router->route_pool, OD_SEXPIRE);
if (server == NULL)
break;
od_debug_server(&instance->log, server->id, "expire",
od_debug_server(&instance->log, &server->id, "expire",
"closing idle connection (%d secs)",
server->idle_time);
server->idle_time = 0;

View File

@ -22,13 +22,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -52,7 +52,6 @@ static inline void
od_pooler_server(void *arg)
{
od_poolerserver_t *server = arg;
od_pooler_t *pooler = server->system->pooler;
od_instance_t *instance = server->system->instance;
od_relaypool_t *relay_pool = server->system->relay_pool;
@ -127,7 +126,7 @@ od_pooler_server(void *arg)
machine_io_free(client_io);
continue;
}
client->id = pooler->client_seq++;
od_idmgr_generate(&instance->id_mgr, &client->id);
client->io = client_io;
/* create new client event and pass it to worker pool */
@ -273,7 +272,6 @@ int od_pooler_init(od_pooler_t *pooler, od_system_t *system)
od_instance_t *instance = system->instance;
pooler->machine = -1;
pooler->client_seq = 0;
pooler->system = system;
pooler->addr = NULL;
pooler->tls = NULL;

View File

@ -20,7 +20,6 @@ struct od_pooler
{
int64_t machine;
machine_tls_t *tls;
uint64_t client_seq;
struct addrinfo *addr;
od_system_t *system;
};

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -68,7 +68,7 @@ od_relay(void *arg)
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_frontend, client);
if (coroutine_id == -1) {
od_error_client(&instance->log, client->id, "relay",
od_error_client(&instance->log, &client->id, "relay",
"failed to create coroutine");
machine_close(client->io);
od_client_free(client);

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"

View File

@ -19,12 +19,12 @@
#include "od_macro.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_system.h"
#include "od_server.h"
#include "od_server_pool.h"

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -145,7 +145,7 @@ od_router_attacher(void *arg)
* The condition triggered when a server connection
* put into idle state by DETACH events.
*/
od_debug_client(&instance->log, client->id, "router",
od_debug_client(&instance->log, &client->id, "router",
"route '%s' pool limit reached (%d), waiting",
route->scheme->target,
route->scheme->pool_size);
@ -157,7 +157,7 @@ od_router_attacher(void *arg)
rc = machine_condition(route->scheme->pool_timeout);
if (rc == -1) {
od_clientpool_set(&route->client_pool, client, OD_CPENDING);
od_debug_client(&instance->log, client->id, "router",
od_debug_client(&instance->log, &client->id, "router",
"server pool wait timedout, closing");
msg_attach->status = OD_RERROR_TIMEDOUT;
machine_queue_put(msg_attach->response, msg);
@ -166,20 +166,19 @@ od_router_attacher(void *arg)
assert(client->state == OD_CPENDING);
/* retry */
od_debug_client(&instance->log, client->id, "router",
od_debug_client(&instance->log, &client->id, "router",
"server pool attach retry");
continue;
}
/* create new server object */
uint64_t id = router->server_seq++;
server = od_server_allocate();
if (server == NULL) {
msg_attach->status = OD_RERROR;
machine_queue_put(msg_attach->response, msg);
return;
}
server->id = id;
od_idmgr_generate(&instance->id_mgr, &server->id);
server->system = router->system;
server->route = route;
@ -209,7 +208,7 @@ od_router_wakeup(od_router_t *router, od_route_t *route)
assert(rc == 0);
(void)rc;
od_clientpool_set(&route->client_pool, waiter, OD_CPENDING);
od_debug_client(&instance->log, waiter->id, "router",
od_debug_client(&instance->log, &waiter->id, "router",
"server released, waking up");
}
}
@ -428,7 +427,6 @@ int od_router_init(od_router_t *router, od_system_t *system)
od_instance_t *instance = system->instance;
od_routepool_init(&router->route_pool);
router->system = system;
router->server_seq = 0;
router->clients = 0;
router->queue = machine_queue_create();
if (router->queue == NULL) {

View File

@ -22,7 +22,6 @@ struct od_router
{
od_routepool_t route_pool;
machine_queue_t *queue;
uint64_t server_seq;
int clients;
od_system_t *system;
};

View File

@ -16,6 +16,7 @@
#include "od_macro.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_scheme.h"

View File

@ -20,7 +20,7 @@ typedef enum
struct od_server
{
od_serverstate_t state;
uint64_t id;
od_id_t id;
so_stream_t stream;
machine_io_t *io;
machine_tls_t *tls;
@ -32,7 +32,7 @@ struct od_server
int idle_time;
so_key_t key;
so_key_t key_client;
uint64_t last_client_id;
od_id_t last_client_id;
void *route;
od_system_t *system;
od_list_t link;
@ -47,7 +47,6 @@ static inline void
od_server_init(od_server_t *server)
{
server->state = OD_SUNDEF;
server->id = 0;
server->route = NULL;
server->system = NULL;
server->io = NULL;
@ -56,13 +55,14 @@ od_server_init(od_server_t *server)
server->is_allocated = 0;
server->is_transaction = 0;
server->is_copy = 0;
server->last_client_id = UINT64_MAX;
server->count_request = 0;
server->count_reply = 0;
so_keyinit(&server->key);
so_keyinit(&server->key_client);
so_stream_init(&server->stream);
od_list_init(&server->link);
memset(&server->id, 0, sizeof(server->id));
memset(&server->last_client_id, 0, sizeof(server->last_client_id));
}
static inline od_server_t*

View File

@ -19,12 +19,12 @@
#include "od_macro.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_system.h"
#include "od_server.h"
#include "od_server_pool.h"

View File

@ -20,13 +20,13 @@
#include "od_version.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_daemon.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_msg.h"
#include "od_system.h"
#include "od_instance.h"
@ -95,7 +95,7 @@ od_tls_frontend_accept(od_client_t *client,
if (client->startup.is_ssl_request)
{
od_debug_client(log, client->id, "tls", "ssl request");
od_debug_client(log, &client->id, "tls", "ssl request");
so_stream_reset(stream);
int rc;
if (scheme->tls_verify == OD_TDISABLE) {
@ -103,11 +103,11 @@ od_tls_frontend_accept(od_client_t *client,
so_stream_write8(stream, 'N');
rc = od_write(client->io, stream);
if (rc == -1) {
od_error_client(log, client->id, "tls", "write error: %s",
od_error_client(log, &client->id, "tls", "write error: %s",
machine_error(client->io));
return -1;
}
od_log_client(log, client->id, "tls", "disabled, closing");
od_log_client(log, &client->id, "tls", "disabled, closing");
od_frontend_error(client, SO_ERROR_FEATURE_NOT_SUPPORTED,
"SSL is not supported");
return -1;
@ -116,17 +116,17 @@ od_tls_frontend_accept(od_client_t *client,
so_stream_write8(stream, 'S');
rc = od_write(client->io, stream);
if (rc == -1) {
od_error_client(log, client->id, "tls", "write error: %s",
od_error_client(log, &client->id, "tls", "write error: %s",
machine_error(client->io));
return -1;
}
rc = machine_set_tls(client->io, tls);
if (rc == -1) {
od_error_client(log, client->id, "tls", "error: %s",
od_error_client(log, &client->id, "tls", "error: %s",
machine_error(client->io));
return -1;
}
od_debug_client(log, client->id, "tls", "ok");
od_debug_client(log, &client->id, "tls", "ok");
return 0;
}
switch (scheme->tls_verify) {
@ -134,7 +134,7 @@ od_tls_frontend_accept(od_client_t *client,
case OD_TALLOW:
break;
default:
od_log_client(log, client->id, "tls", "required, closing");
od_log_client(log, &client->id, "tls", "required, closing");
od_frontend_error(client, SO_ERROR_PROTOCOL_VIOLATION,
"SSL is required");
return -1;
@ -188,7 +188,7 @@ od_tls_backend_connect(od_server_t *server,
{
so_stream_t *stream = &server->stream;
od_debug_server(log, server->id, "tls", "init");
od_debug_server(log, &server->id, "tls", "init");
/* SSL Request */
so_stream_reset(stream);
@ -198,7 +198,7 @@ od_tls_backend_connect(od_server_t *server,
return -1;
rc = od_write(server->io, stream);
if (rc == -1) {
od_error_server(log, server->id, "tls", "write error: %s",
od_error_server(log, &server->id, "tls", "write error: %s",
machine_error(server->io));
return -1;
}
@ -207,33 +207,33 @@ od_tls_backend_connect(od_server_t *server,
so_stream_reset(stream);
rc = machine_read(server->io, (char*)stream->p, 1, UINT32_MAX);
if (rc == -1) {
od_error_server(log, server->id, "tls", "read error: %s",
od_error_server(log, &server->id, "tls", "read error: %s",
machine_error(server->io));
return -1;
}
switch (*stream->p) {
case 'S':
/* supported */
od_debug_server(log, server->id, "tls", "supported");
od_debug_server(log, &server->id, "tls", "supported");
rc = machine_set_tls(server->io, server->tls);
if (rc == -1) {
od_error_server(log, server->id, "tls", "error: %s",
od_error_server(log, &server->id, "tls", "error: %s",
machine_error(server->io));
return -1;
}
od_debug_server(log, server->id, "tls", "ok");
od_debug_server(log, &server->id, "tls", "ok");
break;
case 'N':
/* not supported */
if (scheme->tls_verify == OD_TALLOW) {
od_debug_server(log, server->id, "tls", "not supported, continue (allow)");
od_debug_server(log, &server->id, "tls", "not supported, continue (allow)");
} else {
od_error_server(log, server->id, "tls", "not supported, closing");
od_error_server(log, &server->id, "tls", "not supported, closing");
return -1;
}
break;
default:
od_error_server(log, server->id, "tls", "unexpected status reply");
od_error_server(log, &server->id, "tls", "unexpected status reply");
return -1;
}
return 0;

View File

@ -15,12 +15,12 @@
#include "od_macro.h"
#include "od_list.h"
#include "od_pid.h"
#include "od_id.h"
#include "od_syslog.h"
#include "od_log.h"
#include "od_scheme.h"
#include "od_lex.h"
#include "od_config.h"
#include "od_id.h"
#include "od_instance.h"
int main(int argc, char *argv[])