mirror of https://github.com/yandex/odyssey.git
odissey: make client stream shared
This commit is contained in:
parent
0cc0f949b3
commit
c891dc757a
|
@ -54,7 +54,7 @@ od_auth_frontend_cleartext(od_client_t *client)
|
|||
od_instance_t *instance = client->system->instance;
|
||||
|
||||
/* AuthenticationCleartextPassword */
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_be_write_authentication_clear_text(stream);
|
||||
|
@ -148,7 +148,7 @@ od_auth_frontend_md5(od_client_t *client)
|
|||
uint32_t salt = shapito_password_salt(&client->key);
|
||||
|
||||
/* AuthenticationMD5Password */
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_be_write_authentication_md5(stream, (char*)&salt);
|
||||
|
@ -297,7 +297,7 @@ int od_auth_frontend(od_client_t *client)
|
|||
}
|
||||
|
||||
/* pass */
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
rc = shapito_be_write_authentication_ok(stream);
|
||||
if (rc == -1)
|
||||
|
|
|
@ -32,7 +32,7 @@ struct od_client
|
|||
shapito_be_startup_t startup;
|
||||
shapito_parameters_t params;
|
||||
shapito_key_t key;
|
||||
shapito_stream_t stream;
|
||||
shapito_stream_t *stream;
|
||||
od_server_t *server;
|
||||
void *route;
|
||||
od_system_t *system;
|
||||
|
@ -55,10 +55,10 @@ od_client_init(od_client_t *client)
|
|||
client->system = NULL;
|
||||
client->time_accept = 0;
|
||||
client->time_setup = 0;
|
||||
client->stream = NULL;
|
||||
shapito_be_startup_init(&client->startup);
|
||||
shapito_parameters_init(&client->params);
|
||||
shapito_key_init(&client->key);
|
||||
shapito_stream_init(&client->stream);
|
||||
od_list_init(&client->link_pool);
|
||||
od_list_init(&client->link);
|
||||
}
|
||||
|
@ -76,10 +76,26 @@ od_client_allocate(void)
|
|||
static inline void
|
||||
od_client_free(od_client_t *client)
|
||||
{
|
||||
assert(client->stream == NULL);
|
||||
shapito_be_startup_free(&client->startup);
|
||||
shapito_parameters_free(&client->params);
|
||||
shapito_stream_free(&client->stream);
|
||||
free(client);
|
||||
}
|
||||
|
||||
static inline shapito_stream_t*
|
||||
od_client_stream_attach(od_client_t *client, shapito_cache_t *cache)
|
||||
{
|
||||
assert(client->stream == NULL);
|
||||
client->stream = shapito_cache_pop(cache);
|
||||
return client->stream;
|
||||
}
|
||||
|
||||
static inline void
|
||||
od_client_stream_detach(od_client_t *client, shapito_cache_t *cache)
|
||||
{
|
||||
assert(client->stream != NULL);
|
||||
shapito_cache_push(cache, client->stream);
|
||||
client->stream = NULL;
|
||||
}
|
||||
|
||||
#endif /* OD_CLIENT_H */
|
||||
|
|
|
@ -144,7 +144,7 @@ od_console_show_stats_callback(char *database,
|
|||
od_serverstat_t *avg, void *arg)
|
||||
{
|
||||
od_client_t *client = arg;
|
||||
return od_console_show_stats_add(&client->stream,
|
||||
return od_console_show_stats_add(client->stream,
|
||||
database, database_len,
|
||||
total, avg);
|
||||
}
|
||||
|
@ -153,7 +153,7 @@ static inline int
|
|||
od_console_show_stats(od_client_t *client)
|
||||
{
|
||||
od_router_t *router = client->system->router;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_be_write_row_descriptionf(stream, "sllllllll",
|
||||
|
@ -285,7 +285,7 @@ static inline int
|
|||
od_console_show_servers(od_client_t *client)
|
||||
{
|
||||
od_router_t *router = client->system->router;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_be_write_row_descriptionf(stream, "sssssdsdssssds",
|
||||
|
@ -432,7 +432,7 @@ static inline int
|
|||
od_console_show_clients(od_client_t *client)
|
||||
{
|
||||
od_router_t *router = client->system->router;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
int rc;
|
||||
rc = shapito_be_write_row_descriptionf(stream, "sssssdsdssssds",
|
||||
|
@ -507,7 +507,7 @@ static inline int
|
|||
od_console_show_lists(od_client_t *client)
|
||||
{
|
||||
od_router_t *router = client->system->router;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
|
||||
int used_servers = 0;
|
||||
|
@ -614,7 +614,7 @@ od_console_query_show(od_client_t *client, od_parser_t *parser)
|
|||
static inline int
|
||||
od_console_query_set(od_client_t *client, od_parser_t *parser)
|
||||
{
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
(void)parser;
|
||||
int rc;
|
||||
|
@ -682,18 +682,18 @@ od_console_query(od_console_t *console, od_msgconsole_t *msg_console)
|
|||
bad_query:
|
||||
od_error(&instance->logger, "console", client, NULL,
|
||||
"bad console command: %.*s", query_len, query);
|
||||
shapito_stream_reset(&client->stream);
|
||||
shapito_stream_reset(client->stream);
|
||||
od_frontend_errorf(client, SHAPITO_SYNTAX_ERROR, "bad console command: %.*s",
|
||||
query_len, query);
|
||||
shapito_be_write_ready(&client->stream, 'I');
|
||||
shapito_be_write_ready(client->stream, 'I');
|
||||
return -1;
|
||||
|
||||
bad_command:
|
||||
od_error(&instance->logger, "console", client, NULL,
|
||||
"bad console command");
|
||||
shapito_stream_reset(&client->stream);
|
||||
shapito_stream_reset(client->stream);
|
||||
od_frontend_errorf(client, SHAPITO_SYNTAX_ERROR, "bad console command");
|
||||
shapito_be_write_ready(&client->stream, 'I');
|
||||
shapito_be_write_ready(client->stream, 'I');
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -67,8 +67,12 @@ typedef enum {
|
|||
|
||||
void od_frontend_close(od_client_t *client)
|
||||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
assert(client->route == NULL);
|
||||
assert(client->server == NULL);
|
||||
if (client->stream) {
|
||||
od_client_stream_detach(client, &instance->stream_cache);
|
||||
}
|
||||
if (client->io) {
|
||||
machine_close(client->io);
|
||||
machine_io_free(client->io);
|
||||
|
@ -90,7 +94,7 @@ od_frontend_error_fwd(od_client_t *client)
|
|||
shapito_stream_used(server->stream));
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
char msg[512];
|
||||
int msg_len;
|
||||
|
@ -123,7 +127,7 @@ od_frontend_verror(od_client_t *client, char *code, char *fmt, va_list args)
|
|||
(signed)sizeof(client->id.id),
|
||||
client->id.id);
|
||||
msg_len += od_vsnprintf(msg + msg_len, sizeof(msg) - msg_len, fmt, args);
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
int rc;
|
||||
rc = shapito_be_write_error(stream, code, msg, msg_len);
|
||||
if (rc == -1)
|
||||
|
@ -143,7 +147,7 @@ int od_frontend_errorf(od_client_t *client, char *code, char *fmt, ...)
|
|||
|
||||
int od_frontend_error(od_client_t *client, char *code, char *fmt, ...)
|
||||
{
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
va_list args;
|
||||
va_start(args, fmt);
|
||||
|
@ -161,7 +165,7 @@ od_frontend_startup_read(od_client_t *client)
|
|||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
for (;;) {
|
||||
uint32_t pos_size = shapito_stream_used(stream);
|
||||
|
@ -200,7 +204,7 @@ od_frontend_startup(od_client_t *client)
|
|||
rc = od_frontend_startup_read(client);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
rc = shapito_be_read_startup(&client->startup, stream->start,
|
||||
shapito_stream_used(stream));
|
||||
if (rc == -1)
|
||||
|
@ -291,7 +295,7 @@ od_frontend_attach(od_client_t *client, char *context)
|
|||
|
||||
if (! od_idmgr_cmp(&server->last_client_id, &client->id))
|
||||
{
|
||||
rc = od_deploy_write(client->server, context, &client->stream,
|
||||
rc = od_deploy_write(client->server, context, client->stream,
|
||||
&client->params);
|
||||
if (rc == -1) {
|
||||
status = od_router_close(client);
|
||||
|
@ -345,7 +349,7 @@ od_frontend_setup(od_client_t *client)
|
|||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
od_route_t *route = client->route;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
shapito_stream_reset(stream);
|
||||
|
||||
/* configure console client */
|
||||
|
@ -381,7 +385,7 @@ od_frontend_setup(od_client_t *client)
|
|||
|
||||
od_server_stat_request(server, server->deploy_sync);
|
||||
|
||||
shapito_stream_reset(&client->stream);
|
||||
shapito_stream_reset(client->stream);
|
||||
|
||||
/* wait for completion */
|
||||
rc = od_backend_deploy_wait(server, "setup", UINT32_MAX);
|
||||
|
@ -446,14 +450,15 @@ static inline int
|
|||
od_frontend_stream_hit_limit(od_client_t *client)
|
||||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
return shapito_stream_used(&client->stream) >= instance->scheme.readahead;
|
||||
return shapito_stream_used(client->stream) >= instance->scheme.readahead;
|
||||
}
|
||||
|
||||
static inline void
|
||||
od_frontend_stream_reset(od_client_t *client)
|
||||
{
|
||||
shapito_stream_t *stream = client->stream;
|
||||
#if 0
|
||||
od_instance_t *instance = client->system->instance;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
int watermark = (instance->scheme.readahead * 2);
|
||||
if (od_unlikely(shapito_stream_used(stream) >= watermark)) {
|
||||
od_debug(&instance->logger, "main", client, client->server,
|
||||
|
@ -464,6 +469,8 @@ od_frontend_stream_reset(od_client_t *client)
|
|||
} else {
|
||||
shapito_stream_reset(stream);
|
||||
}
|
||||
#endif
|
||||
shapito_stream_reset(stream);
|
||||
}
|
||||
|
||||
static od_frontend_rc_t
|
||||
|
@ -472,7 +479,7 @@ od_frontend_local(od_client_t *client)
|
|||
od_instance_t *instance = client->system->instance;
|
||||
int rc;
|
||||
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
for (;;)
|
||||
{
|
||||
/* read client request */
|
||||
|
@ -522,7 +529,7 @@ static inline od_frontend_rc_t
|
|||
od_frontend_remote_client(od_client_t *client)
|
||||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
od_server_t *server = client->server;
|
||||
|
||||
od_frontend_stream_reset(client);
|
||||
|
@ -635,7 +642,7 @@ od_frontend_remote_server(od_client_t *client)
|
|||
{
|
||||
od_instance_t *instance = client->system->instance;
|
||||
od_route_t *route = client->route;
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
od_server_t *server = client->server;
|
||||
|
||||
od_frontend_stream_reset(client);
|
||||
|
@ -953,6 +960,9 @@ void od_frontend(void *arg)
|
|||
}
|
||||
}
|
||||
|
||||
/* attach stream to the client */
|
||||
od_client_stream_attach(client, &instance->stream_cache);
|
||||
|
||||
/* handle startup */
|
||||
rc = od_frontend_startup(client);
|
||||
if (rc == -1) {
|
||||
|
@ -1046,6 +1056,9 @@ void od_frontend(void *arg)
|
|||
}
|
||||
|
||||
/* reset client and server state */
|
||||
if (client->stream == NULL)
|
||||
od_client_stream_attach(client, &instance->stream_cache);
|
||||
|
||||
od_frontend_cleanup(client, "main", frontend_rc);
|
||||
|
||||
/* close frontend connection */
|
||||
|
|
|
@ -91,7 +91,7 @@ od_tls_frontend_accept(od_client_t *client,
|
|||
od_schemelisten_t *scheme,
|
||||
machine_tls_t *tls)
|
||||
{
|
||||
shapito_stream_t *stream = &client->stream;
|
||||
shapito_stream_t *stream = client->stream;
|
||||
|
||||
if (client->startup.is_ssl_request)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue