Fix leaks and improve locking in cron (#229)

Also fix some warnings.
This commit is contained in:
kirill reshke 2020-11-23 14:13:28 +05:00 committed by GitHub
parent dc9e2b03e7
commit 785e85ab6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 713 additions and 414 deletions

1
.gitignore vendored
View File

@ -14,3 +14,4 @@ third_party/machinarium/benchmark/benchmark_channel_shared
build/
build-asan/
cmake-build-debug/
test/shell-test/odyssey

View File

@ -1154,7 +1154,6 @@ od_auth_backend(od_server_t *server, machine_msg_t *msg)
"%s",
kiwi_be_type_to_string(type));
int rc;
switch (type) {
case KIWI_BE_AUTHENTICATION:
rc = kiwi_fe_read_auth(machine_msg_data(msg),
@ -1184,6 +1183,7 @@ od_auth_backend(od_server_t *server, machine_msg_t *msg)
case KIWI_BE_ERROR_RESPONSE:
od_backend_error(
server, "auth", machine_msg_data(msg), machine_msg_size(msg));
/* save error to fwd it to client */
server->error_connect = msg;
return -1;
default:

View File

@ -233,14 +233,14 @@ od_auth_query(od_global_t *global,
/* route */
od_router_status_t status;
status = od_router_route(router, &instance->config, auth_client);
status = od_router_route(router, auth_client);
if (status != OD_ROUTER_OK) {
od_client_free(auth_client);
return -1;
}
/* attach */
status = od_router_attach(router, &instance->config, auth_client, false);
status = od_router_attach(router, auth_client, false);
if (status != OD_ROUTER_OK) {
od_router_unroute(router, auth_client);
od_client_free(auth_client);
@ -253,7 +253,7 @@ od_auth_query(od_global_t *global,
"auth_query",
NULL,
server,
"attached to %s%.*s",
"%s attached to server %s%.*s",
server->id.id_prefix,
(int)sizeof(server->id.id),
server->id.id);
@ -271,7 +271,7 @@ od_auth_query(od_global_t *global,
}
/* preformat and execute query */
char query[512];
char query[OD_QRY_MAX_SZ];
int query_len;
query_len = od_auth_query_format(rule, user, peer, query, sizeof(query));
@ -284,7 +284,7 @@ od_auth_query(od_global_t *global,
}
/* detach and unroute */
od_router_detach(router, &instance->config, auth_client);
od_router_detach(router, auth_client);
od_router_unroute(router, auth_client);
od_client_free(auth_client);
return 0;

View File

@ -66,6 +66,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size)
{
od_instance_t *instance = server->global->instance;
kiwi_fe_error_t error;
int rc;
rc = kiwi_fe_read_error(data, size, &error);
if (rc == -1) {
@ -76,6 +77,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size)
"failed to parse error message from server");
return;
}
od_error(&instance->logger,
context,
server->client,
@ -84,6 +86,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size)
error.severity,
error.code,
error.message);
if (error.detail) {
od_error(&instance->logger,
context,
@ -92,6 +95,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size)
"DETAIL: %s",
error.detail);
}
if (error.hint) {
od_error(&instance->logger,
context,
@ -175,12 +179,13 @@ od_backend_startup(od_server_t *server, kiwi_params_t *route_params)
od_io_error(&server->io));
return -1;
}
kiwi_be_type_t type = *(char *)machine_msg_data(msg);
od_debug(&instance->logger,
"startup",
NULL,
server,
"%s",
"received packet type: %s",
kiwi_be_type_to_string(type));
switch (type) {
@ -284,13 +289,15 @@ od_backend_connect_to(od_server_t *server,
/* set network options */
machine_set_nodelay(io, instance->config.nodelay);
if (instance->config.keepalive > 0)
if (instance->config.keepalive > 0) {
machine_set_keepalive(io,
1,
instance->config.keepalive,
instance->config.keepalive_keep_interval,
instance->config.keepalive_probes,
instance->config.keepalive_usr_timeout);
}
int rc;
rc = od_io_prepare(&server->io, io, instance->config.readahead);
if (rc == -1) {
@ -374,8 +381,9 @@ od_backend_connect_to(od_server_t *server,
}
uint64_t time_resolve = 0;
if (instance->config.log_session)
if (instance->config.log_session) {
time_resolve = machine_time_us() - time_connect_start;
}
/* connect to server */
rc = machine_connect(server->io.io, saddr, UINT32_MAX);
@ -409,8 +417,9 @@ od_backend_connect_to(od_server_t *server,
}
uint64_t time_connect = 0;
if (instance->config.log_session)
if (instance->config.log_session) {
time_connect = machine_time_us() - time_connect_start;
}
/* log server connection */
if (instance->config.log_session) {
@ -479,6 +488,7 @@ od_backend_connect_cancel(od_server_t *server,
msg = kiwi_fe_write_cancel(NULL, key->key_pid, key->key);
if (msg == NULL)
return -1;
rc = od_write(&server->io, msg);
if (rc == -1) {
od_error(&instance->logger,
@ -489,6 +499,7 @@ od_backend_connect_cancel(od_server_t *server,
od_io_error(&server->io));
return -1;
}
return 0;
}
@ -506,6 +517,7 @@ od_backend_update_parameter(od_server_t *server,
uint32_t name_len;
char *value;
uint32_t value_len;
int rc;
rc =
kiwi_fe_read_parameter(data, size, &name, &name_len, &value, &value_len);
@ -529,11 +541,13 @@ od_backend_update_parameter(od_server_t *server,
value_len,
value);
if (server_only)
if (server_only) {
kiwi_vars_update(&server->vars, name, name_len, value, value_len);
else
} else {
kiwi_vars_update_both(
&client->vars, &server->vars, name, name_len, value, value_len);
}
return 0;
}
@ -587,12 +601,12 @@ od_backend_ready_wait(od_server_t *server,
ready++;
if (ready == count) {
machine_msg_free(msg);
break;
return 0;
}
}
machine_msg_free(msg);
}
return 0;
/* never reached */
}
int
@ -606,8 +620,10 @@ od_backend_query(od_server_t *server,
machine_msg_t *msg;
msg = kiwi_fe_write_query(NULL, query, len);
if (msg == NULL)
if (msg == NULL) {
return -1;
}
int rc;
rc = od_write(&server->io, msg);
if (rc == -1) {

View File

@ -7,9 +7,11 @@
*
* Scalable PostgreSQL connection pooler.
*/
#include <stdlib.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

View File

@ -87,12 +87,6 @@ struct od_config
od_list_t listen;
};
static inline int
od_config_is_multi_workers(od_config_t *config)
{
return config->workers > 1;
}
void
od_config_init(od_config_t *);
void

View File

@ -334,6 +334,7 @@ od_config_reader_quantiles(od_config_reader_t *reader,
break;
c++;
}
free(*quantiles);
return true;
}
@ -949,19 +950,20 @@ od_config_reader_parse(od_config_reader_t *reader, od_module_t *modules)
rc = od_parser_next(&reader->parser, &token);
switch (rc) {
case OD_PARSER_EOF:
return 0;
goto success;
case OD_PARSER_KEYWORD:
break;
default:
od_config_reader_error(
reader, &token, "incorrect or unexpected parameter");
return -1;
goto error;
}
od_keyword_t *keyword;
keyword = od_keyword_match(od_config_keywords, &token);
if (keyword == NULL) {
od_config_reader_error(reader, &token, "unknown parameter");
return -1;
goto error;
}
switch (keyword->id) {
/* include */
@ -975,233 +977,292 @@ od_config_reader_parse(od_config_reader_t *reader, od_module_t *modules)
modules,
config_file);
free(config_file);
if (rc == -1)
return -1;
if (rc == -1) {
goto error;
}
continue;
}
/* daemonize */
case OD_LDAEMONIZE:
if (!od_config_reader_yes_no(reader, &config->daemonize))
return -1;
if (!od_config_reader_yes_no(reader, &config->daemonize)) {
goto error;
}
continue;
/* priority */
case OD_LPRIORITY:
if (!od_config_reader_number(reader, &config->priority))
return -1;
if (!od_config_reader_number(reader, &config->priority)) {
goto error;
}
continue;
/* pid_file */
case OD_LPID_FILE:
if (!od_config_reader_string(reader, &config->pid_file))
return -1;
if (!od_config_reader_string(reader, &config->pid_file)) {
goto error;
}
continue;
/* unix_socket_dir */
case OD_LUNIX_SOCKET_DIR:
if (!od_config_reader_string(reader, &config->unix_socket_dir))
return -1;
if (!od_config_reader_string(reader,
&config->unix_socket_dir)) {
goto error;
}
continue;
/* unix_socket_mode */
case OD_LUNIX_SOCKET_MODE:
if (!od_config_reader_string(reader, &config->unix_socket_mode))
return -1;
if (!od_config_reader_string(reader,
&config->unix_socket_mode)) {
goto error;
}
continue;
/* locks_dir */
case OD_LLOCKS_DIR:
if (!od_config_reader_string(reader, &config->locks_dir))
return -1;
if (!od_config_reader_string(reader, &config->locks_dir)) {
goto error;
}
continue;
/* enable_online_restart */
case OD_LENABLE_ONLINE_RESTART:
if (!od_config_reader_yes_no(
reader, &config->enable_online_restart_feature))
return -1;
reader, &config->enable_online_restart_feature)) {
goto error;
}
continue;
/* graceful_die_on_errors */
case OD_LGRACEFUL_DIE_ON_ERRORS:
if (!od_config_reader_yes_no(reader,
&config->graceful_die_on_errors))
return -1;
&config->graceful_die_on_errors)) {
goto error;
}
continue;
case OD_LBINDWITH_REUSEPORT:
if (!od_config_reader_yes_no(reader,
&config->bindwith_reuseport))
return -1;
&config->bindwith_reuseport)) {
goto error;
}
continue;
/* log_debug */
case OD_LLOG_DEBUG:
if (!od_config_reader_yes_no(reader, &config->log_debug))
return -1;
if (!od_config_reader_yes_no(reader, &config->log_debug)) {
goto error;
}
continue;
/* log_stdout */
case OD_LLOG_TO_STDOUT:
if (!od_config_reader_yes_no(reader, &config->log_to_stdout))
return -1;
if (!od_config_reader_yes_no(reader, &config->log_to_stdout)) {
goto error;
}
continue;
/* log_config */
case OD_LLOG_CONFIG:
if (!od_config_reader_yes_no(reader, &config->log_config))
return -1;
if (!od_config_reader_yes_no(reader, &config->log_config)) {
goto error;
}
continue;
/* log_session */
case OD_LLOG_SESSION:
if (!od_config_reader_yes_no(reader, &config->log_session))
return -1;
if (!od_config_reader_yes_no(reader, &config->log_session)) {
goto error;
}
continue;
/* log_query */
case OD_LLOG_QUERY:
if (!od_config_reader_yes_no(reader, &config->log_query))
return -1;
if (!od_config_reader_yes_no(reader, &config->log_query)) {
goto error;
}
continue;
/* log_stats */
case OD_LLOG_STATS:
if (!od_config_reader_yes_no(reader, &config->log_stats))
return -1;
if (!od_config_reader_yes_no(reader, &config->log_stats)) {
goto error;
}
continue;
/* log_format */
case OD_LLOG_FORMAT:
if (!od_config_reader_string(reader, &config->log_format))
return -1;
if (!od_config_reader_string(reader, &config->log_format)) {
goto error;
}
continue;
/* log_file */
case OD_LLOG_FILE:
if (!od_config_reader_string(reader, &config->log_file))
return -1;
if (!od_config_reader_string(reader, &config->log_file)) {
goto error;
}
continue;
/* log_syslog */
case OD_LLOG_SYSLOG:
if (!od_config_reader_yes_no(reader, &config->log_syslog))
return -1;
if (!od_config_reader_yes_no(reader, &config->log_syslog)) {
goto error;
}
continue;
/* log_syslog_ident */
case OD_LLOG_SYSLOG_IDENT:
if (!od_config_reader_string(reader, &config->log_syslog_ident))
return -1;
if (!od_config_reader_string(reader,
&config->log_syslog_ident)) {
goto error;
}
continue;
/* log_syslog_facility */
case OD_LLOG_SYSLOG_FACILITY:
if (!od_config_reader_string(reader,
&config->log_syslog_facility))
return -1;
&config->log_syslog_facility)) {
goto error;
}
continue;
/* stats_interval */
case OD_LSTATS_INTERVAL:
if (!od_config_reader_number(reader, &config->stats_interval))
return -1;
if (!od_config_reader_number(reader, &config->stats_interval)) {
goto error;
}
continue;
/* client_max */
case OD_LCLIENT_MAX:
if (!od_config_reader_number(reader, &config->client_max))
return -1;
if (!od_config_reader_number(reader, &config->client_max)) {
goto error;
}
config->client_max_set = 1;
continue;
/* client_max_routing */
case OD_LCLIENT_MAX_ROUTING:
if (!od_config_reader_number(reader,
&config->client_max_routing))
return -1;
&config->client_max_routing)) {
goto error;
}
continue;
/* server_login_retry */
case OD_LSERVER_LOGIN_RETRY:
if (!od_config_reader_number(reader,
&config->server_login_retry))
return -1;
&config->server_login_retry)) {
goto error;
}
continue;
/* readahead */
case OD_LREADAHEAD:
if (!od_config_reader_number(reader, &config->readahead))
return -1;
if (!od_config_reader_number(reader, &config->readahead)) {
goto error;
}
continue;
/* nodelay */
case OD_LNODELAY:
if (!od_config_reader_yes_no(reader, &config->nodelay))
return -1;
if (!od_config_reader_yes_no(reader, &config->nodelay)) {
goto error;
}
continue;
/* keepalive */
case OD_LKEEPALIVE:
if (!od_config_reader_number(reader, &config->keepalive))
return -1;
if (!od_config_reader_number(reader, &config->keepalive)) {
goto error;
}
continue;
/* keepalive_keep_interval */
case OD_LKEEPALIVE_INTERVAL:
if (!od_config_reader_number(reader,
&config->keepalive_keep_interval))
return -1;
if (!od_config_reader_number(
reader, &config->keepalive_keep_interval)) {
goto error;
}
continue;
/* keepalive_probes */
case OD_LKEEPALIVE_PROBES:
if (!od_config_reader_number(reader, &config->keepalive_probes))
return -1;
if (!od_config_reader_number(reader,
&config->keepalive_probes)) {
goto error;
}
continue;
/* keepalive_usr_timeout */
case OD_LKEEPALIVE_USR_TIMEOUT:
if (!od_config_reader_number(reader,
&config->keepalive_usr_timeout))
return -1;
&config->keepalive_usr_timeout)) {
goto error;
}
continue;
/* workers */
case OD_LWORKERS:
if (!od_config_reader_number(reader, &config->workers))
return -1;
if (!od_config_reader_number(reader, &config->workers)) {
goto error;
}
continue;
/* resolvers */
case OD_LRESOLVERS:
if (!od_config_reader_number(reader, &config->resolvers))
return -1;
if (!od_config_reader_number(reader, &config->resolvers)) {
goto error;
}
continue;
/* pipeline */
/* cache */
/* cache_chunk */
case OD_LPIPELINE:
/* fallthrough */
case OD_LCACHE:
/* cache */
/* fallthrough */
case OD_LCACHE_CHUNK:
/* cache_chunk */
/* fallthrough */
case OD_LPACKET_WRITE_QUEUE:
/* packet write queue */
/* fallthrough */
case OD_LPACKET_READ_SIZE: {
/* deprecated */
int unused;
if (!od_config_reader_number(reader, &unused))
return -1;
if (!od_config_reader_number(reader, &unused)) {
goto error;
}
continue;
}
/* cache_msg_gc_size */
case OD_LCACHE_MSG_GC_SIZE:
if (!od_config_reader_number(reader,
&config->cache_msg_gc_size))
return -1;
&config->cache_msg_gc_size)) {
goto error;
}
continue;
/* cache_coroutine */
case OD_LCACHE_COROUTINE:
if (!od_config_reader_number(reader, &config->cache_coroutine))
return -1;
if (!od_config_reader_number(reader,
&config->cache_coroutine)) {
goto error;
}
continue;
/* coroutine_stack_size */
case OD_LCOROUTINE_STACK_SIZE:
if (!od_config_reader_number(reader,
&config->coroutine_stack_size))
return -1;
&config->coroutine_stack_size)) {
goto error;
}
continue;
/* listen */
case OD_LLISTEN:
rc = od_config_reader_listen(reader);
if (rc == -1)
return -1;
if (rc == -1) {
goto error;
}
continue;
/* storage */
case OD_LSTORAGE:
rc = od_config_reader_storage(reader);
if (rc == -1)
return -1;
if (rc == -1) {
goto error;
}
continue;
/* database */
case OD_LDATABASE:
rc = od_config_reader_database(reader, modules);
if (rc == -1)
return -1;
if (rc == -1) {
goto error;
}
continue;
case OD_LMODULE: {
char *module_path = NULL;
rc = od_config_reader_string(reader, &module_path);
if (rc == -1) {
return -1;
goto error;
}
if (od_target_module_add(NULL, modules, module_path) ==
OD_MODULE_CB_FAIL_RETCODE) {
@ -1212,11 +1273,19 @@ od_config_reader_parse(od_config_reader_t *reader, od_module_t *modules)
}
default:
od_config_reader_error(reader, &token, "unexpected parameter");
return -1;
goto error;
}
}
/* unreach */
return -1;
error:
return -1;
success:
if (!config->client_max_routing) {
config->client_max_routing = config->workers * 16;
}
return 0;
}
int
@ -1233,12 +1302,12 @@ od_config_reader_import(od_config_t *config,
reader.rules = rules;
int rc;
rc = od_config_reader_open(&reader, config_file);
if (rc == -1)
if (rc == -1) {
return -1;
}
rc = od_config_reader_parse(&reader, modules);
od_config_reader_close(&reader);
if (!config->client_max_routing)
config->client_max_routing = config->workers * 16;
return rc;
}

