Refactor prepared statement with tx pool frontend routines. (#587)

* Refac

* fix

* FIX
This commit is contained in:
reshke 2024-03-07 14:53:27 +05:00 committed by GitHub
parent ff5b656310
commit ddbbb698b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 44 additions and 130 deletions

View File

@ -959,43 +959,37 @@ static inline od_retcode_t od_frontend_log_bind(od_instance_t *instance,
// 8 hex
#define OD_HASH_LEN 9
static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size,
int opname_start_offset,
int operator_name_len,
od_hash_t body_hash)
static inline machine_msg_t *
od_frontend_rewrite_msg(char *data, int size, int opname_start_offset,
int operator_name_len, char *opname, int opnamelen)
{
machine_msg_t *msg =
machine_msg_create(size - operator_name_len + OD_HASH_LEN);
machine_msg_create(size - operator_name_len + opnamelen);
char *rewrite_data = machine_msg_data(msg);
// packet header
memcpy(rewrite_data, data, opname_start_offset);
// prefix for opname
od_snprintf(rewrite_data + opname_start_offset, OD_HASH_LEN, "%08x",
body_hash);
od_snprintf(rewrite_data + opname_start_offset, opnamelen, opname);
// rest of msg
memcpy(rewrite_data + opname_start_offset + OD_HASH_LEN,
memcpy(rewrite_data + opname_start_offset + opnamelen,
data + opname_start_offset + operator_name_len,
size - opname_start_offset - operator_name_len);
// set proper size to package
kiwi_header_set_size((kiwi_header_t *)rewrite_data,
size - operator_name_len + OD_HASH_LEN);
size - operator_name_len + opnamelen);
return msg;
}
static od_frontend_status_t od_frontend_deploy_prepared_stmt(
od_server_t *server, od_relay_t *relay, char *ctx,
machine_msg_t *msg /* to adcance or to write? */
)
od_server_t *server, od_relay_t *relay, char *ctx, char *data,
int size /* to adcance or to write? */, od_hash_t body_hash,
char *opname, int opnamelen)
{
od_route_t *route = server->route;
od_instance_t *instance = server->global->instance;
od_client_t *client = server->client;
char *data = machine_msg_data(msg);
int size = machine_msg_size(msg);
od_hash_t body_hash = od_murmur_hash(data, size);
od_hashmap_elt_t desc;
desc.data = data;
@ -1004,9 +998,6 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
od_debug(&instance->logger, ctx, client, server,
"statement: %.*s, hash: %08x", desc.len, desc.data, body_hash);
char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);
int refcnt = 0;
od_hashmap_elt_t value;
value.data = &refcnt;
@ -1017,16 +1008,16 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
if (od_hashmap_insert(server->prep_stmts, body_hash, &desc,
&value_ptr) == 0) {
od_debug(&instance->logger, ctx, client, server,
"deploy %.*s operator %s to server", desc.len,
desc.data, opname);
"deploy %.*s operator %.*s to server", desc.len,
desc.data, opnamelen, opname);
// rewrite msg
// allocate prepered statement under name equal to body hash
od_stat_parse(&route->stats);
machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc.data, desc.len);
pmsg = kiwi_fe_write_parse_description(NULL, opname, opnamelen,
desc.data, desc.len);
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}
@ -1057,6 +1048,21 @@ static od_frontend_status_t od_frontend_deploy_prepared_stmt(
}
}
static inline od_frontend_status_t od_frontend_deploy_prepared_stmt_msg(
od_server_t *server, od_relay_t *relay, char *ctx,
machine_msg_t *msg /* to adcance or to write? */
)
{
char *data = machine_msg_data(msg);
int size = machine_msg_size(msg);
od_hash_t body_hash = od_murmur_hash(data, size);
char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);
return od_frontend_deploy_prepared_stmt(server, relay, ctx, data, size,
body_hash, opname, OD_HASH_LEN);
}
static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
char *data, int size)
{
@ -1140,63 +1146,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
od_hash_t body_hash =
od_murmur_hash(desc->data, desc->len);
od_debug(&instance->logger, "rewrite describe", client,
server, "statement: %.*s, hash: %08x",
desc->len, desc->data, body_hash);
char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);
int refcnt = 0;
od_hashmap_elt_t value;
value.data = &refcnt;
value.len = sizeof(int);
od_hashmap_elt_t *value_ptr = &value;
// send parse msg if needed
if (od_hashmap_insert(server->prep_stmts, body_hash,
desc, &value_ptr) == 0) {
od_debug(
&instance->logger,
"rewrite parse before describe", client,
server,
"deploy %.*s operator hash %u to server",
desc->len, desc->data, keyhash);
// rewrite msg
// allocate prepered statement under name equal to body hash
machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}
if (instance->config.log_query ||
route->rule->log_query) {
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}
od_stat_parse(&route->stats);
// msg deallocated here
machine_iov_add(relay->iov, pmsg);
od_dbg_printf_on_dvl_lvl(
1, "client relay %p advance msg %c\n",
relay, *(char *)machine_msg_data(pmsg));
} else {
int *refcnt;
refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;
od_stat_parse_reuse(&route->stats);
/* fill internals structs in, send parse if needed */
if (od_frontend_deploy_prepared_stmt(
server, &server->relay, "parse before bind",
desc->data, desc->len, body_hash, opname,
OD_HASH_LEN) != OD_OK) {
return OD_ESERVER_WRITE;
}
machine_msg_t *msg;
@ -1316,59 +1274,15 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
od_hash_t body_hash =
od_murmur_hash(desc->data, desc->len);
od_debug(&instance->logger, "rewrite bind", client,
server, "statement: %.*s, hash: %08x",
desc->len, desc->data, body_hash);
od_hashmap_elt_t value;
int refcnt = 1;
value.data = &refcnt;
value.len = sizeof(int);
od_hashmap_elt_t *value_ptr = &value;
char opname[OD_HASH_LEN];
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);
if (od_hashmap_insert(server->prep_stmts, body_hash,
desc, &value_ptr) == 0) {
od_debug(
&instance->logger,
"rewrite parse before bind", client,
server,
"deploy %.*s operator hash %u to server",
desc->len, desc->data, keyhash);
// rewrite msg
// allocate prepered statement under name equal to body hash
machine_msg_t *pmsg;
pmsg = kiwi_fe_write_parse_description(
NULL, opname, OD_HASH_LEN, desc->data,
desc->len);
if (pmsg == NULL) {
return OD_ESERVER_WRITE;
}
if (instance->config.log_query ||
route->rule->log_query) {
od_frontend_log_parse(
instance, client,
"rewrite parse",
machine_msg_data(pmsg),
machine_msg_size(pmsg));
}
od_stat_parse(&route->stats);
machine_iov_add(relay->iov, pmsg);
od_dbg_printf_on_dvl_lvl(
1, "client relay %p advance msg %c\n",
relay, *(char *)machine_msg_data(pmsg));
} else {
int *refcnt = value_ptr->data;
*refcnt = 1 + *refcnt;
od_stat_parse_reuse(&route->stats);
/* fill internals structs in, send parse if needed */
if (od_frontend_deploy_prepared_stmt(
server, &server->relay, "parse before bind",
desc->data, desc->len, body_hash, opname,
OD_HASH_LEN) != OD_OK) {
return OD_ESERVER_WRITE;
}
int opname_start_offset =
@ -1380,8 +1294,8 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
machine_msg_t *msg;
msg = od_frontend_rewrite_msg(data, size,
opname_start_offset,
operator_name_len,
body_hash);
operator_name_len, opname,
OD_HASH_LEN);
if (msg == NULL) {
return OD_ESERVER_WRITE;
@ -1784,7 +1698,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
assert(server->parse_msg != NULL);
/* fill internals structs in */
if (od_frontend_deploy_prepared_stmt(
if (od_frontend_deploy_prepared_stmt_msg(
server, &server->relay, "sync-point",
server->parse_msg) != OD_OK) {
status = OD_ESERVER_WRITE;