/* * ODISSEY. * * PostgreSQL connection pooler and request console. */ #include #include #include #include #include #include #include #include #include #include "od_macro.h" #include "od_version.h" #include "od_list.h" #include "od_pid.h" #include "od_id.h" #include "od_syslog.h" #include "od_log.h" #include "od_daemon.h" #include "od_scheme.h" #include "od_lex.h" #include "od_config.h" #include "od_msg.h" #include "od_system.h" #include "od_instance.h" #include "od_server.h" #include "od_server_pool.h" #include "od_client.h" #include "od_client_pool.h" #include "od_route_id.h" #include "od_route.h" #include "od_route_pool.h" #include "od_io.h" #include "od_router.h" #include "od_pooler.h" #include "od_console.h" typedef struct { od_consolestatus_t status; od_client_t *client; char *request; machine_queue_t *response; } od_msgconsole_t; static void od_console(void *arg) { od_console_t *console = arg; od_instance_t *instance = console->system->instance; (void)instance; for (;;) { machine_msg_t *msg; msg = machine_queue_get(console->queue, UINT32_MAX); if (msg == NULL) break; od_msg_t msg_type; msg_type = machine_msg_get_type(msg); switch (msg_type) { case OD_MCONSOLE_REQUEST: { od_msgconsole_t *msg_console; msg_console = machine_msg_get_data(msg); od_client_t *client = msg_console->client; so_stream_t *stream = &client->stream; od_debug_client(&instance->log, &client->id, "console", "%s", msg_console->request); so_stream_reset(stream); so_bewrite_no_data(stream); so_bewrite_ready(stream, 'I'); msg_console->status = OD_COK; machine_queue_put(msg_console->response, msg); break; } default: assert(0); break; } } } int od_console_init(od_console_t *console, od_system_t *system) { od_instance_t *instance = system->instance; console->system = system; console->queue = machine_queue_create(); if (console->queue == NULL) { od_error(&instance->log, "console", "failed to create queue"); return -1; } return 0; } int od_console_start(od_console_t *console) { od_instance_t *instance = console->system->instance; int64_t coroutine_id; coroutine_id = machine_coroutine_create(od_console, console); if (coroutine_id == -1) { od_error(&instance->log, "console", "failed to start console coroutine"); return -1; } return 0; } static od_consolestatus_t od_console_do(od_client_t *client, od_msg_t msg_type, char *request, int wait_for_response) { od_console_t *console = client->system->console; /* send request to console */ machine_msg_t *msg; msg = machine_msg_create(msg_type, sizeof(od_msgconsole_t)); if (msg == NULL) return OD_CERROR; od_msgconsole_t *msg_console; msg_console = machine_msg_get_data(msg); msg_console->status = OD_CERROR; msg_console->client = client; msg_console->request = request; msg_console->response = NULL; /* create response queue */ machine_queue_t *response; if (wait_for_response) { response = machine_queue_create(); if (response == NULL) { machine_msg_free(msg); return OD_CERROR; } msg_console->response = response; } machine_queue_put(console->queue, msg); if (! wait_for_response) return OD_COK; /* wait for reply */ msg = machine_queue_get(response, UINT32_MAX); if (msg == NULL) { /* todo: */ abort(); machine_queue_free(response); return OD_CERROR; } msg_console = machine_msg_get_data(msg); od_consolestatus_t status; status = msg_console->status; machine_queue_free(response); machine_msg_free(msg); return status; } od_consolestatus_t od_console_request(od_client_t *client, char *request) { return od_console_do(client, OD_MCONSOLE_REQUEST, request, 1); }