From 785e85ab6f9d0e1c80208118b4f9bfe46c63a399 Mon Sep 17 00:00:00 2001 From: kirill reshke Date: Mon, 23 Nov 2020 14:13:28 +0500 Subject: [PATCH] Fix leaks and improve locking in cron (#229) Also fix some warnings. --- .gitignore | 1 + sources/auth.c | 2 +- sources/auth_query.c | 10 +- sources/backend.c | 34 ++- sources/c.h | 2 + sources/config.h | 6 - sources/config_reader.c | 251 +++++++++++------- sources/console.c | 16 +- sources/counter.c | 90 ++++--- sources/counter.h | 11 +- sources/cron.c | 32 ++- sources/debugprintf.h | 6 +- sources/deploy.c | 8 +- sources/deploy.h | 2 + sources/err_logger.c | 58 +++- sources/err_logger.h | 32 +-- sources/error.h | 2 +- sources/frontend.c | 79 +++--- sources/grac_shutdown_worker.c | 2 + sources/instance.c | 1 + sources/instance.h | 1 - sources/logger.h | 4 + sources/macro.h | 4 +- sources/module.c | 5 +- sources/route.h | 10 +- sources/route_pool.h | 28 +- sources/router.c | 24 +- sources/router.h | 16 +- sources/sighandler.c | 62 +++-- sources/sighandler.h | 3 + sources/stat.h | 13 +- sources/tdigest.c | 2 +- sources/watchdog.c | 14 - sources/watchdog.h | 17 +- sources/worker.c | 40 +-- sources/worker_pool.h | 20 +- test/shell-test/conf.conf | 129 +++++++++ test/shell-test/log_error_rate_test.sh | 14 + third_party/kiwi/kiwi/be_write.h | 3 +- .../benchmark/{makefile => Makefile} | 0 .../benchmark/benchmark_channel_shared.c | 2 +- third_party/machinarium/sources/channel_api.c | 19 +- .../machinarium/sources/channel_fast.h | 5 + third_party/machinarium/sources/fd.h | 2 + third_party/machinarium/sources/iov.h | 4 + third_party/machinarium/sources/machinarium.h | 2 +- third_party/machinarium/sources/msg.c | 20 +- third_party/machinarium/sources/msg_cache.c | 1 + third_party/machinarium/sources/write.c | 18 +- 49 files changed, 713 insertions(+), 414 deletions(-) create mode 100644 test/shell-test/conf.conf create mode 100755 test/shell-test/log_error_rate_test.sh rename third_party/machinarium/benchmark/{makefile => Makefile} (100%) diff --git a/.gitignore b/.gitignore index f8c79378..33605ea9 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ third_party/machinarium/benchmark/benchmark_channel_shared build/ build-asan/ cmake-build-debug/ +test/shell-test/odyssey diff --git a/sources/auth.c b/sources/auth.c index a900da8b..7e37aa68 100644 --- a/sources/auth.c +++ b/sources/auth.c @@ -1154,7 +1154,6 @@ od_auth_backend(od_server_t *server, machine_msg_t *msg) "%s", kiwi_be_type_to_string(type)); - int rc; switch (type) { case KIWI_BE_AUTHENTICATION: rc = kiwi_fe_read_auth(machine_msg_data(msg), @@ -1184,6 +1183,7 @@ od_auth_backend(od_server_t *server, machine_msg_t *msg) case KIWI_BE_ERROR_RESPONSE: od_backend_error( server, "auth", machine_msg_data(msg), machine_msg_size(msg)); + /* save error to fwd it to client */ server->error_connect = msg; return -1; default: diff --git a/sources/auth_query.c b/sources/auth_query.c index b83cf990..a529cd85 100644 --- a/sources/auth_query.c +++ b/sources/auth_query.c @@ -233,14 +233,14 @@ od_auth_query(od_global_t *global, /* route */ od_router_status_t status; - status = od_router_route(router, &instance->config, auth_client); + status = od_router_route(router, auth_client); if (status != OD_ROUTER_OK) { od_client_free(auth_client); return -1; } /* attach */ - status = od_router_attach(router, &instance->config, auth_client, false); + status = od_router_attach(router, auth_client, false); if (status != OD_ROUTER_OK) { od_router_unroute(router, auth_client); od_client_free(auth_client); @@ -253,7 +253,7 @@ od_auth_query(od_global_t *global, "auth_query", NULL, server, - "attached to %s%.*s", + "%s attached to server %s%.*s", server->id.id_prefix, (int)sizeof(server->id.id), server->id.id); @@ -271,7 +271,7 @@ od_auth_query(od_global_t *global, } /* preformat and execute query */ - char query[512]; + char query[OD_QRY_MAX_SZ]; int query_len; query_len = od_auth_query_format(rule, user, peer, query, sizeof(query)); @@ -284,7 +284,7 @@ od_auth_query(od_global_t *global, } /* detach and unroute */ - od_router_detach(router, &instance->config, auth_client); + od_router_detach(router, auth_client); od_router_unroute(router, auth_client); od_client_free(auth_client); return 0; diff --git a/sources/backend.c b/sources/backend.c index 2c92aef3..58703ec6 100644 --- a/sources/backend.c +++ b/sources/backend.c @@ -66,6 +66,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size) { od_instance_t *instance = server->global->instance; kiwi_fe_error_t error; + int rc; rc = kiwi_fe_read_error(data, size, &error); if (rc == -1) { @@ -76,6 +77,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size) "failed to parse error message from server"); return; } + od_error(&instance->logger, context, server->client, @@ -84,6 +86,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size) error.severity, error.code, error.message); + if (error.detail) { od_error(&instance->logger, context, @@ -92,6 +95,7 @@ od_backend_error(od_server_t *server, char *context, char *data, uint32_t size) "DETAIL: %s", error.detail); } + if (error.hint) { od_error(&instance->logger, context, @@ -175,12 +179,13 @@ od_backend_startup(od_server_t *server, kiwi_params_t *route_params) od_io_error(&server->io)); return -1; } + kiwi_be_type_t type = *(char *)machine_msg_data(msg); od_debug(&instance->logger, "startup", NULL, server, - "%s", + "received packet type: %s", kiwi_be_type_to_string(type)); switch (type) { @@ -284,13 +289,15 @@ od_backend_connect_to(od_server_t *server, /* set network options */ machine_set_nodelay(io, instance->config.nodelay); - if (instance->config.keepalive > 0) + if (instance->config.keepalive > 0) { machine_set_keepalive(io, 1, instance->config.keepalive, instance->config.keepalive_keep_interval, instance->config.keepalive_probes, instance->config.keepalive_usr_timeout); + } + int rc; rc = od_io_prepare(&server->io, io, instance->config.readahead); if (rc == -1) { @@ -374,8 +381,9 @@ od_backend_connect_to(od_server_t *server, } uint64_t time_resolve = 0; - if (instance->config.log_session) + if (instance->config.log_session) { time_resolve = machine_time_us() - time_connect_start; + } /* connect to server */ rc = machine_connect(server->io.io, saddr, UINT32_MAX); @@ -409,8 +417,9 @@ od_backend_connect_to(od_server_t *server, } uint64_t time_connect = 0; - if (instance->config.log_session) + if (instance->config.log_session) { time_connect = machine_time_us() - time_connect_start; + } /* log server connection */ if (instance->config.log_session) { @@ -479,6 +488,7 @@ od_backend_connect_cancel(od_server_t *server, msg = kiwi_fe_write_cancel(NULL, key->key_pid, key->key); if (msg == NULL) return -1; + rc = od_write(&server->io, msg); if (rc == -1) { od_error(&instance->logger, @@ -489,6 +499,7 @@ od_backend_connect_cancel(od_server_t *server, od_io_error(&server->io)); return -1; } + return 0; } @@ -506,6 +517,7 @@ od_backend_update_parameter(od_server_t *server, uint32_t name_len; char *value; uint32_t value_len; + int rc; rc = kiwi_fe_read_parameter(data, size, &name, &name_len, &value, &value_len); @@ -529,11 +541,13 @@ od_backend_update_parameter(od_server_t *server, value_len, value); - if (server_only) + if (server_only) { kiwi_vars_update(&server->vars, name, name_len, value, value_len); - else + } else { kiwi_vars_update_both( &client->vars, &server->vars, name, name_len, value, value_len); + } + return 0; } @@ -587,12 +601,12 @@ od_backend_ready_wait(od_server_t *server, ready++; if (ready == count) { machine_msg_free(msg); - break; + return 0; } } machine_msg_free(msg); } - return 0; + /* never reached */ } int @@ -606,8 +620,10 @@ od_backend_query(od_server_t *server, machine_msg_t *msg; msg = kiwi_fe_write_query(NULL, query, len); - if (msg == NULL) + if (msg == NULL) { return -1; + } + int rc; rc = od_write(&server->io, msg); if (rc == -1) { diff --git a/sources/c.h b/sources/c.h index 7a3ee854..38194dd7 100644 --- a/sources/c.h +++ b/sources/c.h @@ -7,9 +7,11 @@ * * Scalable PostgreSQL connection pooler. */ + #include #include #include +#include #include #include diff --git a/sources/config.h b/sources/config.h index 9f124c45..e502abdd 100644 --- a/sources/config.h +++ b/sources/config.h @@ -87,12 +87,6 @@ struct od_config od_list_t listen; }; -static inline int -od_config_is_multi_workers(od_config_t *config) -{ - return config->workers > 1; -} - void od_config_init(od_config_t *); void diff --git a/sources/config_reader.c b/sources/config_reader.c index b339243c..68feac26 100644 --- a/sources/config_reader.c +++ b/sources/config_reader.c @@ -334,6 +334,7 @@ od_config_reader_quantiles(od_config_reader_t *reader, break; c++; } + free(*quantiles); return true; } @@ -949,19 +950,20 @@ od_config_reader_parse(od_config_reader_t *reader, od_module_t *modules) rc = od_parser_next(&reader->parser, &token); switch (rc) { case OD_PARSER_EOF: - return 0; + goto success; case OD_PARSER_KEYWORD: break; default: od_config_reader_error( reader, &token, "incorrect or unexpected parameter"); - return -1; + goto error; } + od_keyword_t *keyword; keyword = od_keyword_match(od_config_keywords, &token); if (keyword == NULL) { od_config_reader_error(reader, &token, "unknown parameter"); - return -1; + goto error; } switch (keyword->id) { /* include */ @@ -975,233 +977,292 @@ od_config_reader_parse(od_config_reader_t *reader, od_module_t *modules) modules, config_file); free(config_file); - if (rc == -1) - return -1; + if (rc == -1) { + goto error; + } continue; } /* daemonize */ case OD_LDAEMONIZE: - if (!od_config_reader_yes_no(reader, &config->daemonize)) - return -1; + if (!od_config_reader_yes_no(reader, &config->daemonize)) { + goto error; + } continue; /* priority */ case OD_LPRIORITY: - if (!od_config_reader_number(reader, &config->priority)) - return -1; + if (!od_config_reader_number(reader, &config->priority)) { + goto error; + } continue; /* pid_file */ case OD_LPID_FILE: - if (!od_config_reader_string(reader, &config->pid_file)) - return -1; + if (!od_config_reader_string(reader, &config->pid_file)) { + goto error; + } continue; /* unix_socket_dir */ case OD_LUNIX_SOCKET_DIR: - if (!od_config_reader_string(reader, &config->unix_socket_dir)) - return -1; + if (!od_config_reader_string(reader, + &config->unix_socket_dir)) { + goto error; + } continue; /* unix_socket_mode */ case OD_LUNIX_SOCKET_MODE: - if (!od_config_reader_string(reader, &config->unix_socket_mode)) - return -1; + if (!od_config_reader_string(reader, + &config->unix_socket_mode)) { + goto error; + } continue; /* locks_dir */ case OD_LLOCKS_DIR: - if (!od_config_reader_string(reader, &config->locks_dir)) - return -1; + if (!od_config_reader_string(reader, &config->locks_dir)) { + goto error; + } continue; /* enable_online_restart */ case OD_LENABLE_ONLINE_RESTART: if (!od_config_reader_yes_no( - reader, &config->enable_online_restart_feature)) - return -1; + reader, &config->enable_online_restart_feature)) { + goto error; + } continue; /* graceful_die_on_errors */ case OD_LGRACEFUL_DIE_ON_ERRORS: if (!od_config_reader_yes_no(reader, - &config->graceful_die_on_errors)) - return -1; + &config->graceful_die_on_errors)) { + goto error; + } continue; case OD_LBINDWITH_REUSEPORT: if (!od_config_reader_yes_no(reader, - &config->bindwith_reuseport)) - return -1; + &config->bindwith_reuseport)) { + goto error; + } continue; /* log_debug */ case OD_LLOG_DEBUG: - if (!od_config_reader_yes_no(reader, &config->log_debug)) - return -1; + if (!od_config_reader_yes_no(reader, &config->log_debug)) { + goto error; + } continue; /* log_stdout */ case OD_LLOG_TO_STDOUT: - if (!od_config_reader_yes_no(reader, &config->log_to_stdout)) - return -1; + if (!od_config_reader_yes_no(reader, &config->log_to_stdout)) { + goto error; + } continue; /* log_config */ case OD_LLOG_CONFIG: - if (!od_config_reader_yes_no(reader, &config->log_config)) - return -1; + if (!od_config_reader_yes_no(reader, &config->log_config)) { + goto error; + } continue; /* log_session */ case OD_LLOG_SESSION: - if (!od_config_reader_yes_no(reader, &config->log_session)) - return -1; + if (!od_config_reader_yes_no(reader, &config->log_session)) { + goto error; + } continue; /* log_query */ case OD_LLOG_QUERY: - if (!od_config_reader_yes_no(reader, &config->log_query)) - return -1; + if (!od_config_reader_yes_no(reader, &config->log_query)) { + goto error; + } continue; /* log_stats */ case OD_LLOG_STATS: - if (!od_config_reader_yes_no(reader, &config->log_stats)) - return -1; + if (!od_config_reader_yes_no(reader, &config->log_stats)) { + goto error; + } continue; /* log_format */ case OD_LLOG_FORMAT: - if (!od_config_reader_string(reader, &config->log_format)) - return -1; + if (!od_config_reader_string(reader, &config->log_format)) { + goto error; + } continue; /* log_file */ case OD_LLOG_FILE: - if (!od_config_reader_string(reader, &config->log_file)) - return -1; + if (!od_config_reader_string(reader, &config->log_file)) { + goto error; + } continue; /* log_syslog */ case OD_LLOG_SYSLOG: - if (!od_config_reader_yes_no(reader, &config->log_syslog)) - return -1; + if (!od_config_reader_yes_no(reader, &config->log_syslog)) { + goto error; + } continue; /* log_syslog_ident */ case OD_LLOG_SYSLOG_IDENT: - if (!od_config_reader_string(reader, &config->log_syslog_ident)) - return -1; + if (!od_config_reader_string(reader, + &config->log_syslog_ident)) { + goto error; + } continue; /* log_syslog_facility */ case OD_LLOG_SYSLOG_FACILITY: if (!od_config_reader_string(reader, - &config->log_syslog_facility)) - return -1; + &config->log_syslog_facility)) { + goto error; + } continue; /* stats_interval */ case OD_LSTATS_INTERVAL: - if (!od_config_reader_number(reader, &config->stats_interval)) - return -1; + if (!od_config_reader_number(reader, &config->stats_interval)) { + goto error; + } + continue; /* client_max */ case OD_LCLIENT_MAX: - if (!od_config_reader_number(reader, &config->client_max)) - return -1; + if (!od_config_reader_number(reader, &config->client_max)) { + goto error; + } config->client_max_set = 1; continue; /* client_max_routing */ case OD_LCLIENT_MAX_ROUTING: if (!od_config_reader_number(reader, - &config->client_max_routing)) - return -1; + &config->client_max_routing)) { + goto error; + } continue; /* server_login_retry */ case OD_LSERVER_LOGIN_RETRY: if (!od_config_reader_number(reader, - &config->server_login_retry)) - return -1; + &config->server_login_retry)) { + goto error; + } continue; /* readahead */ case OD_LREADAHEAD: - if (!od_config_reader_number(reader, &config->readahead)) - return -1; + if (!od_config_reader_number(reader, &config->readahead)) { + goto error; + } continue; /* nodelay */ case OD_LNODELAY: - if (!od_config_reader_yes_no(reader, &config->nodelay)) - return -1; + if (!od_config_reader_yes_no(reader, &config->nodelay)) { + goto error; + } continue; /* keepalive */ case OD_LKEEPALIVE: - if (!od_config_reader_number(reader, &config->keepalive)) - return -1; + if (!od_config_reader_number(reader, &config->keepalive)) { + goto error; + } continue; /* keepalive_keep_interval */ case OD_LKEEPALIVE_INTERVAL: - if (!od_config_reader_number(reader, - &config->keepalive_keep_interval)) - return -1; + if (!od_config_reader_number( + reader, &config->keepalive_keep_interval)) { + goto error; + } continue; /* keepalive_probes */ case OD_LKEEPALIVE_PROBES: - if (!od_config_reader_number(reader, &config->keepalive_probes)) - return -1; + if (!od_config_reader_number(reader, + &config->keepalive_probes)) { + goto error; + } continue; /* keepalive_usr_timeout */ case OD_LKEEPALIVE_USR_TIMEOUT: if (!od_config_reader_number(reader, - &config->keepalive_usr_timeout)) - return -1; + &config->keepalive_usr_timeout)) { + goto error; + } continue; /* workers */ case OD_LWORKERS: - if (!od_config_reader_number(reader, &config->workers)) - return -1; + if (!od_config_reader_number(reader, &config->workers)) { + goto error; + } + continue; /* resolvers */ case OD_LRESOLVERS: - if (!od_config_reader_number(reader, &config->resolvers)) - return -1; + if (!od_config_reader_number(reader, &config->resolvers)) { + goto error; + } + continue; /* pipeline */ - /* cache */ - /* cache_chunk */ case OD_LPIPELINE: + /* fallthrough */ case OD_LCACHE: + + /* cache */ + /* fallthrough */ + case OD_LCACHE_CHUNK: + + /* cache_chunk */ + /* fallthrough */ + case OD_LPACKET_WRITE_QUEUE: + + /* packet write queue */ + /* fallthrough */ + case OD_LPACKET_READ_SIZE: { /* deprecated */ int unused; - if (!od_config_reader_number(reader, &unused)) - return -1; + if (!od_config_reader_number(reader, &unused)) { + goto error; + } continue; } /* cache_msg_gc_size */ case OD_LCACHE_MSG_GC_SIZE: if (!od_config_reader_number(reader, - &config->cache_msg_gc_size)) - return -1; + &config->cache_msg_gc_size)) { + goto error; + } continue; /* cache_coroutine */ case OD_LCACHE_COROUTINE: - if (!od_config_reader_number(reader, &config->cache_coroutine)) - return -1; + if (!od_config_reader_number(reader, + &config->cache_coroutine)) { + goto error; + } continue; /* coroutine_stack_size */ case OD_LCOROUTINE_STACK_SIZE: if (!od_config_reader_number(reader, - &config->coroutine_stack_size)) - return -1; + &config->coroutine_stack_size)) { + goto error; + } continue; /* listen */ case OD_LLISTEN: rc = od_config_reader_listen(reader); - if (rc == -1) - return -1; + if (rc == -1) { + goto error; + } continue; /* storage */ case OD_LSTORAGE: rc = od_config_reader_storage(reader); - if (rc == -1) - return -1; + if (rc == -1) { + goto error; + } continue; /* database */ case OD_LDATABASE: rc = od_config_reader_database(reader, modules); - if (rc == -1) - return -1; + if (rc == -1) { + goto error; + } continue; case OD_LMODULE: { char *module_path = NULL; rc = od_config_reader_string(reader, &module_path); if (rc == -1) { - return -1; + goto error; } if (od_target_module_add(NULL, modules, module_path) == OD_MODULE_CB_FAIL_RETCODE) { @@ -1212,11 +1273,19 @@ od_config_reader_parse(od_config_reader_t *reader, od_module_t *modules) } default: od_config_reader_error(reader, &token, "unexpected parameter"); - return -1; + goto error; } } /* unreach */ return -1; +error: + return -1; +success: + if (!config->client_max_routing) { + config->client_max_routing = config->workers * 16; + } + + return 0; } int @@ -1233,12 +1302,12 @@ od_config_reader_import(od_config_t *config, reader.rules = rules; int rc; rc = od_config_reader_open(&reader, config_file); - if (rc == -1) + if (rc == -1) { return -1; + } + rc = od_config_reader_parse(&reader, modules); od_config_reader_close(&reader); - if (!config->client_max_routing) - config->client_max_routing = config->workers * 16; return rc; } diff --git a/sources/console.c b/sources/console.c index abcd5277..4f990c0e 100644 --- a/sources/console.c +++ b/sources/console.c @@ -665,11 +665,15 @@ od_console_show_databases_add_cb(od_route_t *route, void **argv) od_rule_storage_t *storage = rule->storage; char *host = storage->host; - if (!host) + if (!host) { host = ""; + } + rc = kiwi_be_write_data_row_add(stream, offset, host, strlen(host)); - if (rc == -1) + if (rc == -1) { goto error; + } + char data[64]; int data_len; @@ -708,8 +712,10 @@ od_console_show_databases_add_cb(od_route_t *route, void **argv) rc = kiwi_be_write_data_row_add(stream, offset, "session", 7); if (rule->pool == OD_RULE_POOL_TRANSACTION) rc = kiwi_be_write_data_row_add(stream, offset, "transaction", 11); - if (rc == -1) + + if (rc == -1) { goto error; + } /* max_connections */ data_len = od_snprintf(data, sizeof(data), "%d", rule->client_max); @@ -724,7 +730,8 @@ od_console_show_databases_add_cb(od_route_t *route, void **argv) route->client_pool.count_active + route->client_pool.count_pending + route->client_pool.count_queue); - rc = kiwi_be_write_data_row_add(stream, offset, data, data_len); + + rc = kiwi_be_write_data_row_add(stream, offset, data, data_len); if (rc == -1) goto error; @@ -1551,6 +1558,7 @@ od_console_query(od_client_t *client, uint32_t query_data_size) { od_instance_t *instance = client->global->instance; + od_system_t *system = client->global->system; uint32_t query_len; char *query; diff --git a/sources/counter.c b/sources/counter.c index 43fe303a..f26ed8e9 100644 --- a/sources/counter.c +++ b/sources/counter.c @@ -14,6 +14,30 @@ od_counter_llist_create(void) return llist; } +od_bucket_t * +od_bucket_create(void) +{ + od_bucket_t *b = malloc(sizeof(od_bucket_t)); + if (b == NULL) + return NULL; + + b->l = od_counter_llist_create(); + const int res = pthread_mutex_init(&b->mutex, NULL); + if (!res) { + return b; + } + + return NULL; +} + +void +od_bucket_free(od_bucket_t *b) +{ + od_counter_llist_free(b->l); + pthread_mutex_destroy(&b->mutex); + free(b); +} + inline void od_counter_llist_add(od_counter_llist_t *llist, const od_counter_item_t *it) { @@ -38,46 +62,41 @@ od_counter_llist_free(od_counter_llist_t *l) next = it->next; free(it); } - + free(l); return OK_RESPONSE; } +static size_t +od_counter_required_buf_size(int sz) +{ + return sizeof(od_counter_t) + (sz * sizeof(od_bucket_t)); +} + od_counter_t * od_counter_create(size_t sz) { - od_counter_t *t = malloc(sizeof(od_counter_t)); + od_counter_t *t = malloc(od_counter_required_buf_size(sz)); if (t == NULL) { goto error; } - t->buckets = malloc(sizeof(od_counter_llist_t) * sz); - if (t->buckets == NULL) { - goto error; - } - t->bucket_mutex = malloc(sizeof(pthread_mutex_t) * sz); - if (t->bucket_mutex == NULL) { - goto error; - } t->size = sz; for (size_t i = 0; i < t->size; ++i) { - t->buckets[i] = od_counter_llist_create(); + t->buckets[i] = od_bucket_create(); if (t->buckets[i] == NULL) { goto error; } - const int res = pthread_mutex_init(&t->bucket_mutex[i], NULL); - if (res) { - goto error; - } } return t; error: if (t) { - if (t->buckets) { - free(t->buckets); + for (size_t i = 0; i < t->size; ++i) { + if (t->buckets[i] == NULL) + continue; + + od_bucket_free(t->buckets[i]); } - if (t->bucket_mutex) - free(t->bucket_mutex); free(t); } @@ -94,11 +113,9 @@ od_retcode_t od_counter_free(od_counter_t *t) { for (size_t i = 0; i < t->size; ++i) { - od_counter_llist_free(t->buckets[i]); - pthread_mutex_destroy(&t->bucket_mutex[i]); + od_bucket_free(t->buckets[i]); } - free(t->buckets); free(t); return OK_RESPONSE; @@ -112,11 +129,11 @@ od_counter_inc(od_counter_t *t, od_counter_item_t item) * prevent concurrent access to * modify hash table section */ - pthread_mutex_lock(&t->bucket_mutex[key]); + pthread_mutex_lock(&t->buckets[key]->mutex); { bool fnd = false; - for (od_counter_litem_t *it = t->buckets[key]->list; it != NULL; + for (od_counter_litem_t *it = t->buckets[key]->l->list; it != NULL; it = it->next) { if (it->value == item) { ++it->cnt; @@ -125,9 +142,9 @@ od_counter_inc(od_counter_t *t, od_counter_item_t item) } } if (!fnd) - od_counter_llist_add(t->buckets[key], &item); + od_counter_llist_add(t->buckets[key]->l, &item); } - pthread_mutex_unlock(&t->bucket_mutex[key]); + pthread_mutex_unlock(&t->buckets[key]->mutex); } od_count_t @@ -137,9 +154,9 @@ od_counter_get_count(od_counter_t *t, od_counter_item_t value) od_count_t ret_val = 0; - pthread_mutex_lock(&t->bucket_mutex[key]); + pthread_mutex_lock(&t->buckets[key]->mutex); { - for (od_counter_litem_t *it = t->buckets[key]->list; it != NULL; + for (od_counter_litem_t *it = t->buckets[key]->l->list; it != NULL; it = it->next) { if (it->value == value) { ret_val = it->cnt; @@ -147,7 +164,7 @@ od_counter_get_count(od_counter_t *t, od_counter_item_t value) } } } - pthread_mutex_unlock(&t->bucket_mutex[key]); + pthread_mutex_unlock(&t->buckets[key]->mutex); return ret_val; } @@ -155,14 +172,15 @@ od_counter_get_count(od_counter_t *t, od_counter_item_t value) static inline od_retcode_t od_counter_reset_target_bucket(od_counter_t *t, size_t bucket_key) { - pthread_mutex_lock(&t->bucket_mutex[bucket_key]); + pthread_mutex_lock(&t->buckets[bucket_key]->mutex); { - for (od_counter_litem_t *it = t->buckets[bucket_key]->list; it != NULL; - it = it->next) { + for (od_counter_litem_t *it = t->buckets[bucket_key]->l->list; + it != NULL; + it = it->next) { it->value = 0; } } - pthread_mutex_unlock(&t->bucket_mutex[bucket_key]); + pthread_mutex_unlock(&t->buckets[bucket_key]->mutex); return OK_RESPONSE; } @@ -172,9 +190,9 @@ od_counter_reset(od_counter_t *t, od_counter_item_t value) { od_counter_item_t key = od_hash_item(t, value); - pthread_mutex_lock(&t->bucket_mutex[key]); + pthread_mutex_lock(&t->buckets[key]->mutex); { - for (od_counter_litem_t *it = t->buckets[key]->list; it != NULL; + for (od_counter_litem_t *it = t->buckets[key]->l->list; it != NULL; it = it->next) { if (it->value == value) { it->value = 0; @@ -182,7 +200,7 @@ od_counter_reset(od_counter_t *t, od_counter_item_t value) } } } - pthread_mutex_unlock(&t->bucket_mutex[key]); + pthread_mutex_unlock(&t->buckets[key]->mutex); return OK_RESPONSE; } diff --git a/sources/counter.h b/sources/counter.h index ee184a02..a2cd37b2 100644 --- a/sources/counter.h +++ b/sources/counter.h @@ -44,12 +44,19 @@ extern od_retcode_t od_counter_llist_free(od_counter_llist_t *l); #define OD_DEFAULT_HASH_TABLE_SIZE 15 +typedef struct od_bucket +{ + + od_counter_llist_t *l; + pthread_mutex_t mutex; +} od_bucket_t; + typedef struct od_counter od_counter_t; struct od_counter { - od_counter_llist_t **buckets; - pthread_mutex_t *bucket_mutex; size_t size; + // ISO C99 flexible array member + od_bucket_t *buckets[]; }; extern od_counter_t * diff --git a/sources/cron.c b/sources/cron.c index ea909c06..aa6997ed 100644 --- a/sources/cron.c +++ b/sources/cron.c @@ -105,6 +105,7 @@ od_cron_stat(od_cron_t *cron) uint64_t msg_cache_count = 0; uint64_t msg_cache_gc_count = 0; uint64_t msg_cache_size = 0; + od_atomic_u64_t startup_errors = od_atomic_u64_of(&cron->startup_errors); cron->startup_errors = 0; @@ -150,11 +151,13 @@ od_cron_stat(od_cron_t *cron) /* update stats per route and print info */ od_route_pool_stat_cb_t stat_cb; - stat_cb = od_cron_stat_cb; - if (!instance->config.log_stats) + if (!instance->config.log_stats) { stat_cb = NULL; + } else { + stat_cb = od_cron_stat_cb; + } void *argv[] = { instance }; - od_router_stat(router, cron->stat_time_us, 1, stat_cb, argv); + od_router_stat(router, cron->stat_time_us, stat_cb, argv); /* update current stat time mark */ cron->stat_time_us = machine_time_us(); @@ -185,8 +188,6 @@ od_cron_expire(od_cron_t *cron) "closing idle server connection (%d secs)", server->idle_time); server->route = NULL; - if (!od_config_is_multi_workers(&instance->config)) - od_io_attach(&server->io); od_backend_close_connection(server); od_backend_close(server); } @@ -205,13 +206,26 @@ od_cron_err_stat(od_cron_t *cron) od_list_foreach(&router->route_pool.list, it) { od_route_t *current_route = od_container_of(it, od_route_t, link); - if (current_route->extra_logging_enabled) { - od_err_logger_inc_interval(current_route->err_logger); + od_route_lock(current_route); + { + if (current_route->extra_logging_enabled) { + od_err_logger_inc_interval(current_route->err_logger); + } } + od_route_unlock(current_route); } - od_err_logger_inc_interval(router->route_pool.err_logger); - od_err_logger_inc_interval(router->router_err_logger); + od_router_lock(router) + { + od_err_logger_inc_interval(router->router_err_logger); + } + od_router_unlock(router) + + od_route_pool_lock(router->route_pool) + { + od_err_logger_inc_interval(router->route_pool.err_logger); + } + od_route_pool_unlock(router->route_pool) } static void diff --git a/sources/debugprintf.h b/sources/debugprintf.h index 5122b2ec..993dc227 100644 --- a/sources/debugprintf.h +++ b/sources/debugprintf.h @@ -13,12 +13,14 @@ void od_dbg_printf(char *fmt, ...); +#define OD_RELEASE_MODE -1 + #ifndef OD_DEVEL_LVL /* set "release" mode by default */ -#define OD_DEVEL_LVL -1 +#define OD_DEVEL_LVL OD_RELEASE_MODE #endif -#if OD_DEVEL_LVL == -1 +#if OD_DEVEL_LVL == OD_RELEASE_MODE #define od_dbg_printf_on_dvl_lvl(debug_lvl, fmt, ...) /* zero cost debug print on release mode */ #else diff --git a/sources/deploy.c b/sources/deploy.c index eafc97cf..b116fa2c 100644 --- a/sources/deploy.c +++ b/sources/deploy.c @@ -16,17 +16,19 @@ od_deploy(od_client_t *client, char *context) od_server_t *server = client->server; od_route_t *route = client->route; - if (route->id.physical_rep || route->id.logical_rep) + if (route->id.physical_rep || route->id.logical_rep) { return 0; + } /* compare and set options which are differs from server */ int query_count; query_count = 0; - char query[512]; + char query[OD_QRY_MAX_SZ]; int query_size; query_size = kiwi_vars_cas(&client->vars, &server->vars, query, sizeof(query) - 1); + if (query_size > 0) { query[query_size] = 0; query_size++; @@ -34,10 +36,12 @@ od_deploy(od_client_t *client, char *context) msg = kiwi_fe_write_query(NULL, query, query_size); if (msg == NULL) return -1; + int rc; rc = od_write(&server->io, msg); if (rc == -1) return -1; + query_count++; od_debug( diff --git a/sources/deploy.h b/sources/deploy.h index 6054f483..15f0182b 100644 --- a/sources/deploy.h +++ b/sources/deploy.h @@ -7,6 +7,8 @@ * Scalable PostgreSQL connection pooler. */ +#define OD_QRY_MAX_SZ 512 /* odyssey maximum allowed query size */ + int od_deploy(od_client_t *, char *); diff --git a/sources/err_logger.c b/sources/err_logger.c index 656eade2..d7296166 100644 --- a/sources/err_logger.c +++ b/sources/err_logger.c @@ -1,10 +1,17 @@ #include "err_logger.h" +static size_t +err_logger_required_buf_size(int sz) +{ + return sizeof(od_error_logger_t) + (sz * sizeof(od_counter_t)); +} + od_error_logger_t * od_err_logger_create(size_t intervals_count) { - od_error_logger_t *err_logger = malloc(sizeof(od_error_logger_t)); + od_error_logger_t *err_logger = + malloc(err_logger_required_buf_size(intervals_count)); if (err_logger == NULL) { goto error; } @@ -12,9 +19,6 @@ od_err_logger_create(size_t intervals_count) err_logger->intercals_cnt = intervals_count; err_logger->current_interval_num = 0; - err_logger->interval_counters = - MALLOC_ARRAY(intervals_count, od_counter_t *); - for (size_t i = 0; i < intervals_count; ++i) { err_logger->interval_counters[i] = od_counter_create_default(); if (err_logger->interval_counters[i] == NULL) { @@ -29,10 +33,14 @@ error: if (err_logger) { - if (err_logger->interval_counters) - free(err_logger->interval_counters); + for (size_t i = 0; i < err_logger->intercals_cnt; ++i) { + if (err_logger->interval_counters[i] == NULL) + continue; + od_counter_free(err_logger->interval_counters[i]); + } - free(err_logger); + pthread_mutex_destroy(&err_logger->lock); + free((void *)(err_logger)); } return NULL; @@ -41,14 +49,23 @@ error: od_retcode_t od_err_logger_free(od_error_logger_t *err_logger) { + if (err_logger == NULL) { + return OK_RESPONSE; + } + for (size_t i = 0; i < err_logger->intercals_cnt; ++i) { + if (err_logger->interval_counters[i] == NULL) { + continue; + } int rc = od_counter_free(err_logger->interval_counters[i]); + err_logger->interval_counters[i] = NULL; + if (rc != OK_RESPONSE) return rc; } pthread_mutex_destroy(&err_logger->lock); - free(err_logger); + free((void *)(err_logger)); return OK_RESPONSE; } @@ -59,3 +76,28 @@ od_error_logger_store_err(od_error_logger_t *l, size_t err_t) od_counter_inc(l->interval_counters[l->current_interval_num], err_t); return OK_RESPONSE; } + +od_retcode_t +od_err_logger_inc_interval(od_error_logger_t *l) +{ + pthread_mutex_lock(&l->lock); + { + ++l->current_interval_num; + l->current_interval_num %= l->intercals_cnt; + + od_counter_reset_all(l->interval_counters[l->current_interval_num]); + } + pthread_mutex_unlock(&l->lock); + + return OK_RESPONSE; +} + +size_t +od_err_logger_get_aggr_errors_count(od_error_logger_t *l, size_t err_t) +{ + size_t ret_val = 0; + for (size_t i = 0; i < l->intercals_cnt; ++i) { + ret_val += od_counter_get_count(l->interval_counters[i], err_t); + } + return ret_val; +} diff --git a/sources/err_logger.h b/sources/err_logger.h index 454bdaac..f027f597 100644 --- a/sources/err_logger.h +++ b/sources/err_logger.h @@ -17,11 +17,12 @@ typedef struct od_error_logger od_error_logger_t; struct od_error_logger { size_t intercals_cnt; - od_counter_t **interval_counters; pthread_mutex_t lock; size_t current_interval_num; + // ISO C99 flexible array member + od_counter_t *interval_counters[]; }; extern od_retcode_t @@ -36,32 +37,13 @@ od_err_logger_create_default() return od_err_logger_create(DEFAULT_ERROR_INTERVAL_NUMBER); } -extern od_retcode_t +od_retcode_t od_err_logger_free(od_error_logger_t *err_logger); -static inline od_retcode_t -od_err_logger_inc_interval(od_error_logger_t *l) -{ - pthread_mutex_lock(&l->lock); - { - ++l->current_interval_num; - l->current_interval_num %= l->intercals_cnt; +od_retcode_t +od_err_logger_inc_interval(od_error_logger_t *l); - od_counter_reset_all(l->interval_counters[l->current_interval_num]); - } - pthread_mutex_unlock(&l->lock); - - return OK_RESPONSE; -} - -static inline size_t -od_err_logger_get_aggr_errors_count(od_error_logger_t *l, size_t err_t) -{ - size_t ret_val = 0; - for (size_t i = 0; i < l->intercals_cnt; ++i) { - ret_val += od_counter_get_count(l->interval_counters[i], err_t); - } - return ret_val; -} +size_t +od_err_logger_get_aggr_errors_count(od_error_logger_t *l, size_t err_t); #endif // ODYSSEY_ERR_LOGGER_H diff --git a/sources/error.h b/sources/error.h index 203cc425..80eb0762 100644 --- a/sources/error.h +++ b/sources/error.h @@ -13,7 +13,7 @@ typedef struct od_error od_error_t; struct od_error { - char file[256]; + char file[OD_ERROR_MAX_LEN]; int file_len; char function[128]; int function_len; diff --git a/sources/frontend.c b/sources/frontend.c index 0a73d524..d703c650 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -202,8 +202,7 @@ od_frontend_attach(od_client_t *client, bool wait_for_idle = false; for (;;) { od_router_status_t status; - status = - od_router_attach(router, &instance->config, client, wait_for_idle); + status = od_router_attach(router, client, wait_for_idle); if (status != OD_ROUTER_OK) { if (status == OD_ROUTER_ERROR_TIMEDOUT) { od_error(&instance->logger, @@ -230,14 +229,16 @@ od_frontend_attach(od_client_t *client, context, client, server, - "attached to %s%.*s", + "client %s attached to %s%.*s", + client->id.id, server->id.id_prefix, (int)sizeof(server->id.id), server->id.id); /* connect to server, if necessary */ - if (server->io.io) + if (server->io.io) { return OD_OK; + } int rc; od_atomic_u32_inc(&router->servers_routing); @@ -251,8 +252,9 @@ od_frontend_attach(od_client_t *client, od_frontend_error_is_too_many_connections(client); if (wait_for_idle) { od_router_close(router, client); - if (instance->config.server_login_retry) + if (instance->config.server_login_retry) { machine_sleep(instance->config.server_login_retry); + } continue; } return OD_ESERVER_CONNECT; @@ -746,7 +748,6 @@ od_frontend_remote(od_client_t *client) } od_server_t *server; - od_instance_t *instance = client->global->instance; for (;;) { while (1) { if (machine_cond_wait(client->cond, 60000) == 0) { @@ -811,9 +812,8 @@ od_frontend_remote(od_client_t *client) } /* push server connection back to route pool */ - od_router_t *router = client->global->router; - od_instance_t *instance = client->global->instance; - od_router_detach(router, &instance->config, client); + od_router_t *router = client->global->router; + od_router_detach(router, client); server = NULL; } else if (status != OD_OK) { break; @@ -843,7 +843,8 @@ od_frontend_remote(od_client_t *client) static void od_frontend_cleanup(od_client_t *client, char *context, - od_frontend_status_t status) + od_frontend_status_t status, + od_error_logger_t *l) { od_instance_t *instance = client->global->instance; od_router_t *router = client->global->router; @@ -853,6 +854,14 @@ od_frontend_cleanup(od_client_t *client, od_server_t *server = client->server; + if (od_frontend_status_is_err(status)) { + od_error_logger_store_err(l, status); + + if (route->extra_logging_enabled && !od_route_is_dynamic(route)) { + od_error_logger_store_err(route->err_logger, status); + } + } + switch (status) { case OD_STOP: case OD_OK: @@ -862,7 +871,9 @@ od_frontend_cleanup(od_client_t *client, context, client, server, - "client disconnected"); + "client disconnected (route %s.%s)", + route->rule->db_name, + route->rule->user_name); } if (!client->server) break; @@ -874,7 +885,7 @@ od_frontend_cleanup(od_client_t *client, break; } /* push server to router server pool */ - od_router_detach(router, &instance->config, client); + od_router_detach(router, client); break; case OD_EOOM: @@ -910,6 +921,7 @@ od_frontend_cleanup(od_client_t *client, break; case OD_ECLIENT_READ: + /*fallthrough*/ case OD_ECLIENT_WRITE: /* close client connection and reuse server * link in case of client errors */ @@ -938,7 +950,7 @@ od_frontend_cleanup(od_client_t *client, break; } /* push server to router server pool */ - od_router_detach(router, &instance->config, client); + od_router_detach(router, client); break; case OD_ESERVER_CONNECT: @@ -1134,7 +1146,7 @@ od_frontend(void *arg) /* route client */ od_router_status_t router_status; - router_status = od_router_route(router, &instance->config, client); + router_status = od_router_route(router, client); /* routing is over */ od_atomic_u32_dec(&router->clients_routing); @@ -1286,63 +1298,32 @@ od_frontend(void *arg) /* setup client and run main loop */ od_route_t *route = client->route; - od_error_logger_t *l; - l = router->route_pool.err_logger; od_frontend_status_t status; status = OD_UNDEF; switch (route->rule->storage->storage_type) { case OD_RULE_STORAGE_LOCAL: { status = od_frontend_local_setup(client); - if (od_frontend_status_is_err(status)) { - od_error_logger_store_err(l, status); - - if (route->extra_logging_enabled && - !od_route_is_dynamic(route)) { - od_error_logger_store_err(route->err_logger, status); - } - } if (status != OD_OK) break; status = od_frontend_local(client); - if (od_frontend_status_is_err(status)) { - od_error_logger_store_err(l, status); - - if (route->extra_logging_enabled && - !od_route_is_dynamic(route)) { - od_error_logger_store_err(route->err_logger, status); - } - } break; } case OD_RULE_STORAGE_REMOTE: { status = od_frontend_setup(client); - if (od_frontend_status_is_err(status)) { - od_error_logger_store_err(l, status); - - if (route->extra_logging_enabled && - !od_route_is_dynamic(route)) { - od_error_logger_store_err(route->err_logger, status); - } - } if (status != OD_OK) break; status = od_frontend_remote(client); - if (od_frontend_status_is_err(status)) { - od_error_logger_store_err(l, status); - - if (route->extra_logging_enabled && - !od_route_is_dynamic(route)) { - od_error_logger_store_err(route->err_logger, status); - } - } break; } } - od_frontend_cleanup(client, "main", status); + od_error_logger_t *l; + l = router->route_pool.err_logger; + + od_frontend_cleanup(client, "main", status, l); od_list_foreach(&modules->link, i) { diff --git a/sources/grac_shutdown_worker.c b/sources/grac_shutdown_worker.c index ecad051d..b9fe236e 100644 --- a/sources/grac_shutdown_worker.c +++ b/sources/grac_shutdown_worker.c @@ -45,8 +45,10 @@ od_grac_shutdown_worker(void *arg) /* wait for all servers to complete old transations */ od_list_foreach(&router->servers, i) { +#if OD_DEVEL_LVL != -1 od_system_server_t *server; server = od_container_of(i, od_system_server_t, link); +#endif while (od_atomic_u32_of(&router->clients_routing) || od_atomic_u32_of(&router->clients)) { diff --git a/sources/instance.c b/sources/instance.c index f5841620..c9a57be7 100644 --- a/sources/instance.c +++ b/sources/instance.c @@ -184,6 +184,7 @@ od_instance_main(od_instance_t *instance, int argc, char **argv) NULL, "failed to set process priority: %s", strerror(errno)); + goto error; } } diff --git a/sources/instance.h b/sources/instance.h index e9102450..cc0aa934 100644 --- a/sources/instance.h +++ b/sources/instance.h @@ -13,7 +13,6 @@ typedef struct timeval od_timeval_t; struct od_instance { od_pid_t pid; - od_pid_t watchdog_pid; od_logger_t logger; char *config_file; od_config_t config; diff --git a/sources/logger.h b/sources/logger.h index 13749716..3259bb4a 100644 --- a/sources/logger.h +++ b/sources/logger.h @@ -84,6 +84,7 @@ od_log(od_logger_t *logger, va_end(args); } +#if OD_DEVEL_LVL != OD_RELEASE_MODE static inline void od_debug(od_logger_t *logger, char *context, @@ -97,6 +98,9 @@ od_debug(od_logger_t *logger, od_logger_write(logger, OD_DEBUG, context, client, server, fmt, args); va_end(args); } +#else +#define od_debug(...) ; +#endif static inline void od_error(od_logger_t *logger, diff --git a/sources/macro.h b/sources/macro.h index 83be4010..870c8147 100644 --- a/sources/macro.h +++ b/sources/macro.h @@ -7,7 +7,7 @@ * Scalable PostgreSQL connection pooler. */ -#include "stdbool.h" +#include "c.h" #define USE_SCRAM @@ -24,8 +24,6 @@ #define CONCAT_(A, B) A##B #define CONCAT(A, B) CONCAT_(A, B) -#define MALLOC_ARRAY(number, type) ((type *)malloc((number) * sizeof(type))) - typedef int od_retcode_t; #endif /* ODYSSEY_MACRO_H */ diff --git a/sources/module.c b/sources/module.c index db019c07..a3fff314 100644 --- a/sources/module.c +++ b/sources/module.c @@ -51,8 +51,10 @@ od_target_module_add(od_logger_t *logger, od_list_append(&modules->link, &module_ptr->link); strcat(module_ptr->path, target_module_path); - if (module_ptr->module_init_cb) + if (module_ptr->module_init_cb) { return module_ptr->module_init_cb(); + } + return OD_MODULE_CB_OK_RETCODE; module_exists: @@ -161,6 +163,7 @@ od_modules_unload(od_logger_t *logger, od_module_t *modules) goto error; } } + return OD_MODULE_CB_OK_RETCODE; error: err = od_dlerror(); diff --git a/sources/route.h b/sources/route.h index e157136b..59988327 100644 --- a/sources/route.h +++ b/sources/route.h @@ -68,14 +68,12 @@ od_route_free(od_route_t *route) if (route->wait_bus) machine_channel_free(route->wait_bus); if (route->stats.enable_quantiles) { - for (size_t i = 0; i < QUANTILES_WINDOW; ++i) { - td_free(route->stats.transaction_hgram[i]); - td_free(route->stats.query_hgram[i]); - } + od_stat_free(&route->stats); } if (route->extra_logging_enabled) { od_err_logger_free(route->err_logger); + route->err_logger = NULL; } pthread_mutex_destroy(&route->lock); @@ -83,13 +81,13 @@ od_route_free(od_route_t *route) } static inline od_route_t * -od_route_allocate(int is_shared) +od_route_allocate() { od_route_t *route = malloc(sizeof(*route)); if (route == NULL) return NULL; od_route_init(route, true); - route->wait_bus = machine_channel_create(is_shared); + route->wait_bus = machine_channel_create(); if (route->wait_bus == NULL) { od_route_free(route); return NULL; diff --git a/sources/route_pool.h b/sources/route_pool.h index 74bb2b3f..8d240c8a 100644 --- a/sources/route_pool.h +++ b/sources/route_pool.h @@ -27,14 +27,19 @@ typedef struct od_route_pool od_route_pool_t; struct od_route_pool { - od_list_t list; /* used for counting error for client without concrete route * like default_db.usr1, db1.default, etc * */ od_error_logger_t *err_logger; int count; + pthread_mutex_t lock; + + od_list_t list; }; +#define od_route_pool_lock(route_pool) pthread_mutex_lock(&route_pool.lock); +#define od_route_pool_unlock(route_pool) pthread_mutex_unlock(&route_pool.lock); + typedef od_retcode_t ( *od_route_pool_stat_frontend_error_cb_t)(od_route_pool_t *pool, void **argv); @@ -44,11 +49,18 @@ od_route_pool_init(od_route_pool_t *pool) od_list_init(&pool->list); pool->err_logger = od_err_logger_create_default(); pool->count = 0; + pthread_mutex_init(&pool->lock, NULL); } static inline void od_route_pool_free(od_route_pool_t *pool) { + if (pool == NULL) + return; + + pthread_mutex_destroy(&pool->lock); + + od_err_logger_free(pool->err_logger); od_list_t *i, *n; od_list_foreach_safe(&pool->list, i, n) { @@ -59,12 +71,9 @@ od_route_pool_free(od_route_pool_t *pool) } static inline od_route_t * -od_route_pool_new(od_route_pool_t *pool, - int is_shared, - od_route_id_t *id, - od_rule_t *rule) +od_route_pool_new(od_route_pool_t *pool, od_route_id_t *id, od_rule_t *rule) { - od_route_t *route = od_route_allocate(is_shared); + od_route_t *route = od_route_allocate(); if (route == NULL) return NULL; int rc; @@ -123,7 +132,6 @@ od_route_pool_match(od_route_pool_t *pool, od_route_id_t *key, od_rule_t *rule) static inline void od_route_pool_stat(od_route_pool_t *pool, uint64_t prev_time_us, - int prev_update, od_route_pool_stat_cb_t callback, void **argv) { @@ -152,11 +160,11 @@ od_route_pool_stat(od_route_pool_t *pool, od_stat_average(&avg, ¤t, &route->stats_prev, prev_time_us); /* update route stats */ - if (prev_update) - od_stat_update(&route->stats_prev, ¤t); + od_stat_update(&route->stats_prev, ¤t); - if (callback) + if (callback) { callback(route, ¤t, &avg, argv); + } } } diff --git a/sources/router.c b/sources/router.c index a74b06fa..ef4d948d 100644 --- a/sources/router.c +++ b/sources/router.c @@ -33,7 +33,6 @@ od_router_free(od_router_t *router) od_rules_free(&router->rules); pthread_mutex_destroy(&router->lock); od_err_logger_free(router->router_err_logger); - od_err_logger_free(router->route_pool.err_logger); } inline int @@ -204,18 +203,16 @@ od_router_gc(od_router_t *router) void od_router_stat(od_router_t *router, uint64_t prev_time_us, - int prev_update, od_route_pool_stat_cb_t callback, void **argv) { od_router_lock(router); - od_route_pool_stat( - &router->route_pool, prev_time_us, prev_update, callback, argv); + od_route_pool_stat(&router->route_pool, prev_time_us, callback, argv); od_router_unlock(router); } od_router_status_t -od_router_route(od_router_t *router, od_config_t *config, od_client_t *client) +od_router_route(od_router_t *router, od_client_t *client) { kiwi_be_startup_t *startup = &client->startup; @@ -262,9 +259,7 @@ od_router_route(od_router_t *router, od_config_t *config, od_client_t *client) od_route_t *route; route = od_route_pool_match(&router->route_pool, &id, rule); if (route == NULL) { - int is_shared; - is_shared = od_config_is_multi_workers(config); - route = od_route_pool_new(&router->route_pool, is_shared, &id, rule); + route = od_route_pool_new(&router->route_pool, &id, rule); if (route == NULL) { od_router_unlock(router); return OD_ROUTER_ERROR; @@ -347,10 +342,7 @@ od_should_not_spun_connection_yet(int connections_in_pool, } od_router_status_t -od_router_attach(od_router_t *router, - od_config_t *config, - od_client_t *client, - bool wait_for_idle) +od_router_attach(od_router_t *router, od_client_t *client, bool wait_for_idle) { (void)router; od_route_t *route = client->route; @@ -458,8 +450,9 @@ attach: od_route_unlock(route); /* attach server io to clients machine context */ - if (server->io.io && od_config_is_multi_workers(config)) + if (server->io.io) { od_io_attach(&server->io); + } /* maybe restore read events subscription */ if (restart_read) @@ -469,7 +462,7 @@ attach: } void -od_router_detach(od_router_t *router, od_config_t *config, od_client_t *client) +od_router_detach(od_router_t *router, od_client_t *client) { (void)router; od_route_t *route = client->route; @@ -477,8 +470,7 @@ od_router_detach(od_router_t *router, od_config_t *config, od_client_t *client) /* detach from current machine event loop */ od_server_t *server = client->server; - if (od_config_is_multi_workers(config)) - od_io_detach(&server->io); + od_io_detach(&server->io); od_route_lock(route); diff --git a/sources/router.h b/sources/router.h index 8bdf3e6e..1e0b3959 100644 --- a/sources/router.h +++ b/sources/router.h @@ -12,14 +12,18 @@ typedef struct od_router od_router_t; struct od_router { pthread_mutex_t lock; + od_rules_t rules; - od_list_t servers; od_route_pool_t route_pool; + /* clients */ od_atomic_u32_t clients; od_atomic_u32_t clients_routing; + /* servers */ od_atomic_u32_t servers_routing; - + /* error logging */ od_error_logger_t *router_err_logger; + /* router has type of list */ + od_list_t servers; }; #define od_router_lock(router) pthread_mutex_lock(&router->lock); @@ -36,21 +40,21 @@ od_router_expire(od_router_t *, od_list_t *); void od_router_gc(od_router_t *); void -od_router_stat(od_router_t *, uint64_t, int, od_route_pool_stat_cb_t, void **); +od_router_stat(od_router_t *, uint64_t, od_route_pool_stat_cb_t, void **); extern int od_router_foreach(od_router_t *, od_route_pool_cb_t, void **); od_router_status_t -od_router_route(od_router_t *router, od_config_t *config, od_client_t *client); +od_router_route(od_router_t *router, od_client_t *client); void od_router_unroute(od_router_t *, od_client_t *); od_router_status_t -od_router_attach(od_router_t *, od_config_t *, od_client_t *, bool); +od_router_attach(od_router_t *, od_client_t *, bool); void -od_router_detach(od_router_t *, od_config_t *, od_client_t *); +od_router_detach(od_router_t *, od_client_t *); void od_router_close(od_router_t *, od_client_t *); diff --git a/sources/sighandler.c b/sources/sighandler.c index 67cc4e0f..88c40b35 100644 --- a/sources/sighandler.c +++ b/sources/sighandler.c @@ -47,6 +47,42 @@ od_system_cleanup(od_system_t *system) } } +void +od_system_shutdown(od_system_t *system, od_instance_t *instance) +{ + od_log(&instance->logger, + "system", + NULL, + NULL, + "SIGINT received, shutting down"); + od_worker_pool_stop(system->global->worker_pool); + od_router_free(system->global->router); + /* Prevent OpenSSL usage during deinitialization */ + od_worker_pool_wait(); + od_modules_unload(&instance->logger, system->global->modules); + od_system_cleanup(system); + exit(0); +} + +void +od_system_shutdown_fast(od_system_t *system, od_instance_t *instance) +{ + od_log(&instance->logger, + "system", + NULL, + NULL, + "SIGTERM received, shutting down"); + od_worker_pool_stop(system->global->worker_pool); + od_router_free(system->global->router); + + /* No time for caution */ + od_system_cleanup(system); + + /* TODO: */ + od_modules_unload_fast(system->global->modules); + exit(0); +} + void od_system_signal_handler(void *arg) { @@ -79,32 +115,10 @@ od_system_signal_handler(void *arg) break; switch (rc) { case SIGTERM: - od_log(&instance->logger, - "system", - NULL, - NULL, - "SIGTERM received, shutting down"); - od_worker_pool_stop(system->global->worker_pool); - /* No time for caution */ - od_system_cleanup(system); - /* TODO: */ - od_modules_unload_fast(system->global->modules); - kill(instance->watchdog_pid.pid, SIGKILL); - exit(0); + od_system_shutdown_fast(system, instance); break; case SIGINT: - od_log(&instance->logger, - "system", - NULL, - NULL, - "SIGINT received, shutting down"); - od_worker_pool_stop(system->global->worker_pool); - /* Prevent OpenSSL usage during deinitialization */ - od_worker_pool_wait(system->global->worker_pool); - od_modules_unload(&instance->logger, system->global->modules); - od_system_cleanup(system); - kill(instance->watchdog_pid.pid, SIGKILL); - exit(0); + od_system_shutdown(system, instance); break; case SIGHUP: od_log( diff --git a/sources/sighandler.h b/sources/sighandler.h index b2cba560..ec7e5e1c 100644 --- a/sources/sighandler.h +++ b/sources/sighandler.h @@ -17,4 +17,7 @@ void od_system_signal_handler(void *arg); +void +od_system_shutdown(od_system_t *system, od_instance_t *instance); + #endif /* OD_SIGNAL_HANDLER */ diff --git a/sources/stat.h b/sources/stat.h index f2e03c48..bc6d73a3 100644 --- a/sources/stat.h +++ b/sources/stat.h @@ -52,6 +52,15 @@ od_stat_init(od_stat_t *stat) memset(stat, 0, sizeof(*stat)); } +static inline void +od_stat_free(od_stat_t *stat) +{ + for (size_t i = 0; i < QUANTILES_WINDOW; ++i) { + td_free(stat->transaction_hgram[i]); + td_free(stat->query_hgram[i]); + } +} + static inline void od_stat_query_start(od_stat_state_t *state) { @@ -137,9 +146,7 @@ od_stat_update_of(od_atomic_u64_t *prev, od_atomic_u64_t *current) { /* todo: this could be made more optimal */ /* prev <= current */ - uint64_t diff; - diff = od_atomic_u64_of(current) - od_atomic_u64_of(prev); - od_atomic_u64_add(prev, diff); + __atomic_store(prev, current, __ATOMIC_SEQ_CST); } static inline void diff --git a/sources/tdigest.c b/sources/tdigest.c index 4c7c11df..785b62cf 100644 --- a/sources/tdigest.c +++ b/sources/tdigest.c @@ -31,7 +31,7 @@ struct td_histogram mm_sleeplock_t lock; - node_t nodes[0]; + node_t nodes[]; // ISO C99 flexible array member }; static bool diff --git a/sources/watchdog.c b/sources/watchdog.c index 09692fdf..2f1e28a4 100644 --- a/sources/watchdog.c +++ b/sources/watchdog.c @@ -5,20 +5,6 @@ */ #include "watchdog.h" -#include "kiwi.h" -#include - -#include "instance.h" -#include "debugprintf.h" -#include "setproctitle.h" -#include -#include -#include -#include "pid.h" -#include "logger.h" -#include "restart_sync.h" - -#include void od_watchdog_worker(void *arg) diff --git a/sources/watchdog.h b/sources/watchdog.h index df7728bc..ec6446d1 100644 --- a/sources/watchdog.h +++ b/sources/watchdog.h @@ -10,10 +10,25 @@ #include #include #include -#include #include + +#include +#include +#include + +#include + +#include + #include "macro.h" #include "system.h" +#include "kiwi.h" +#include "instance.h" +#include "debugprintf.h" +#include "setproctitle.h" +#include "pid.h" +#include "logger.h" +#include "restart_sync.h" #define ODYSSEY_WATCHDOG_ITER_INTERVAL 500 // ms diff --git a/sources/worker.c b/sources/worker.c index 02554c12..2e7829f4 100644 --- a/sources/worker.c +++ b/sources/worker.c @@ -103,10 +103,7 @@ od_worker_start(od_worker_t *worker) { od_instance_t *instance = worker->global->instance; - int is_shared; - is_shared = od_config_is_multi_workers(&instance->config); - - worker->task_channel = machine_channel_create(is_shared); + worker->task_channel = machine_channel_create(); if (worker->task_channel == NULL) { od_error(&instance->logger, "worker", @@ -115,31 +112,16 @@ od_worker_start(od_worker_t *worker) "failed to create task channel"); return -1; } - if (is_shared) { - char name[32]; - od_snprintf(name, sizeof(name), "worker: %d", worker->id); - worker->machine = machine_create(name, od_worker, worker); - if (worker->machine == -1) { - machine_channel_free(worker->task_channel); - od_error(&instance->logger, - "worker", - NULL, - NULL, - "failed to start worker"); - return -1; - } - } else { - int64_t coroutine_id; - coroutine_id = machine_coroutine_create(od_worker, worker); - if (coroutine_id == -1) { - od_error(&instance->logger, - "worker", - NULL, - NULL, - "failed to create worker coroutine"); - machine_channel_free(worker->task_channel); - return -1; - } + + char name[32]; + od_snprintf(name, sizeof(name), "worker: %d", worker->id); + worker->machine = machine_create(name, od_worker, worker); + if (worker->machine == -1) { + machine_channel_free(worker->task_channel); + od_error( + &instance->logger, "worker", NULL, NULL, "failed to start worker"); + return -1; } + return 0; } diff --git a/sources/worker_pool.h b/sources/worker_pool.h index edf0a75a..809fb526 100644 --- a/sources/worker_pool.h +++ b/sources/worker_pool.h @@ -47,12 +47,6 @@ od_worker_pool_start(od_worker_pool_t *pool, od_global_t *global, int count) static inline void od_worker_pool_stop(od_worker_pool_t *pool) { - od_instance_t *instance = pool->pool->global->instance; - int is_shared; - is_shared = od_config_is_multi_workers(&instance->config); - if (!is_shared) - return; - for (int i = 0; i < pool->count; i++) { od_worker_t *worker = &pool->pool[i]; machine_stop(worker->machine); @@ -60,26 +54,14 @@ od_worker_pool_stop(od_worker_pool_t *pool) } static inline void -od_worker_pool_wait(od_worker_pool_t *pool) +od_worker_pool_wait() { - od_instance_t *instance = pool->pool->global->instance; - int is_shared; - is_shared = od_config_is_multi_workers(&instance->config); - if (!is_shared) - return; - machine_sleep(1); } static inline void od_worker_pool_wait_gracefully_shutdown(od_worker_pool_t *pool) { - od_instance_t *instance = pool->pool->global->instance; - int is_shared; - is_shared = od_config_is_multi_workers(&instance->config); - if (!is_shared) - return; - // // In fact we cannot wait anything here - machines may be in epoll // waiting // // No new TLS handshakes should be initiated, so, just wait a bit. diff --git a/test/shell-test/conf.conf b/test/shell-test/conf.conf new file mode 100644 index 00000000..493ecad3 --- /dev/null +++ b/test/shell-test/conf.conf @@ -0,0 +1,129 @@ +pid_file "/var/run/odyssey.pid" +daemonize no + +unix_socket_dir "/tmp" +unix_socket_mode "0644" + +log_format "%p %t %l [%i %s] (%c) %m\n" + +log_to_stdout yes + +log_syslog no +log_syslog_ident "odyssey" +log_syslog_facility "daemon" + +log_debug yes +log_config yes +log_session yes +log_query yes +log_stats yes +stats_interval 60 + +workers 5 +resolvers 1 + +readahead 8192 + +cache_coroutine 0 + +coroutine_stack_size 8 + +nodelay yes + +keepalive 15 +keepalive_keep_interval 75 +keepalive_probes 9 + +keepalive_usr_timeout 0 + + +listen { + host "*" + port 6432 + backlog 128 +} + + +storage "postgres_server" { + type "remote" + host "localhost" + port 5432 +} + +database default { + user default { + authentication "none" + + storage "postgres_server" + pool "transaction" + pool_size 0 + + pool_timeout 0 + + pool_ttl 60 + + pool_discard no + + pool_cancel yes + + pool_rollback yes + + client_fwd_error yes + + application_name_add_host yes + + server_lifetime 3600 + log_debug no + + quantiles "0.99,0.95,0.5" + client_max 107 + } +} + +database "postgres" { + user "user1" { + authentication "none" + + storage "postgres_server" + pool "transaction" + pool_size 0 + + pool_timeout 0 + + pool_ttl 60 + + pool_discard no + + pool_cancel yes + + pool_rollback yes + + client_fwd_error yes + + application_name_add_host yes + + server_lifetime 3600 + log_debug no + + quantiles "0.99,0.95,0.5" + client_max 107 + } +} +storage "local" { + type "local" +} + +database "console" { + user default { + authentication "none" + pool "session" + storage "local" + } +} + + +locks_dir "/tmp/odyssey" + +graceful_die_on_errors yes +enable_online_restart yes +bindwith_reuseport yes diff --git a/test/shell-test/log_error_rate_test.sh b/test/shell-test/log_error_rate_test.sh new file mode 100755 index 00000000..f2f3307a --- /dev/null +++ b/test/shell-test/log_error_rate_test.sh @@ -0,0 +1,14 @@ +#!/bin/bash +#kill -9 $(ps aux | grep odyssey | awk '{print $2}') +sleep 1 + +#ody-start +#./odyssey ./ody.conf + +for _ in $(seq 1 4000000) +do + for __ in $(seq 1 3000); do +# psql -U user1 -d postgres -h localhost -p 6432 -c 'SELECT 1/0' & + psql -U user1 -d postgres -h localhost -p 6432 -c 'select pg_sleep(0.01)' & + done +done diff --git a/third_party/kiwi/kiwi/be_write.h b/third_party/kiwi/kiwi/be_write.h index ed6f2db3..3b0939ee 100644 --- a/third_party/kiwi/kiwi/be_write.h +++ b/third_party/kiwi/kiwi/be_write.h @@ -25,8 +25,9 @@ kiwi_be_write_error_as(machine_msg_t *msg, if (hint && hint_len > 0) size += 1 + /* H */ +hint_len + 1; int offset = 0; - if (msg) + if (msg) { offset = machine_msg_size(msg); + } msg = machine_msg_create_or_advance(msg, sizeof(kiwi_header_t) + size); if (kiwi_unlikely(msg == NULL)) return NULL; diff --git a/third_party/machinarium/benchmark/makefile b/third_party/machinarium/benchmark/Makefile similarity index 100% rename from third_party/machinarium/benchmark/makefile rename to third_party/machinarium/benchmark/Makefile diff --git a/third_party/machinarium/benchmark/benchmark_channel_shared.c b/third_party/machinarium/benchmark/benchmark_channel_shared.c index 59eb5f65..019d0445 100644 --- a/third_party/machinarium/benchmark/benchmark_channel_shared.c +++ b/third_party/machinarium/benchmark/benchmark_channel_shared.c @@ -28,7 +28,7 @@ benchmark_writer(void *arg) machine_channel_t *q = arg; while (machine_active()) { machine_msg_t *msg; - msg = machine_msg_create(0, 0); + msg = machine_msg_create(0); machine_channel_write(q, msg); ops++; machine_sleep(0); diff --git a/third_party/machinarium/sources/channel_api.c b/third_party/machinarium/sources/channel_api.c index 0ad8325e..e62500d2 100644 --- a/third_party/machinarium/sources/channel_api.c +++ b/third_party/machinarium/sources/channel_api.c @@ -9,26 +9,15 @@ #include MACHINE_API machine_channel_t * -machine_channel_create(int shared) +machine_channel_create() { - if (shared) { - mm_channel_t *channel; - channel = malloc(sizeof(mm_channel_t)); - if (channel == NULL) { - mm_errno_set(ENOMEM); - return NULL; - } - mm_channel_init(channel); - return (machine_channel_t *)channel; - } - - mm_channelfast_t *channel; - channel = malloc(sizeof(mm_channelfast_t)); + mm_channel_t *channel; + channel = malloc(sizeof(mm_channel_t)); if (channel == NULL) { mm_errno_set(ENOMEM); return NULL; } - mm_channelfast_init(channel); + mm_channel_init(channel); return (machine_channel_t *)channel; } diff --git a/third_party/machinarium/sources/channel_fast.h b/third_party/machinarium/sources/channel_fast.h index 23e4ca5d..82316daf 100644 --- a/third_party/machinarium/sources/channel_fast.h +++ b/third_party/machinarium/sources/channel_fast.h @@ -7,6 +7,11 @@ * cooperative multitasking engine. */ +/* + * Channel fast is machinarium implementation + * of machine channel interface + * */ + typedef struct mm_channelfast_rd mm_channelfast_rd_t; typedef struct mm_channelfast mm_channelfast_t; diff --git a/third_party/machinarium/sources/fd.h b/third_party/machinarium/sources/fd.h index 4e4d857b..280d258b 100644 --- a/third_party/machinarium/sources/fd.h +++ b/third_party/machinarium/sources/fd.h @@ -7,6 +7,8 @@ * cooperative multitasking engine. */ +/* machinarium file descriptor */ + typedef struct mm_fd mm_fd_t; enum diff --git a/third_party/machinarium/sources/iov.h b/third_party/machinarium/sources/iov.h index 6282df81..cf509ba4 100644 --- a/third_party/machinarium/sources/iov.h +++ b/third_party/machinarium/sources/iov.h @@ -7,6 +7,10 @@ * cooperative multitasking engine. */ +/* + * Structure for scatter/gather I/O + * */ + typedef struct mm_iov mm_iov_t; struct mm_iov diff --git a/third_party/machinarium/sources/machinarium.h b/third_party/machinarium/sources/machinarium.h index d933f4d0..821feb7d 100644 --- a/third_party/machinarium/sources/machinarium.h +++ b/third_party/machinarium/sources/machinarium.h @@ -145,7 +145,7 @@ extern "C" /* channel */ - MACHINE_API machine_channel_t *machine_channel_create(int shared); + MACHINE_API machine_channel_t *machine_channel_create(); MACHINE_API void machine_channel_free(machine_channel_t *); diff --git a/third_party/machinarium/sources/msg.c b/third_party/machinarium/sources/msg.c index 9fd90206..b3102d37 100644 --- a/third_party/machinarium/sources/msg.c +++ b/third_party/machinarium/sources/msg.c @@ -12,8 +12,10 @@ MACHINE_API machine_msg_t * machine_msg_create(int reserve) { mm_msg_t *msg = mm_msgcache_pop(&mm_self->msg_cache); - if (msg == NULL) + if (msg == NULL) { return NULL; + } + msg->type = 0; if (reserve > 0) { int rc; @@ -30,39 +32,43 @@ machine_msg_create(int reserve) MACHINE_API machine_msg_t * machine_msg_create_or_advance(machine_msg_t *obj, int size) { - if (obj == NULL) + if (obj == NULL) { return machine_msg_create(size); + } + mm_msg_t *msg = mm_cast(mm_msg_t *, obj); int rc; rc = mm_buf_ensure(&msg->data, size); - if (rc == -1) + if (rc == -1) { return NULL; + } + mm_buf_advance(&msg->data, size); return obj; } -MACHINE_API void +MACHINE_API inline void machine_msg_free(machine_msg_t *obj) { mm_msg_t *msg = mm_cast(mm_msg_t *, obj); mm_msgcache_push(&mm_self->msg_cache, msg); } -MACHINE_API void +MACHINE_API inline void machine_msg_set_type(machine_msg_t *obj, int type) { mm_msg_t *msg = mm_cast(mm_msg_t *, obj); msg->type = type; } -MACHINE_API int +MACHINE_API inline int machine_msg_type(machine_msg_t *obj) { mm_msg_t *msg = mm_cast(mm_msg_t *, obj); return msg->type; } -MACHINE_API void * +MACHINE_API inline void * machine_msg_data(machine_msg_t *obj) { mm_msg_t *msg = mm_cast(mm_msg_t *, obj); diff --git a/third_party/machinarium/sources/msg_cache.c b/third_party/machinarium/sources/msg_cache.c index 288e6a43..1a990623 100644 --- a/third_party/machinarium/sources/msg_cache.c +++ b/third_party/machinarium/sources/msg_cache.c @@ -61,6 +61,7 @@ mm_msgcache_pop(mm_msgcache_t *cache) if (msg == NULL) return NULL; mm_buf_init(&msg->data); + /* fallthrough */ init: msg->machine_id = mm_self->id; msg->refs = 0; diff --git a/third_party/machinarium/sources/write.c b/third_party/machinarium/sources/write.c index cd05d20a..2bb2b4cc 100644 --- a/third_party/machinarium/sources/write.c +++ b/third_party/machinarium/sources/write.c @@ -132,10 +132,13 @@ machine_writev_raw(machine_io_t *obj, machine_iov_t *obj_iov) return -1; } +/* writes msg to io object. + * Frees memory after use in current implementation + * */ MACHINE_API int -machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms) +machine_write(machine_io_t *destination, machine_msg_t *msg, uint32_t time_ms) { - mm_io_t *io = mm_cast(mm_io_t *, obj); + mm_io_t *io = mm_cast(mm_io_t *, destination); mm_errno_set(0); if (!io->attached) { @@ -144,7 +147,7 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms) } if (!io->connected) { mm_errno_set(ENOTCONN); - return -1; + goto error; } if (io->on_write) { mm_errno_set(EINPROGRESS); @@ -155,8 +158,9 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms) mm_cond_init(&on_write); int rc; rc = mm_write_start(io, (machine_cond_t *)&on_write); - if (rc == -1) + if (rc == -1) { goto error; + } int total = 0; char *src = machine_msg_data(msg); @@ -169,11 +173,14 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms) mm_write_stop(io); goto error; } + /* when using compression, some data may be processed * despite the non-positive return code */ size_t processed = 0; - rc = machine_write_raw(obj, src + total, size - total, &processed); + rc = + machine_write_raw(destination, src + total, size - total, &processed); total += processed; + if (rc > 0) { total += rc; continue; @@ -194,6 +201,7 @@ machine_write(machine_io_t *obj, machine_msg_t *msg, uint32_t time_ms) machine_msg_free(msg); return 0; error: + /* free msg in case of any error */ machine_msg_free(msg); return -1; }