odyssey: optimize relay processing

This commit is contained in:
Dmitry Simonenko 2019-01-24 15:35:07 +03:00
parent 6c0a1f09f8
commit 49795088f7
3 changed files with 86 additions and 70 deletions

View File

@ -452,8 +452,10 @@ od_frontend_remote_server(od_relay_t *relay, char *data, int size)
od_instance_t *instance = client->global->instance; od_instance_t *instance = client->global->instance;
kiwi_be_type_t type = *data; kiwi_be_type_t type = *data;
od_debug(&instance->logger, "main", client, server, "%s",
kiwi_be_type_to_string(type)); if (instance->config.log_debug)
od_debug(&instance->logger, "main", client, server, "%s",
kiwi_be_type_to_string(type));
int is_deploy = od_server_in_deploy(server); int is_deploy = od_server_in_deploy(server);
int is_ready_for_query = 0; int is_ready_for_query = 0;
@ -485,7 +487,7 @@ od_frontend_remote_server(od_relay_t *relay, char *data, int size)
od_stat_query_end(&route->stats, &server->stats_state, od_stat_query_end(&route->stats, &server->stats_state,
server->is_transaction, server->is_transaction,
&query_time); &query_time);
if (query_time > 0) { if (instance->config.log_debug && query_time > 0) {
od_debug(&instance->logger, "main", server->client, server, od_debug(&instance->logger, "main", server->client, server,
"query time: %d microseconds", "query time: %d microseconds",
query_time); query_time);
@ -531,8 +533,9 @@ od_frontend_remote_client(od_relay_t *relay, char *data, int size)
od_server_t *server = client->server; od_server_t *server = client->server;
assert(server != NULL); assert(server != NULL);
od_debug(&instance->logger, "main", client, server, "%s", if (instance->config.log_debug)
kiwi_fe_type_to_string(type)); od_debug(&instance->logger, "main", client, server, "%s",
kiwi_fe_type_to_string(type));
switch (type) { switch (type) {
case KIWI_FE_COPY_DONE: case KIWI_FE_COPY_DONE:

View File

@ -116,67 +116,98 @@ od_relay_stop(od_relay_t *relay)
} }
static inline int static inline int
od_relay_process_is_full_packet(char *data) od_relay_full_packet_required(char *data)
{ {
kiwi_header_t *header; kiwi_header_t *header;
header = (kiwi_header_t*)data; header = (kiwi_header_t*)data;
if (header->type == KIWI_BE_PARAMETER_STATUS || if (header->type == KIWI_BE_PARAMETER_STATUS ||
header->type == KIWI_BE_READY_FOR_QUERY || header->type == KIWI_BE_READY_FOR_QUERY ||
header->type == KIWI_BE_ERROR_RESPONSE) header->type == KIWI_BE_ERROR_RESPONSE)
return 1; return 1;
return 0; return 0;
} }
static inline od_status_t static inline od_status_t
od_relay_on_packet_msg(od_relay_t *relay, machine_msg_t *msg)
{
int rc;
od_status_t status;
status = relay->on_packet(relay, machine_msg_data(msg),
machine_msg_size(msg));
switch (status) {
case OD_OK:
case OD_DETACH:
rc = machine_iov_add(relay->iov, msg);
if (rc == -1)
return OD_EOOM;
break;
default:
machine_msg_free(msg);
break;
}
return status;
}
static inline od_status_t
od_relay_on_packet(od_relay_t *relay, char *data, int size)
{
int rc;
od_status_t status;
status = relay->on_packet(relay, data, size);
switch (status) {
case OD_OK:
case OD_DETACH:
rc = machine_iov_add_pointer(relay->iov, data, size);
if (rc == -1)
return OD_EOOM;
break;
case OD_SKIP:
relay->packet_skip = 1;
status = OD_OK;
break;
default:
break;
}
return status;
}
__attribute__((hot)) static inline od_status_t
od_relay_process(od_relay_t *relay, int *progress, char *data, int size) od_relay_process(od_relay_t *relay, int *progress, char *data, int size)
{ {
*progress = 0; *progress = 0;
/* packet start */ /* on packet start */
int rc; int rc;
od_status_t status;
if (relay->packet == 0) if (relay->packet == 0)
{ {
if (size < (int)sizeof(kiwi_header_t)) if (size < (int)sizeof(kiwi_header_t))
return OD_UNDEF; return OD_UNDEF;
uint32_t body; int body;
body = kiwi_read_size(data, sizeof(kiwi_header_t)); body = kiwi_read_size(data, sizeof(kiwi_header_t));
relay->packet = body;
relay->packet_skip = 0;
rc = od_relay_process_is_full_packet(data); int total = sizeof(kiwi_header_t) + body;
if (rc) if (size >= total) {
{ *progress = total;
relay->packet_full = machine_msg_create(sizeof(kiwi_header_t) + body); return od_relay_on_packet(relay, data, total);
if (relay->packet_full == NULL)
return OD_EOOM;
char *dest;
dest = machine_msg_data(relay->packet_full);
memcpy(dest, data, sizeof(kiwi_header_t));
relay->packet_full_pos = sizeof(kiwi_header_t);
} else {
status = relay->on_packet(relay, data, sizeof(kiwi_header_t));
switch (status) {
case OD_OK:
rc = machine_iov_add_pointer(relay->iov, data, sizeof(kiwi_header_t));
if (rc == -1)
return OD_EOOM;
break;
case OD_DETACH:
rc = machine_iov_add_pointer(relay->iov, data, sizeof(kiwi_header_t));
if (rc == -1)
return OD_EOOM;
return status;
case OD_SKIP:
relay->packet_skip = 1;
break;
default:
return status;
}
} }
*progress = sizeof(kiwi_header_t); *progress = size;
relay->packet = total - size;
relay->packet_skip = 0;
rc = od_relay_full_packet_required(data);
if (! rc)
return od_relay_on_packet(relay, data, size);
relay->packet_full = machine_msg_create(total);
if (relay->packet_full == NULL)
return OD_EOOM;
char *dest;
dest = machine_msg_data(relay->packet_full);
memcpy(dest, data, size);
relay->packet_full_pos = size;
return OD_OK; return OD_OK;
} }
@ -193,30 +224,12 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size)
dest = machine_msg_data(relay->packet_full); dest = machine_msg_data(relay->packet_full);
memcpy(dest + relay->packet_full_pos, data, to_parse); memcpy(dest + relay->packet_full_pos, data, to_parse);
relay->packet_full_pos += to_parse; relay->packet_full_pos += to_parse;
if (relay->packet == 0) { if (relay->packet > 0)
status = relay->on_packet(relay, dest, machine_msg_size(relay->packet_full)); return OD_OK;
machine_msg_t *msg = relay->packet_full; machine_msg_t *msg = relay->packet_full;
relay->packet_full = NULL; relay->packet_full = NULL;
relay->packet_full_pos = 0; relay->packet_full_pos = 0;
switch (status) { return od_relay_on_packet_msg(relay, msg);
case OD_OK:
rc = machine_iov_add(relay->iov, msg);
if (rc == -1)
return OD_EOOM;
break;
case OD_DETACH:
rc = machine_iov_add(relay->iov, msg);
if (rc == -1)
return OD_EOOM;
return status;
case OD_SKIP:
machine_msg_free(msg);
break;
default:
machine_msg_free(msg);
return status;
}
}
} else { } else {
if (relay->packet_skip) if (relay->packet_skip)
return OD_OK; return OD_OK;

View File

@ -54,8 +54,8 @@ mm_iov_reset(mm_iov_t *iov)
mm_iov_gc(iov); mm_iov_gc(iov);
} }
static inline int __attribute__((hot)) static inline int
mm_iov_add_pointer(mm_iov_t *iov, void *data, int size) mm_iov_add_pointer(mm_iov_t *iov, void *pointer, int size)
{ {
int rc; int rc;
rc = mm_buf_ensure(&iov->iov, sizeof(struct iovec)); rc = mm_buf_ensure(&iov->iov, sizeof(struct iovec));
@ -63,7 +63,7 @@ mm_iov_add_pointer(mm_iov_t *iov, void *data, int size)
return -1; return -1;
struct iovec *iovec; struct iovec *iovec;
iovec = (struct iovec*)iov->iov.pos; iovec = (struct iovec*)iov->iov.pos;
iovec->iov_base = data; iovec->iov_base = pointer;
iovec->iov_len = size; iovec->iov_len = size;
mm_buf_advance(&iov->iov, sizeof(struct iovec)); mm_buf_advance(&iov->iov, sizeof(struct iovec));
iov->iov_count++; iov->iov_count++;