View File

@ -665,11 +665,15 @@ od_console_show_databases_add_cb(od_route_t *route, void **argv)
od_rule_storage_t *storage = rule->storage;
char *host = storage->host;
if (!host)
if (!host) {
host = "";
}
rc = kiwi_be_write_data_row_add(stream, offset, host, strlen(host));
if (rc == -1)
if (rc == -1) {
goto error;
}
char data[64];
int data_len;
@ -708,8 +712,10 @@ od_console_show_databases_add_cb(od_route_t *route, void **argv)
rc = kiwi_be_write_data_row_add(stream, offset, "session", 7);
if (rule->pool == OD_RULE_POOL_TRANSACTION)
rc = kiwi_be_write_data_row_add(stream, offset, "transaction", 11);
if (rc == -1)
if (rc == -1) {
goto error;
}
/* max_connections */
data_len = od_snprintf(data, sizeof(data), "%d", rule->client_max);
@ -724,7 +730,8 @@ od_console_show_databases_add_cb(od_route_t *route, void **argv)
route->client_pool.count_active +
route->client_pool.count_pending +
route->client_pool.count_queue);
rc = kiwi_be_write_data_row_add(stream, offset, data, data_len);
rc = kiwi_be_write_data_row_add(stream, offset, data, data_len);
if (rc == -1)
goto error;
@ -1551,6 +1558,7 @@ od_console_query(od_client_t *client,
uint32_t query_data_size)
{
od_instance_t *instance = client->global->instance;
od_system_t *system = client->global->system;
uint32_t query_len;
char *query;

View File

@ -14,6 +14,30 @@ od_counter_llist_create(void)
return llist;
}
od_bucket_t *
od_bucket_create(void)
{
od_bucket_t *b = malloc(sizeof(od_bucket_t));
if (b == NULL)
return NULL;
b->l = od_counter_llist_create();
const int res = pthread_mutex_init(&b->mutex, NULL);
if (!res) {
return b;
}
return NULL;
}
void
od_bucket_free(od_bucket_t *b)
{
od_counter_llist_free(b->l);
pthread_mutex_destroy(&b->mutex);
free(b);
}
inline void
od_counter_llist_add(od_counter_llist_t *llist, const od_counter_item_t *it)
{
@ -38,46 +62,41 @@ od_counter_llist_free(od_counter_llist_t *l)
next = it->next;
free(it);
}
free(l);
return OK_RESPONSE;
}
static size_t
od_counter_required_buf_size(int sz)
{
return sizeof(od_counter_t) + (sz * sizeof(od_bucket_t));
}
od_counter_t *
od_counter_create(size_t sz)
{
od_counter_t *t = malloc(sizeof(od_counter_t));
od_counter_t *t = malloc(od_counter_required_buf_size(sz));
if (t == NULL) {
goto error;
}
t->buckets = malloc(sizeof(od_counter_llist_t) * sz);
if (t->buckets == NULL) {
goto error;
}
t->bucket_mutex = malloc(sizeof(pthread_mutex_t) * sz);
if (t->bucket_mutex == NULL) {
goto error;
}
t->size = sz;
for (size_t i = 0; i < t->size; ++i) {
t->buckets[i] = od_counter_llist_create();
t->buckets[i] = od_bucket_create();
if (t->buckets[i] == NULL) {
goto error;
}
const int res = pthread_mutex_init(&t->bucket_mutex[i], NULL);
if (res) {
goto error;
}
}
return t;
error:
if (t) {
if (t->buckets) {
free(t->buckets);
for (size_t i = 0; i < t->size; ++i) {
if (t->buckets[i] == NULL)
continue;
od_bucket_free(t->buckets[i]);
}
if (t->bucket_mutex)
free(t->bucket_mutex);
free(t);
}
@ -94,11 +113,9 @@ od_retcode_t
od_counter_free(od_counter_t *t)
{
for (size_t i = 0; i < t->size; ++i) {
od_counter_llist_free(t->buckets[i]);
pthread_mutex_destroy(&t->bucket_mutex[i]);
od_bucket_free(t->buckets[i]);
}
free(t->buckets);
free(t);
return OK_RESPONSE;
@ -112,11 +129,11 @@ od_counter_inc(od_counter_t *t, od_counter_item_t item)
* prevent concurrent access to
* modify hash table section
*/
pthread_mutex_lock(&t->bucket_mutex[key]);
pthread_mutex_lock(&t->buckets[key]->mutex);
{
bool fnd = false;
for (od_counter_litem_t *it = t->buckets[key]->list; it != NULL;
for (od_counter_litem_t *it = t->buckets[key]->l->list; it != NULL;
it = it->next) {
if (it->value == item) {
++it->cnt;
@ -125,9 +142,9 @@ od_counter_inc(od_counter_t *t, od_counter_item_t item)
}
}
if (!fnd)
od_counter_llist_add(t->buckets[key], &item);
od_counter_llist_add(t->buckets[key]->l, &item);
}
pthread_mutex_unlock(&t->bucket_mutex[key]);
pthread_mutex_unlock(&t->buckets[key]->mutex);
}
od_count_t
@ -137,9 +154,9 @@ od_counter_get_count(od_counter_t *t, od_counter_item_t value)
od_count_t ret_val = 0;
pthread_mutex_lock(&t->bucket_mutex[key]);
pthread_mutex_lock(&t->buckets[key]->mutex);
{
for (od_counter_litem_t *it = t->buckets[key]->list; it != NULL;
for (od_counter_litem_t *it = t->buckets[key]->l->list; it != NULL;
it = it->next) {
if (it->value == value) {
ret_val = it->cnt;
@ -147,7 +164,7 @@ od_counter_get_count(od_counter_t *t, od_counter_item_t value)
}
}
}
pthread_mutex_unlock(&t->bucket_mutex[key]);
pthread_mutex_unlock(&t->buckets[key]->mutex);
return ret_val;
}
@ -155,14 +172,15 @@ od_counter_get_count(od_counter_t *t, od_counter_item_t value)
static inline od_retcode_t
od_counter_reset_target_bucket(od_counter_t *t, size_t bucket_key)
{
pthread_mutex_lock(&t->bucket_mutex[bucket_key]);
pthread_mutex_lock(&t->buckets[bucket_key]->mutex);
{
for (od_counter_litem_t *it = t->buckets[bucket_key]->list; it != NULL;
it = it->next) {
for (od_counter_litem_t *it = t->buckets[bucket_key]->l->list;
it != NULL;
it = it->next) {
it->value = 0;
}
}
pthread_mutex_unlock(&t->bucket_mutex[bucket_key]);
pthread_mutex_unlock(&t->buckets[bucket_key]->mutex);
return OK_RESPONSE;
}
@ -172,9 +190,9 @@ od_counter_reset(od_counter_t *t, od_counter_item_t value)
{
od_counter_item_t key = od_hash_item(t, value);
pthread_mutex_lock(&t->bucket_mutex[key]);
pthread_mutex_lock(&t->buckets[key]->mutex);
{
for (od_counter_litem_t *it = t->buckets[key]->list; it != NULL;
for (od_counter_litem_t *it = t->buckets[key]->l->list; it != NULL;
it = it->next) {
if (it->value == value) {
it->value = 0;
@ -182,7 +200,7 @@ od_counter_reset(od_counter_t *t, od_counter_item_t value)
}
}
}
pthread_mutex_unlock(&t->bucket_mutex[key]);
pthread_mutex_unlock(&t->buckets[key]->mutex);
return OK_RESPONSE;
}

View File

@ -44,12 +44,19 @@ extern od_retcode_t
od_counter_llist_free(od_counter_llist_t *l);
#define OD_DEFAULT_HASH_TABLE_SIZE 15
typedef struct od_bucket
{
od_counter_llist_t *l;
pthread_mutex_t mutex;
} od_bucket_t;
typedef struct od_counter od_counter_t;
struct od_counter
{
od_counter_llist_t **buckets;
pthread_mutex_t *bucket_mutex;
size_t size;
// ISO C99 flexible array member
od_bucket_t *buckets[];
};
extern od_counter_t *

View File

@ -105,6 +105,7 @@ od_cron_stat(od_cron_t *cron)
uint64_t msg_cache_count = 0;
uint64_t msg_cache_gc_count = 0;
uint64_t msg_cache_size = 0;
od_atomic_u64_t startup_errors =
od_atomic_u64_of(&cron->startup_errors);
cron->startup_errors = 0;
@ -150,11 +151,13 @@ od_cron_stat(od_cron_t *cron)
/* update stats per route and print info */
od_route_pool_stat_cb_t stat_cb;
stat_cb = od_cron_stat_cb;
if (!instance->config.log_stats)
if (!instance->config.log_stats) {
stat_cb = NULL;
} else {
stat_cb = od_cron_stat_cb;
}
void *argv[] = { instance };
od_router_stat(router, cron->stat_time_us, 1, stat_cb, argv);
od_router_stat(router, cron->stat_time_us, stat_cb, argv);
/* update current stat time mark */
cron->stat_time_us = machine_time_us();
@ -185,8 +188,6 @@ od_cron_expire(od_cron_t *cron)
"closing idle server connection (%d secs)",
server->idle_time);
server->route = NULL;
if (!od_config_is_multi_workers(&instance->config))
od_io_attach(&server->io);
od_backend_close_connection(server);
od_backend_close(server);
}
@ -205,13 +206,26 @@ od_cron_err_stat(od_cron_t *cron)
od_list_foreach(&router->route_pool.list, it)
{
od_route_t *current_route = od_container_of(it, od_route_t, link);
if (current_route->extra_logging_enabled) {
od_err_logger_inc_interval(current_route->err_logger);
od_route_lock(current_route);
{
if (current_route->extra_logging_enabled) {
od_err_logger_inc_interval(current_route->err_logger);
}
}
od_route_unlock(current_route);
}
od_err_logger_inc_interval(router->route_pool.err_logger);
od_err_logger_inc_interval(router->router_err_logger);
od_router_lock(router)
{
od_err_logger_inc_interval(router->router_err_logger);
}
od_router_unlock(router)
od_route_pool_lock(router->route_pool)
{
od_err_logger_inc_interval(router->route_pool.err_logger);
}
od_route_pool_unlock(router->route_pool)
}
static void

View File

@ -13,12 +13,14 @@
void
od_dbg_printf(char *fmt, ...);
#define OD_RELEASE_MODE -1
#ifndef OD_DEVEL_LVL
/* set "release" mode by default */
#define OD_DEVEL_LVL -1
#define OD_DEVEL_LVL OD_RELEASE_MODE
#endif
#if OD_DEVEL_LVL == -1
#if OD_DEVEL_LVL == OD_RELEASE_MODE
#define od_dbg_printf_on_dvl_lvl(debug_lvl, fmt, ...)
/* zero cost debug print on release mode */
#else

View File

@ -16,17 +16,19 @@ od_deploy(od_client_t *client, char *context)
od_server_t *server = client->server;
od_route_t *route = client->route;
if (route->id.physical_rep || route->id.logical_rep)
if (route->id.physical_rep || route->id.logical_rep) {
return 0;
}
/* compare and set options which are differs from server */
int query_count;
query_count = 0;
char query[512];
char query[OD_QRY_MAX_SZ];
int query_size;
query_size =
kiwi_vars_cas(&client->vars, &server->vars, query, sizeof(query) - 1);
if (query_size > 0) {
query[query_size] = 0;
query_size++;
@ -34,10 +36,12 @@ od_deploy(od_client_t *client, char *context)
msg = kiwi_fe_write_query(NULL, query, query_size);
if (msg == NULL)
return -1;
int rc;
rc = od_write(&server->io, msg);
if (rc == -1)
return -1;
query_count++;
od_debug(

View File

@ -7,6 +7,8 @@
* Scalable PostgreSQL connection pooler.
*/
#define OD_QRY_MAX_SZ 512 /* odyssey maximum allowed query size */
int
od_deploy(od_client_t *, char *);

View File

@ -1,10 +1,17 @@
#include "err_logger.h"
static size_t
err_logger_required_buf_size(int sz)
{
return sizeof(od_error_logger_t) + (sz * sizeof(od_counter_t));
}
od_error_logger_t *
od_err_logger_create(size_t intervals_count)
{
od_error_logger_t *err_logger = malloc(sizeof(od_error_logger_t));
od_error_logger_t *err_logger =
malloc(err_logger_required_buf_size(intervals_count));
if (err_logger == NULL) {
goto error;
}
@ -12,9 +19,6 @@ od_err_logger_create(size_t intervals_count)
err_logger->intercals_cnt = intervals_count;
err_logger->current_interval_num = 0;
err_logger->interval_counters =
MALLOC_ARRAY(intervals_count, od_counter_t *);
for (size_t i = 0; i < intervals_count; ++i) {
err_logger->interval_counters[i] = od_counter_create_default();
if (err_logger->interval_counters[i] == NULL) {
@ -29,10 +33,14 @@ error:
if (err_logger) {
if (err_logger->interval_counters)
free(err_logger->interval_counters);
for (size_t i = 0; i < err_logger->intercals_cnt; ++i) {
if (err_logger->interval_counters[i] == NULL)
continue;
od_counter_free(err_logger->interval_counters[i]);
}
free(err_logger);
pthread_mutex_destroy(&err_logger->lock);
free((void *)(err_logger));
}
return NULL;
@ -41,14 +49,23 @@ error:
od_retcode_t
od_err_logger_free(od_error_logger_t *err_logger)
{
if (err_logger == NULL) {
return OK_RESPONSE;
}
for (size_t i = 0; i < err_logger->intercals_cnt; ++i) {
if (err_logger->interval_counters[i] == NULL) {
continue;
}
int rc = od_counter_free(err_logger->interval_counters[i]);
err_logger->interval_counters[i] = NULL;
if (rc != OK_RESPONSE)
return rc;
}
pthread_mutex_destroy(&err_logger->lock);
free(err_logger);
free((void *)(err_logger));
return OK_RESPONSE;
}
@ -59,3 +76,28 @@ od_error_logger_store_err(od_error_logger_t *l, size_t err_t)
od_counter_inc(l->interval_counters[l->current_interval_num], err_t);
return OK_RESPONSE;
}
od_retcode_t
od_err_logger_inc_interval(od_error_logger_t *l)
{
pthread_mutex_lock(&l->lock);
{
++l->current_interval_num;
l->current_interval_num %= l->intercals_cnt;
od_counter_reset_all(l->interval_counters[l->current_interval_num]);
}
pthread_mutex_unlock(&l->lock);
return OK_RESPONSE;
}
size_t
od_err_logger_get_aggr_errors_count(od_error_logger_t *l, size_t err_t)
{
size_t ret_val = 0;
for (size_t i = 0; i < l->intercals_cnt; ++i) {
ret_val += od_counter_get_count(l->interval_counters[i], err_t);
}
return ret_val;
}

View File

@ -17,11 +17,12 @@ typedef struct od_error_logger od_error_logger_t;
struct od_error_logger
{
size_t intercals_cnt;
od_counter_t **interval_counters;
pthread_mutex_t lock;
size_t current_interval_num;
// ISO C99 flexible array member
od_counter_t *interval_counters[];
};
extern od_retcode_t
@ -36,32 +37,13 @@ od_err_logger_create_default()
return od_err_logger_create(DEFAULT_ERROR_INTERVAL_NUMBER);
}
extern od_retcode_t
od_retcode_t
od_err_logger_free(od_error_logger_t *err_logger);
static inline od_retcode_t
od_err_logger_inc_interval(od_error_logger_t *l)
{
pthread_mutex_lock(&l->lock);
{
++l->current_interval_num;
l->current_interval_num %= l->intercals_cnt;
od_retcode_t
od_err_logger_inc_interval(od_error_logger_t *l);
od_counter_reset_all(l->interval_counters[l->current_interval_num]);
}
pthread_mutex_unlock(&l->lock);
return OK_RESPONSE;
}
static inline size_t
od_err_logger_get_aggr_errors_count(od_error_logger_t *l, size_t err_t)
{
size_t ret_val = 0;
for (size_t i = 0; i < l->intercals_cnt; ++i) {
ret_val += od_counter_get_count(l->interval_counters[i], err_t);
}
return ret_val;
}
size_t
od_err_logger_get_aggr_errors_count(od_error_logger_t *l, size_t err_t);
#endif // ODYSSEY_ERR_LOGGER_H

View File

@ -13,7 +13,7 @@ typedef struct od_error od_error_t;
struct od_error
{
char file[256];
char file[OD_ERROR_MAX_LEN];
int file_len;
char function[128];
int function_len;

View File

@ -202,8 +202,7 @@ od_frontend_attach(od_client_t *client,
bool wait_for_idle = false;
for (;;) {
od_router_status_t status;
status =
od_router_attach(router, &instance->config, client, wait_for_idle);
status = od_router_attach(router, client, wait_for_idle);
if (status != OD_ROUTER_OK) {
if (status == OD_ROUTER_ERROR_TIMEDOUT) {
od_error(&instance->logger,
@ -230,14 +229,16 @@ od_frontend_attach(od_client_t *client,
context,
client,
server,
"attached to %s%.*s",
"client %s attached to %s%.*s",
client->id.id,
server->id.id_prefix,
(int)sizeof(server->id.id),
server->id.id);
/* connect to server, if necessary */
if (server->io.io)
if (server->io.io) {
return OD_OK;
}
int rc;
od_atomic_u32_inc(&router->servers_routing);
@ -251,8 +252,9 @@ od_frontend_attach(od_client_t *client,
od_frontend_error_is_too_many_connections(client);
if (wait_for_idle) {
od_router_close(router, client);
if (instance->config.server_login_retry)
if (instance->config.server_login_retry) {
machine_sleep(instance->config.server_login_retry);
}
continue;
}
return OD_ESERVER_CONNECT;
@ -746,7 +748,6 @@ od_frontend_remote(od_client_t *client)
}
od_server_t *server;
od_instance_t *instance = client->global->instance;
for (;;) {
while (1) {
if (machine_cond_wait(client->cond, 60000) == 0) {
@ -811,9 +812,8 @@ od_frontend_remote(od_client_t *client)
}
/* push server connection back to route pool */
od_router_t *router = client->global->router;
od_instance_t *instance = client->global->instance;
od_router_detach(router, &instance->config, client);
od_router_t *router = client->global->router;
od_router_detach(router, client);
server = NULL;
} else if (status != OD_OK) {
break;
@ -843,7 +843,8 @@ od_frontend_remote(od_client_t *client)
static void
od_frontend_cleanup(od_client_t *client,
char *context,
od_frontend_status_t status)
od_frontend_status_t status,
od_error_logger_t *l)
{
od_instance_t *instance = client->global->instance;
od_router_t *router = client->global->router;
@ -853,6 +854,14 @@ od_frontend_cleanup(od_client_t *client,
od_server_t *server = client->server;
if (od_frontend_status_is_err(status)) {
od_error_logger_store_err(l, status);
if (route->extra_logging_enabled && !od_route_is_dynamic(route)) {
od_error_logger_store_err(route->err_logger, status);
}
}
switch (status) {
case OD_STOP:
case OD_OK:
@ -862,7 +871,9 @@ od_frontend_cleanup(od_client_t *client,
context,
client,
server,
"client disconnected");
"client disconnected (route %s.%s)",
route->rule->db_name,
route->rule->user_name);
}
if (!client->server)
break;
@ -874,7 +885,7 @@ od_frontend_cleanup(od_client_t *client,
break;
}
/* push server to router server pool */
od_router_detach(router, &instance->config, client);
od_router_detach(router, client);
break;
case OD_EOOM:
@ -910,6 +921,7 @@ od_frontend_cleanup(od_client_t *client,
break;
case OD_ECLIENT_READ:
/*fallthrough*/
case OD_ECLIENT_WRITE:
/* close client connection and reuse server
* link in case of client errors */
@ -938,7 +950,7 @@ od_frontend_cleanup(od_client_t *client,
break;
}
/* push server to router server pool */
od_router_detach(router, &instance->config, client);
od_router_detach(router, client);
break;
case OD_ESERVER_CONNECT:
@ -1134,7 +1146,7 @@ od_frontend(void *arg)
/* route client */
od_router_status_t router_status;
router_status = od_router_route(router, &instance->config, client);
router_status = od_router_route(router, client);
/* routing is over */
od_atomic_u32_dec(&router->clients_routing);
@ -1286,63 +1298,32 @@ od_frontend(void *arg)
/* setup client and run main loop */
od_route_t *route = client->route;
od_error_logger_t *l;
l = router->route_pool.err_logger;
od_frontend_status_t status;
status = OD_UNDEF;
switch (route->rule->storage->storage_type) {
case OD_RULE_STORAGE_LOCAL: {
status = od_frontend_local_setup(client);
if (od_frontend_status_is_err(status)) {
od_error_logger_store_err(l, status);
if (route->extra_logging_enabled &&
!od_route_is_dynamic(route)) {
od_error_logger_store_err(route->err_logger, status);
}
}
if (status != OD_OK)
break;
status = od_frontend_local(client);
if (od_frontend_status_is_err(status)) {
od_error_logger_store_err(l, status);
if (route->extra_logging_enabled &&
!od_route_is_dynamic(route)) {
od_error_logger_store_err(route->err_logger, status);
}
}
break;
}
case OD_RULE_STORAGE_REMOTE: {
status = od_frontend_setup(client);
if (od_frontend_status_is_err(status)) {
od_error_logger_store_err(l, status);
if (route->extra_logging_enabled &&
!od_route_is_dynamic(route)) {
od_error_logger_store_err(route->err_logger, status);
}
}
if (status != OD_OK)
break;
status = od_frontend_remote(client);
if (od_frontend_status_is_err(status)) {
od_error_logger_store_err(l, status);
if (route->extra_logging_enabled &&
!od_route_is_dynamic(route)) {
od_error_logger_store_err(route->err_logger, status);
}
}
break;
}
}
od_frontend_cleanup(client, "main", status);
od_error_logger_t *l;
l = router->route_pool.err_logger;
od_frontend_cleanup(client, "main", status, l);
od_list_foreach(&modules->link, i)
{

View File

@ -45,8 +45,10 @@ od_grac_shutdown_worker(void *arg)
/* wait for all servers to complete old transations */
od_list_foreach(&router->servers, i)
{
#if OD_DEVEL_LVL != -1
od_system_server_t *server;
server = od_container_of(i, od_system_server_t, link);
#endif
while (od_atomic_u32_of(&router->clients_routing) ||
od_atomic_u32_of(&router->clients)) {

View File

@ -184,6 +184,7 @@ od_instance_main(od_instance_t *instance, int argc, char **argv)
NULL,
"failed to set process priority: %s",
strerror(errno));
goto error;
}
}

View File

@ -13,7 +13,6 @@ typedef struct timeval od_timeval_t;
struct od_instance
{
od_pid_t pid;
od_pid_t watchdog_pid;
od_logger_t logger;
char *config_file;
od_config_t config;

View File

@ -84,6 +84,7 @@ od_log(od_logger_t *logger,
va_end(args);
}
#if OD_DEVEL_LVL != OD_RELEASE_MODE
static inline void
od_debug(od_logger_t *logger,
char *context,
@ -97,6 +98,9 @@ od_debug(od_logger_t *logger,
od_logger_write(logger, OD_DEBUG, context, client, server, fmt, args);
va_end(args);
}
#else
#define od_debug(...) ;
#endif
static inline void
od_error(od_logger_t *logger,

View File

@ -7,7 +7,7 @@
* Scalable PostgreSQL connection pooler.
*/
#include "stdbool.h"
#include "c.h"
#define USE_SCRAM
@ -24,8 +24,6 @@
#define CONCAT_(A, B) A##B
#define CONCAT(A, B) CONCAT_(A, B)
#define MALLOC_ARRAY(number, type) ((type *)malloc((number) * sizeof(type)))
typedef int od_retcode_t;
#endif /* ODYSSEY_MACRO_H */

View File

@ -51,8 +51,10 @@ od_target_module_add(od_logger_t *logger,
od_list_append(&modules->link, &module_ptr->link);
strcat(module_ptr->path, target_module_path);
if (module_ptr->module_init_cb)
if (module_ptr->module_init_cb) {
return module_ptr->module_init_cb();
}
return OD_MODULE_CB_OK_RETCODE;
module_exists:
@ -161,6 +163,7 @@ od_modules_unload(od_logger_t *logger, od_module_t *modules)
goto error;
}
}
return OD_MODULE_CB_OK_RETCODE;
error:
err = od_dlerror();

View File

@ -68,14 +68,12 @@ od_route_free(od_route_t *route)
if (route->wait_bus)
machine_channel_free(route->wait_bus);
if (route->stats.enable_quantiles) {
for (size_t i = 0; i < QUANTILES_WINDOW; ++i) {
td_free(route->stats.transaction_hgram[i]);
td_free(route->stats.query_hgram[i]);
}
od_stat_free(&route->stats);
}
if (route->extra_logging_enabled) {
od_err_logger_free(route->err_logger);
route->err_logger = NULL;
}
pthread_mutex_destroy(&route->lock);
@ -83,13 +81,13 @@ od_route_free(od_route_t *route)
}
static inline od_route_t *
od_route_allocate(int is_shared)
od_route_allocate()
{
od_route_t *route = malloc(sizeof(*route));
if (route == NULL)
return NULL;
od_route_init(route, true);
route->wait_bus = machine_channel_create(is_shared);
route->wait_bus = machine_channel_create();
if (route->wait_bus == NULL) {
od_route_free(route);
return NULL;

View File

@ -27,14 +27,19 @@ typedef struct od_route_pool od_route_pool_t;
struct od_route_pool
{
od_list_t list;
/* used for counting error for client without concrete route
* like default_db.usr1, db1.default, etc
* */
od_error_logger_t *err_logger;
int count;
pthread_mutex_t lock;
od_list_t list;
};
#define od_route_pool_lock(route_pool) pthread_mutex_lock(&route_pool.lock);
#define od_route_pool_unlock(route_pool) pthread_mutex_unlock(&route_pool.lock);
typedef od_retcode_t (
*od_route_pool_stat_frontend_error_cb_t)(od_route_pool_t *pool, void **argv);
@ -44,11 +49,18 @@ od_route_pool_init(od_route_pool_t *pool)
od_list_init(&pool->list);
pool->err_logger = od_err_logger_create_default();
pool->count = 0;
pthread_mutex_init(&pool->lock, NULL);
}
static inline void
od_route_pool_free(od_route_pool_t *pool)
{
if (pool == NULL)
return;
pthread_mutex_destroy(&pool->lock);
od_err_logger_free(pool->err_logger);
od_list_t *i, *n;
od_list_foreach_safe(&pool->list, i, n)
{
@ -59,12 +71,9 @@ od_route_pool_free(od_route_pool_t *pool)
}
static inline od_route_t *
od_route_pool_new(od_route_pool_t *pool,
int is_shared,
od_route_id_t *id,
od_rule_t *rule)
od_route_pool_new(od_route_pool_t *pool, od_route_id_t *id, od_rule_t *rule)
{
od_route_t *route = od_route_allocate(is_shared);
od_route_t *route = od_route_allocate();
if (route == NULL)
return NULL;
int rc;
@ -123,7 +132,6 @@ od_route_pool_match(od_route_pool_t *pool, od_route_id_t *key, od_rule_t *rule)
static inline void
od_route_pool_stat(od_route_pool_t *pool,
uint64_t prev_time_us,
int prev_update,
od_route_pool_stat_cb_t callback,
void **argv)
{
@ -152,11 +160,11 @@ od_route_pool_stat(od_route_pool_t *pool,
od_stat_average(&avg, &current, &route->stats_prev, prev_time_us);
/* update route stats */
if (prev_update)
od_stat_update(&route->stats_prev, &current);
od_stat_update(&route->stats_prev, &current);
if (callback)
if (callback) {
callback(route, &current, &avg, argv);
}
}
}

View File

@ -33,7 +33,6 @@ od_router_free(od_router_t *router)
od_rules_free(&router->rules);
pthread_mutex_destroy(&router->lock);
od_err_logger_free(router->router_err_logger);
od_err_logger_free(router->route_pool.err_logger);
}
inline int
@ -204,18 +203,16 @@ od_router_gc(od_router_t *router)
void
od_router_stat(od_router_t *router,
uint64_t prev_time_us,
int prev_update,
od_route_pool_stat_cb_t callback,
void **argv)
{
od_router_lock(router);
od_route_pool_stat(
&router->route_pool, prev_time_us, prev_update, callback, argv);
od_route_pool_stat(&router->route_pool, prev_time_us, callback, argv);
od_router_unlock(router);
}
od_router_status_t
od_router_route(od_router_t *router, od_config_t *config, od_client_t *client)
od_router_route(od_router_t *router, od_client_t *client)
{
kiwi_be_startup_t *startup = &client->startup;
@ -262,9 +259,7 @@ od_router_route(od_router_t *router, od_config_t *config, od_client_t *client)
od_route_t *route;
route = od_route_pool_match(&router->route_pool, &id, rule);
if (route == NULL) {
int is_shared;
is_shared = od_config_is_multi_workers(config);
route = od_route_pool_new(&router->route_pool, is_shared, &id, rule);
route = od_route_pool_new(&router->route_pool, &id, rule);
if (route == NULL) {
od_router_unlock(router);
return OD_ROUTER_ERROR;
@ -347,10 +342,7 @@ od_should_not_spun_connection_yet(int connections_in_pool,
}
od_router_status_t
od_router_attach(od_router_t *router,
od_config_t *config,
od_client_t *client,
bool wait_for_idle)
od_router_attach(od_router_t *router, od_client_t *client, bool wait_for_idle)
{
(void)router;
od_route_t *route = client->route;
@ -458,8 +450,9 @@ attach:
od_route_unlock(route);
/* attach server io to clients machine context */
if (server->io.io && od_config_is_multi_workers(config))
if (server->io.io) {
od_io_attach(&server->io);
}
/* maybe restore read events subscription */
if (restart_read)
@ -469,7 +462,7 @@ attach:
}
void
od_router_detach(od_router_t *router, od_config_t *config, od_client_t *client)
od_router_detach(od_router_t *router, od_client_t *client)
{
(void)router;
od_route_t *route = client->route;
@ -477,8 +470,7 @@ od_router_detach(od_router_t *router, od_config_t *config, od_client_t *client)
/* detach from current machine event loop */
od_server_t *server = client->server;
if (od_config_is_multi_workers(config))
od_io_detach(&server->io);
od_io_detach(&server->io);
od_route_lock(route);

View File

@ -12,14 +12,18 @@ typedef struct od_router od_router_t;
struct od_router
{
pthread_mutex_t lock;
od_rules_t rules;
od_list_t servers;
od_route_pool_t route_pool;
/* clients */
od_atomic_u32_t clients;
od_atomic_u32_t clients_routing;
/* servers */
od_atomic_u32_t servers_routing;
/* error logging */
od_error_logger_t *router_err_logger;
/* router has type of list */
od_list_t servers;
};
#define od_router_lock(router) pthread_mutex_lock(&router->lock);
@ -36,21 +40,21 @@ od_router_expire(od_router_t *, od_list_t *);
void
od_router_gc(od_router_t *);
void
od_router_stat(od_router_t *, uint64_t, int, od_route_pool_stat_cb_t, void **);
od_router_stat(od_router_t *, uint64_t, od_route_pool_stat_cb_t, void **);
extern int
od_router_foreach(od_router_t *, od_route_pool_cb_t, void **);
od_router_status_t
od_router_route(od_router_t *router, od_config_t *config, od_client_t *client);
od_router_route(od_router_t *router, od_client_t *client);
void
od_router_unroute(od_router_t *, od_client_t *);
od_router_status_t
od_router_attach(od_router_t *, od_config_t *, od_client_t *, bool);
od_router_attach(od_router_t *, od_client_t *, bool);
void
od_router_detach(od_router_t *, od_config_t *, od_client_t *);
od_router_detach(od_router_t *, od_client_t *);
void
od_router_close(od_router_t *, od_client_t *);

View File

@ -47,6 +47,42 @@ od_system_cleanup(od_system_t *system)
}
}
void
od_system_shutdown(od_system_t *system, od_instance_t *instance)
{
od_log(&instance->logger,
"system",
NULL,
NULL,
"SIGINT received, shutting down");
od_worker_pool_stop(system->global->worker_pool);
od_router_free(system->global->router);
/* Prevent OpenSSL usage during deinitialization */
od_worker_pool_wait();
od_modules_unload(&instance->logger, system->global->modules);
od_system_cleanup(system);
exit(0);
}
void
od_system_shutdown_fast(od_system_t *system, od_instance_t *instance)
{
od_log(&instance->logger,
"system",
NULL,
NULL,
"SIGTERM received, shutting down");
od_worker_pool_stop(system->global->worker_pool);
od_router_free(system->global->router);
/* No time for caution */
od_system_cleanup(system);
/* TODO: */
od_modules_unload_fast(system->global->modules);
exit(0);
}
void
od_system_signal_handler(void *arg)
{
@ -79,32 +115,10 @@ od_system_signal_handler(void *arg)
break;
switch (rc) {
case SIGTERM:
od_log(&instance->logger,
"system",
NULL,
NULL,
"SIGTERM received, shutting down");
od_worker_pool_stop(system->global->worker_pool);
/* No time for caution */
od_system_cleanup(system);
/* TODO: */
od_modules_unload_fast(system->global->modules);
kill(instance->watchdog_pid.pid, SIGKILL);
exit(0);
od_system_shutdown_fast(system, instance);
break;
case SIGINT:
od_log(&instance->logger,
"system",
NULL,
NULL,
"SIGINT received, shutting down");
od_worker_pool_stop(system->global->worker_pool);
/* Prevent OpenSSL usage during deinitialization */
od_worker_pool_wait(system->global->worker_pool);
od_modules_unload(&instance->logger, system->global->modules);
od_system_cleanup(system);
kill(instance->watchdog_pid.pid, SIGKILL);
exit(0);
od_system_shutdown(system, instance);
break;
case SIGHUP:
od_log(

View File

@ -17,4 +17,7 @@
void
od_system_signal_handler(void *arg);
void
od_system_shutdown(od_system_t *system, od_instance_t *instance);
#endif /* OD_SIGNAL_HANDLER */

View File

@ -52,6 +52,15 @@ od_stat_init(od_stat_t *stat)
memset(stat, 0, sizeof(*stat));
}
static inline void
od_stat_free(od_stat_t *stat)
{
for (size_t i = 0; i < QUANTILES_WINDOW; ++i) {
td_free(stat->transaction_hgram[i]);
td_free(stat->query_hgram[i]);
}
}
static inline void
od_stat_query_start(od_stat_state_t *state)
{
@ -137,9 +146,7 @@ od_stat_update_of(od_atomic_u64_t *prev, od_atomic_u64_t *current)
{
/* todo: this could be made more optimal */
/* prev <= current */
uint64_t diff;
diff = od_atomic_u64_of(current) - od_atomic_u64_of(prev);
od_atomic_u64_add(prev, diff);
__atomic_store(prev, current, __ATOMIC_SEQ_CST);
}
static inline void

View File

@ -31,7 +31,7 @@ struct td_histogram
mm_sleeplock_t lock;
node_t nodes[0];
node_t nodes[]; // ISO C99 flexible array member
};
static bool

View File

@ -5,20 +5,6 @@
*/
#include "watchdog.h"
#include "kiwi.h"
#include <sys/shm.h>
#include "instance.h"
#include "debugprintf.h"
#include "setproctitle.h"
#include <signal.h>
#include <sys/time.h>
#include <sys/file.h>
#include "pid.h"
#include "logger.h"
#include "restart_sync.h"
#include <unistd.h>
void
od_watchdog_worker(void *arg)

View File

@ -10,10 +10,25 @@
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <machinarium.h>
#include <unistd.h>
#include <sys/shm.h>
#include <sys/time.h>
#include <sys/file.h>
#include <signal.h>
#include <machinarium.h>
#include "macro.h"
#include "system.h"
#include "kiwi.h"
#include "instance.h"
#include "debugprintf.h"
#include "setproctitle.h"
#include "pid.h"
#include "logger.h"
#include "restart_sync.h"
#define ODYSSEY_WATCHDOG_ITER_INTERVAL 500 // ms

View File

@ -103,10 +103,7 @@ od_worker_start(od_worker_t *worker)
{
od_instance_t *instance = worker->global->instance;
int is_shared;
is_shared = od_config_is_multi_workers(&instance->config);
worker->task_channel = machine_channel_create(is_shared);
worker->task_channel = machine_channel_create();
if (worker->task_channel == NULL) {
od_error(&instance->logger,
"worker",
@ -115,31 +112,16 @@ od_worker_start(od_worker_t *worker)
"failed to create task channel");
return -1;
}
if (is_shared) {
char name[32];
od_snprintf(name, sizeof(name), "worker: %d", worker->id);
worker->machine = machine_create(name, od_worker, worker);
if (worker->machine == -1) {
machine_channel_free(worker->task_channel);
od_error(&instance->logger,
"worker",
NULL,
NULL,
"failed to start worker");
return -1;
}
} else {
int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_worker, worker);
if (coroutine_id == -1) {
od_error(&instance->logger,
"worker",
NULL,
NULL,
"failed to create worker coroutine");
machine_channel_free(worker->task_channel);
return -1;
}
char name[32];
od_snprintf(name, sizeof(name), "worker: %d", worker->id);
worker->machine = machine_create(name, od_worker, worker);
if (worker->machine == -1) {
machine_channel_free(worker->task_channel);
od_error(
&instance->logger, "worker", NULL, NULL, "failed to start worker");
return -1;
}
return 0;
}

View File

@ -47,12 +47,6 @@ od_worker_pool_start(od_worker_pool_t *pool, od_global_t *global, int count)
static inline void
od_worker_pool_stop(od_worker_pool_t *pool)
{
od_instance_t *instance = pool->pool->global->instance;
int is_shared;
is_shared = od_config_is_multi_workers(&instance->config);
if (!is_shared)
return;
for (int i = 0; i < pool->count; i++) {
od_worker_t *worker = &pool->pool[i];
machine_stop(worker->machine);
@ -60,26 +54,14 @@ od_worker_pool_stop(od_worker_pool_t *pool)
}
static inline void
od_worker_pool_wait(od_worker_pool_t *pool)
od_worker_pool_wait()
{
od_instance_t *instance = pool->pool->global->instance;
int is_shared;
is_shared = od_config_is_multi_workers(&instance->config);
if (!is_shared)
return;
machine_sleep(1);
}
static inline void
od_worker_pool_wait_gracefully_shutdown(od_worker_pool_t *pool)
{
od_instance_t *instance = pool->pool->global->instance;
int is_shared;
is_shared = od_config_is_multi_workers(&instance->config);
if (!is_shared)
return;
// // In fact we cannot wait anything here - machines may be in epoll
// waiting
// // No new TLS handshakes should be initiated, so, just wait a bit.

129
test/shell-test/conf.conf Normal file
View File

@ -0,0 +1,129 @@
pid_file "/var/run/odyssey.pid"
daemonize no
unix_socket_dir "/tmp"
unix_socket_mode "0644"
log_format "%p %t %l [%i %s] (%c) %m\n"
log_to_stdout yes
log_syslog no
log_syslog_ident "odyssey"
log_syslog_facility "daemon"
log_debug yes
log_config yes
log_session yes
log_query yes
log_stats yes
stats_interval 60
workers 5
resolvers 1
readahead 8192
cache_coroutine 0
coroutine_stack_size 8
nodelay yes
keepalive 15
keepalive_keep_interval 75
keepalive_probes 9
keepalive_usr_timeout 0
listen {
host "*"
port 6432
backlog 128
}
storage "postgres_server" {
type "remote"
host "localhost"
port 5432
}
database default {
user default {
authentication "none"
storage "postgres_server"
pool "transaction"
pool_size 0
pool_timeout 0
pool_ttl 60
pool_discard no
pool_cancel yes
pool_rollback yes
client_fwd_error yes
application_name_add_host yes
server_lifetime 3600
log_debug no
quantiles "0.99,0.95,0.5"
client_max 107
}
}
database "postgres" {
user "user1" {
authentication "none"
storage "postgres_server"
pool "transaction"
pool_size 0
pool_timeout 0
pool_ttl 60
pool_discard no
pool_cancel yes
pool_rollback yes
client_fwd_error yes
application_name_add_host yes
server_lifetime 3600
log_debug no
quantiles "0.99,0.95,0.5"
client_max 107
}
}
storage "local" {
type "local"
}
database "console" {
user default {
authentication "none"
pool "session"
storage "local"
}
}
locks_dir "/tmp/odyssey"
graceful_die_on_errors yes
enable_online_restart yes
bindwith_reuseport yes

View File

@ -0,0 +1,14 @@
#!/bin/bash
#kill -9 $(ps aux | grep odyssey | awk '{print $2}')
sleep 1
#ody-start
#./odyssey ./ody.conf
for _ in $(seq 1 4000000)
do
for __ in $(seq 1 3000); do
# psql -U user1 -d postgres -h localhost -p 6432 -c 'SELECT 1/0' &
psql -U user1 -d postgres -h localhost -p 6432 -c 'select pg_sleep(0.01)' &
done
done

View File

@ -25,8 +25,9 @@ kiwi_be_write_error_as(machine_msg_t *msg,
if (hint && hint_len > 0)
size += 1 + /* H */ +hint_len + 1;
int offset = 0;
if (msg)
if (msg) {
offset = machine_msg_size(msg);
}
msg = machine_msg_create_or_advance(msg, sizeof(kiwi_header_t) + size);
if (kiwi_unlikely(msg == NULL))
return NULL;

View File

@ -28,7 +28,7 @@ benchmark_writer(void *arg)
machine_channel_t *q = arg;
while (machine_active()) {
machine_msg_t *msg;
msg = machine_msg_create(0, 0);
msg = machine_msg_create(0);
machine_channel_write(q, msg);
ops++;
machine_sleep(0);

View File

@ -9,26 +9,15 @@
#include <machinarium_private.h>
MACHINE_API machine_channel_t *
machine_channel_create(int shared)
machine_channel_create()
{
if (shared) {
mm_channel_t *channel;
channel = malloc(sizeof(mm_channel_t));
if (channel == NULL) {
mm_errno_set(ENOMEM);
return NULL;
}
mm_channel_init(channel);
return (machine_channel_t *)channel;
}
mm_channelfast_t *channel;
channel = malloc(sizeof(mm_channelfast_t));
mm_channel_t *channel;
channel = malloc(sizeof(mm_channel_t));
if (channel == NULL) {
mm_errno_set(ENOMEM);
return NULL;
}
mm_channelfast_init(channel);
mm_channel_init(channel);
return (machine_channel_t *)channel;
}

View File

@ -7,6 +7,11 @@
* cooperative multitasking engine.
*/
/*
* Channel fast is machinarium implementation
* of machine channel interface
* */
typedef struct mm_channelfast_rd mm_channelfast_rd_t;
typedef struct mm_channelfast mm_channelfast_t;

View File

@ -7,6 +7,8 @@
* cooperative multitasking engine.
*/
/* machinarium file descriptor */
typedef struct mm_fd mm_fd_t;
enum

View File

@ -7,6 +7,10 @@
* cooperative multitasking engine.
*/
/*
* Structure for scatter/gather I/O
* */
typedef struct mm_iov mm_iov_t;
struct mm_iov

View File

@ -145,7 +145,7 @@ extern "C"
/* channel */
MACHINE_API machine_channel_t *machine_channel_create(int shared);
MACHINE_API machine_channel_t *machine_channel_create();
MACHINE_API void machine_channel_free(machine_channel_t *);

View File

@ -12,8 +12,10 @@ MACHINE_API machine_msg_t *
machine_msg_create(int reserve)
{
mm_msg_t *msg = mm_msgcache_pop(&mm_self->msg_cache);
if (msg == NULL)
if (msg == NULL) {
return NULL;
}
msg->type = 0;
if (reserve > 0) {
int rc;
@ -30,39 +32,43 @@ machine_msg_create(int reserve)
MACHINE_API machine_msg_t *
machine_msg_create_or_advance(machine_msg_t *obj, int size)
{
if (obj == NULL)
if (obj == NULL) {
return machine_msg_create(size);
}
mm_msg_t *msg = mm_cast(mm_msg_t *, obj);
int rc;
rc = mm_buf_ensure(&msg->data, size);
if (rc == -1)
if (rc == -1) {
return NULL;
}
mm_buf_advance(&msg->data, size);
return obj;
}
MACHINE_API void
MACHINE_API inline void
machine_msg_free(machine_msg_t *obj)
{
mm_msg_t *msg = mm_cast(mm_msg_t *, obj);
mm_msgcache_push(&mm_self->msg_cache, msg);
}
MACHINE_API void
MACHINE_API inline void
machine_msg_set_type(machine_msg_t *obj, int type)
{
mm_msg_t *msg = mm_cast(mm_msg_t *, obj);
msg->type = type;
}
MACHINE_API int
MACHINE_API inline int
machine_msg_type(machine_msg_t *obj)
{
mm_msg_t *msg = mm_cast(mm_msg_t *, obj);
return msg->type;
}
MACHINE_API void *
MACHINE_API inline void *
machine_msg_data(machine_msg_t *obj)
{
mm_msg_t *msg = mm_cast(mm_msg_t *, obj);

View File

@ -61,6 +61,7 @@ mm_msgcache_pop(mm_msgcache_t *cache)
if (msg == NULL)
return NULL;
mm_buf_init(&msg->data);
/* fallthrough */
init:
msg->machine_id = mm_self->id;
msg->refs = 0;

View File

@ -132,10 +132,13 @@ machine_writev_raw(machine_io_t *obj, machine_iov_t *obj_iov)
return -1;
}
/* writes msg to io object.
* Frees memory after use in current implementation
* */
MACHINE_API int
machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms)
machine_write(machine_io_t *destination, machine_msg_t *msg, uint32_t time_ms)
{
mm_io_t *io = mm_cast(mm_io_t *, obj);
mm_io_t *io = mm_cast(mm_io_t *, destination);
mm_errno_set(0);
if (!io->attached) {
@ -144,7 +147,7 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms)
}
if (!io->connected) {
mm_errno_set(ENOTCONN);
return -1;
goto error;
}
if (io->on_write) {
mm_errno_set(EINPROGRESS);
@ -155,8 +158,9 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms)
mm_cond_init(&on_write);
int rc;
rc = mm_write_start(io, (machine_cond_t *)&on_write);
if (rc == -1)
if (rc == -1) {
goto error;
}
int total = 0;
char *src = machine_msg_data(msg);
@ -169,11 +173,14 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms)
mm_write_stop(io);
goto error;
}
/* when using compression, some data may be processed
* despite the non-positive return code */
size_t processed = 0;
rc = machine_write_raw(obj, src + total, size - total, &processed);
rc =
machine_write_raw(destination, src + total, size - total, &processed);
total += processed;
if (rc > 0) {
total += rc;
continue;
@ -194,6 +201,7 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms)
machine_msg_free(msg);
return 0;
error:
/* free msg in case of any error */
machine_msg_free(msg);
return -1;
}