Merge pull request #281 from reshke/asynclog

asynclog
This commit is contained in:
Andrey Borodin 2021-02-20 11:56:46 +05:00 committed by GitHub
commit f5894f9ad2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 148 additions and 28 deletions

View File

@ -30,15 +30,19 @@ void od_config_init(od_config_t *config)
config->bindwith_reuseport = 0;
config->graceful_die_on_errors = 0;
config->unix_socket_mode = NULL;
config->log_syslog = 0;
config->log_syslog_ident = NULL;
config->log_syslog_facility = NULL;
config->readahead = 8192;
config->nodelay = 1;
config->keepalive = 15;
config->keepalive_keep_interval = 5;
config->keepalive_probes = 3;
config->keepalive_usr_timeout = 0; // use sys default
config->workers = 1;
config->resolvers = 1;
config->client_max_set = 0;

View File

@ -107,9 +107,12 @@ static od_keyword_t od_config_keywords[] = {
od_keyword("unix_socket_dir", OD_LUNIX_SOCKET_DIR),
od_keyword("unix_socket_mode", OD_LUNIX_SOCKET_MODE),
od_keyword("locks_dir", OD_LLOCKS_DIR),
od_keyword("enable_online_restart", OD_LENABLE_ONLINE_RESTART),
od_keyword("graceful_die_on_errors", OD_LGRACEFUL_DIE_ON_ERRORS),
od_keyword("bindwith_reuseport", OD_LBINDWITH_REUSEPORT),
/* logging */
od_keyword("log_debug", OD_LLOG_DEBUG),
od_keyword("log_to_stdout", OD_LLOG_TO_STDOUT),
od_keyword("log_config", OD_LLOG_CONFIG),
@ -122,16 +125,21 @@ static od_keyword_t od_config_keywords[] = {
od_keyword("log_syslog_ident", OD_LLOG_SYSLOG_IDENT),
od_keyword("log_syslog_facility", OD_LLOG_SYSLOG_FACILITY),
od_keyword("stats_interval", OD_LSTATS_INTERVAL),
/* listen */
od_keyword("listen", OD_LLISTEN),
od_keyword("host", OD_LHOST),
od_keyword("port", OD_LPORT),
od_keyword("backlog", OD_LBACKLOG),
od_keyword("nodelay", OD_LNODELAY),
/* TCP keepalive */
od_keyword("keepalive", OD_LKEEPALIVE),
od_keyword("keepalive_keep_interval", OD_LKEEPALIVE_INTERVAL),
od_keyword("keepalive_probes", OD_LKEEPALIVE_PROBES),
od_keyword("keepalive_usr_timeout", OD_LKEEPALIVE_USR_TIMEOUT),
/* */
od_keyword("readahead", OD_LREADAHEAD),
od_keyword("workers", OD_LWORKERS),
od_keyword("resolvers", OD_LRESOLVERS),
@ -150,17 +158,21 @@ static od_keyword_t od_config_keywords[] = {
od_keyword("client_fwd_error", OD_LCLIENT_FWD_ERROR),
od_keyword("application_name_add_host", OD_LAPPLICATION_NAME_ADD_HOST),
od_keyword("server_lifetime", OD_LSERVER_LIFETIME),
/* tls */
od_keyword("tls", OD_LTLS),
od_keyword("tls_ca_file", OD_LTLS_CA_FILE),
od_keyword("tls_key_file", OD_LTLS_KEY_FILE),
od_keyword("tls_cert_file", OD_LTLS_CERT_FILE),
od_keyword("tls_protocols", OD_LTLS_PROTOCOLS),
od_keyword("compression", OD_LCOMPRESSION),
/* storage */
od_keyword("storage", OD_LSTORAGE),
od_keyword("type", OD_LTYPE),
od_keyword("server_max_routing", OD_LSERVERS_MAX_ROUTING),
od_keyword("default", OD_LDEFAULT),
/* database */
od_keyword("database", OD_LDATABASE),
od_keyword("user", OD_LUSER),
@ -178,6 +190,8 @@ static od_keyword_t od_config_keywords[] = {
od_keyword("storage_db", OD_LSTORAGE_DB),
od_keyword("storage_user", OD_LSTORAGE_USER),
od_keyword("storage_password", OD_LSTORAGE_PASSWORD),
/* auth */
od_keyword("authentication", OD_LAUTHENTICATION),
od_keyword("auth_common_name", OD_LAUTH_COMMON_NAME),
od_keyword("auth_query", OD_LAUTH_QUERY),

View File

@ -11,8 +11,10 @@
void od_instance_init(od_instance_t *instance)
{
od_pid_init(&instance->pid);
od_logger_init(&instance->logger, &instance->pid);
od_config_init(&instance->config);
instance->config_file = NULL;
instance->shutdown_worker_id = INVALID_ID;
@ -183,6 +185,9 @@ int od_instance_main(od_instance_t *instance, int argc, char **argv)
if (instance->config.pid_file)
od_pid_create(&instance->pid, instance->config.pid_file);
// start aync logging thread if needed
od_logger_load(&instance->logger);
/* start system machine thread */
rc = od_system_start(&system, &global);
if (rc == -1) {

View File

@ -32,7 +32,7 @@ static int od_log_syslog_level[] = { LOG_INFO, LOG_ERR, LOG_DEBUG, LOG_CRIT };
static char *od_log_level[] = { "info", "error", "debug", "fatal" };
void od_logger_init(od_logger_t *logger, od_pid_t *pid)
od_retcode_t od_logger_init(od_logger_t *logger, od_pid_t *pid)
{
logger->pid = pid;
logger->log_debug = 0;
@ -41,8 +41,35 @@ void od_logger_init(od_logger_t *logger, od_pid_t *pid)
logger->format = NULL;
logger->format_len = 0;
logger->fd = -1;
logger->loaded = 0;
/* set temporary format */
od_logger_set_format(logger, "%p %t %l (%c) %h %m\n");
return OK_RESPONSE;
}
static inline void od_logger(void *arg);
od_retcode_t od_logger_load(od_logger_t *logger)
{
// we should do this in separate function, after config read and machinauim initialization
logger->task_channel = machine_channel_create();
if (logger->task_channel == NULL) {
return NOT_OK_RESPONSE;
}
char name[32];
od_snprintf(name, sizeof(name), "logger");
logger->machine = machine_create(name, od_logger, logger);
if (logger->machine == -1) {
machine_channel_free(logger->task_channel);
return NOT_OK_RESPONSE;
}
logger->loaded = 1;
return OK_RESPONSE;
}
int od_logger_open(od_logger_t *logger, char *path)
@ -351,6 +378,63 @@ od_logger_format(od_logger_t *logger, od_logger_level_t level, char *context,
return dst_pos - output;
}
typedef struct {
od_logger_level_t lvl;
char msg[FLEXIBLE_ARRAY_MEMBER];
} _od_log_entry;
static inline size_t od_log_entry_req_size(size_t msg_len)
{
return sizeof(od_logger_level_t) +
sizeof(char) * (msg_len + /* NULL */ 1);
}
static inline void _od_logger_write(od_logger_t *l, char *data, int len,
od_logger_level_t lvl)
{
int rc;
if (l->fd != -1) {
rc = write(l->fd, data, len);
}
if (l->log_stdout) {
rc = write(STDOUT_FILENO, data, len);
}
if (l->log_syslog) {
syslog(od_log_syslog_level[lvl], "%.*s", len, data);
}
(void)rc;
}
static inline void od_logger(void *arg)
{
od_logger_t *logger = arg;
for (;;) {
machine_msg_t *msg;
msg = machine_channel_read(logger->task_channel, UINT32_MAX);
if (msg == NULL)
break;
od_msg_t msg_type;
msg_type = machine_msg_type(msg);
switch (msg_type) {
case OD_MSG_LOG: {
od_dbg_printf_on_dvl_lvl(
1, "reveiced async logger logger msg", "");
_od_log_entry *le = machine_msg_data(msg);
int len = strlen(le->msg);
_od_logger_write(logger, le->msg, len, le->lvl);
} break;
default: {
assert(0);
} break;
}
machine_msg_free(msg);
}
}
void od_logger_write(od_logger_t *logger, od_logger_level_t level,
char *context, void *client, void *server, char *fmt,
va_list args)
@ -374,19 +458,22 @@ void od_logger_write(od_logger_t *logger, od_logger_level_t level,
return;
}
char output[1024];
char output[OD_LOGLINE_MAXLEN];
int len;
len = od_logger_format(logger, level, context, client, server, fmt,
args, output, sizeof(output));
int rc;
if (logger->fd != -1) {
rc = write(logger->fd, output, len);
if (logger->loaded) {
/* create new log event and pass it to logger pool */
machine_msg_t *msg;
msg = machine_msg_create(od_log_entry_req_size(len));
machine_msg_set_type(msg, OD_MSG_LOG);
_od_log_entry *le = machine_msg_data(msg);
strncpy(le->msg, output, len);
le->msg[len] = NULL;
machine_channel_write(logger->task_channel, msg);
} else {
_od_logger_write(logger, output, len, level);
}
if (logger->log_stdout) {
rc = write(STDOUT_FILENO, output, len);
}
if (logger->log_syslog) {
syslog(od_log_syslog_level[level], "%.*s", len, output);
}
(void)rc;
}

View File

@ -7,6 +7,8 @@
* Scalable PostgreSQL connection pooler.
*/
#define OD_LOGLINE_MAXLEN 1024
typedef struct od_logger od_logger_t;
typedef enum { OD_LOG, OD_ERROR, OD_DEBUG, OD_FATAL } od_logger_level_t;
@ -18,10 +20,17 @@ struct od_logger {
int log_syslog;
char *format;
int format_len;
int fd;
int loaded;
int64_t machine;
/* makes sence only with use_asynclog option on */
machine_channel_t *task_channel;
};
void od_logger_init(od_logger_t *, od_pid_t *);
extern od_retcode_t od_logger_init(od_logger_t *, od_pid_t *);
extern od_retcode_t od_logger_load(od_logger_t *logger);
static inline void od_logger_set_debug(od_logger_t *logger, int enable)
{
@ -39,12 +48,12 @@ static inline void od_logger_set_format(od_logger_t *logger, char *format)
logger->format_len = strlen(format);
}
int od_logger_open(od_logger_t *, char *);
int od_logger_reopen(od_logger_t *, char *);
int od_logger_open_syslog(od_logger_t *, char *, char *);
void od_logger_close(od_logger_t *);
void od_logger_write(od_logger_t *, od_logger_level_t, char *, void *, void *,
char *, va_list);
extern int od_logger_open(od_logger_t *, char *);
extern int od_logger_reopen(od_logger_t *, char *);
extern int od_logger_open_syslog(od_logger_t *, char *, char *);
extern void od_logger_close(od_logger_t *);
extern void od_logger_write(od_logger_t *, od_logger_level_t, char *, void *,
void *, char *, va_list);
static inline void od_log(od_logger_t *logger, char *context, void *client,
void *server, char *fmt, ...)

View File

@ -7,6 +7,6 @@
* Scalable PostgreSQL connection pooler.
*/
typedef enum { OD_MSG_STAT, OD_MSG_CLIENT_NEW } od_msg_t;
typedef enum { OD_MSG_STAT, OD_MSG_CLIENT_NEW, OD_MSG_LOG } od_msg_t;
#endif /* ODYSSEY_MSG_H */

View File

@ -886,7 +886,7 @@ void od_rules_print(od_rules_t *rules, od_logger_t *logger)
" pool_client_idle_timeout %d",
rule->pool_client_idle_timeout);
od_log(logger, "rules", NULL, NULL,
" pool_idle_in_transaction_timeout %d",
" pool_idle_in_transaction_timeout %d",
rule->pool_idle_in_transaction_timeout);
if (rule->client_max_set)

View File

@ -14,13 +14,14 @@ void od_watchdog_worker(void *arg)
int fd_ctrl = od_get_control_lock(instance->config.locks_dir);
if (fd_ctrl == -1) {
od_log(&instance->logger, "watchdog", NULL, NULL,
"failed to create ctrl lock file in %s (errno: %d) try to "
"specify another locks dir or disable online restart feature",
instance->config.locks_dir == NULL ?
ODYSSEY_DEFAULT_LOCK_DIR :
instance->config.locks_dir,
errno);
od_error(
&instance->logger, "watchdog", NULL, NULL,
"failed to create ctrl lock file in %s (errno: %d) try to "
"specify another locks dir or disable online restart feature",
instance->config.locks_dir == NULL ?
ODYSSEY_DEFAULT_LOCK_DIR :
instance->config.locks_dir,
errno);
if (instance->config.graceful_die_on_errors) {
kill(instance->pid.pid, OD_SIG_GRACEFUL_SHUTDOWN);