mirror of https://github.com/yandex/odyssey.git
Sync relay extended proto messages in tx pool. (#586)
* Fix relay in tx pool + prep stmt try 2 * fmt * fix
This commit is contained in:
parent
ccf16398f6
commit
ff5b656310
|
@ -60,7 +60,7 @@ database "db1" {
|
|||
pool_timeout 0
|
||||
pool_ttl 60
|
||||
pool_discard no
|
||||
pool_smart_discard yes
|
||||
pool_smart_discard no
|
||||
pool_cancel yes
|
||||
pool_rollback yes
|
||||
pool_reserve_prepared_statement yes
|
||||
|
|
|
@ -100,6 +100,7 @@ int od_backend_ready(od_server_t *server, char *data, uint32_t size)
|
|||
}
|
||||
|
||||
/* update server sync reply state */
|
||||
|
||||
od_server_sync_reply(server);
|
||||
return 0;
|
||||
}
|
||||
|
@ -171,6 +172,7 @@ static inline int od_backend_startup(od_server_t *server,
|
|||
|
||||
/* update request count and sync state */
|
||||
od_server_sync_request(server, 1);
|
||||
assert(server->client);
|
||||
|
||||
while (1) {
|
||||
msg = od_read(&server->io, UINT32_MAX);
|
||||
|
@ -702,7 +704,7 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count,
|
|||
int query_rc;
|
||||
query_rc = 0;
|
||||
|
||||
for (;;) {
|
||||
for (; !od_server_synchronized(server);) {
|
||||
machine_msg_t *msg;
|
||||
msg = od_read(&server->io, time_ms);
|
||||
if (msg == NULL) {
|
||||
|
@ -741,13 +743,12 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count,
|
|||
machine_msg_size(msg));
|
||||
machine_msg_free(msg);
|
||||
ready++;
|
||||
if (ready == count) {
|
||||
return query_rc;
|
||||
}
|
||||
} else {
|
||||
machine_msg_free(msg);
|
||||
}
|
||||
}
|
||||
|
||||
return query_rc;
|
||||
/* never reached */
|
||||
}
|
||||
|
||||
|
@ -777,6 +778,7 @@ od_retcode_t od_backend_query_send(od_server_t *server, char *context,
|
|||
|
||||
/* update server sync state */
|
||||
od_server_sync_request(server, 1);
|
||||
assert(server->client);
|
||||
return OK_RESPONSE;
|
||||
}
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ struct od_client {
|
|||
void *route;
|
||||
char peer[OD_CLIENT_MAX_PEERLEN];
|
||||
|
||||
// desc preparet statements ids
|
||||
/* desc preparet statements ids */
|
||||
od_hashmap_t *prep_stmt_ids;
|
||||
|
||||
/* passwd from config rule */
|
||||
|
|
|
@ -229,6 +229,8 @@ od_frontend_attach(od_client_t *client, char *context,
|
|||
server->id.id_prefix,
|
||||
(int)sizeof(server->id.id_prefix), server->id.id);
|
||||
|
||||
assert(od_server_synchronized(server));
|
||||
|
||||
/* connect to server, if necessary */
|
||||
if (server->io.io) {
|
||||
return OD_OK;
|
||||
|
@ -279,7 +281,7 @@ od_frontend_attach_and_deploy(od_client_t *client, char *context)
|
|||
return OD_ESERVER_WRITE;
|
||||
|
||||
/* set number of replies to discard */
|
||||
client->server->deploy_sync = rc;
|
||||
server->deploy_sync = rc;
|
||||
|
||||
od_server_sync_request(server, server->deploy_sync);
|
||||
return OD_OK;
|
||||
|
@ -726,6 +728,9 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay,
|
|||
od_server_t *server = client->server;
|
||||
od_route_t *route = client->route;
|
||||
od_instance_t *instance = client->global->instance;
|
||||
od_frontend_status_t retstatus;
|
||||
|
||||
retstatus = OD_OK;
|
||||
|
||||
kiwi_be_type_t type = *data;
|
||||
if (instance->config.log_debug)
|
||||
|
@ -764,6 +769,11 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay,
|
|||
is_ready_for_query = 1;
|
||||
od_backend_ready(server, data, size);
|
||||
|
||||
/* exactly one RFQ! */
|
||||
if (od_server_in_sync_point(server)) {
|
||||
retstatus = OD_SKIP;
|
||||
}
|
||||
|
||||
if (is_deploy)
|
||||
server->deploy_sync--;
|
||||
|
||||
|
@ -786,14 +796,14 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay,
|
|||
case KIWI_BE_PARSE_COMPLETE:
|
||||
if (route->rule->pool->reserve_prepared_statement) {
|
||||
// skip msg
|
||||
is_deploy = 1;
|
||||
retstatus = OD_SKIP;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
/* discard replies during configuration deploy */
|
||||
if (is_deploy)
|
||||
if (is_deploy || retstatus == OD_SKIP)
|
||||
return OD_SKIP;
|
||||
|
||||
if (route->id.physical_rep || route->id.logical_rep) {
|
||||
|
@ -803,7 +813,8 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay,
|
|||
return OD_DETACH;
|
||||
}
|
||||
} else {
|
||||
if (is_ready_for_query && od_server_synchronized(server)) {
|
||||
if (is_ready_for_query && od_server_synchronized(server) &&
|
||||
server->parse_msg == NULL) {
|
||||
switch (route->rule->pool->pool) {
|
||||
case OD_RULE_POOL_STATEMENT:
|
||||
return OD_DETACH;
|
||||
|
@ -822,7 +833,7 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay,
|
|||
}
|
||||
}
|
||||
|
||||
return OD_OK;
|
||||
return retstatus;
|
||||
}
|
||||
|
||||
static inline od_retcode_t od_frontend_log_query(od_instance_t *instance,
|
||||
|
@ -973,6 +984,79 @@ static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size,
|
|||
return msg;
|
||||
}
|
||||
|
||||
static od_frontend_status_t od_frontend_deploy_prepared_stmt(
|
||||
od_server_t *server, od_relay_t *relay, char *ctx,
|
||||
machine_msg_t *msg /* to adcance or to write? */
|
||||
)
|
||||
{
|
||||
od_route_t *route = server->route;
|
||||
od_instance_t *instance = server->global->instance;
|
||||
od_client_t *client = server->client;
|
||||
char *data = machine_msg_data(msg);
|
||||
int size = machine_msg_size(msg);
|
||||
|
||||
od_hash_t body_hash = od_murmur_hash(data, size);
|
||||
|
||||
od_hashmap_elt_t desc;
|
||||
desc.data = data;
|
||||
desc.len = size;
|
||||
|
||||
od_debug(&instance->logger, ctx, client, server,
|
||||
"statement: %.*s, hash: %08x", desc.len, desc.data, body_hash);
|
||||
|
||||
char opname[OD_HASH_LEN];
|
||||
od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash);
|
||||
|
||||
int refcnt = 0;
|
||||
od_hashmap_elt_t value;
|
||||
value.data = &refcnt;
|
||||
value.len = sizeof(int);
|
||||
od_hashmap_elt_t *value_ptr = &value;
|
||||
|
||||
// send parse msg if needed
|
||||
if (od_hashmap_insert(server->prep_stmts, body_hash, &desc,
|
||||
&value_ptr) == 0) {
|
||||
od_debug(&instance->logger, ctx, client, server,
|
||||
"deploy %.*s operator %s to server", desc.len,
|
||||
desc.data, opname);
|
||||
// rewrite msg
|
||||
// allocate prepered statement under name equal to body hash
|
||||
|
||||
od_stat_parse(&route->stats);
|
||||
|
||||
machine_msg_t *pmsg;
|
||||
pmsg = kiwi_fe_write_parse_description(
|
||||
NULL, opname, OD_HASH_LEN, desc.data, desc.len);
|
||||
if (pmsg == NULL) {
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
|
||||
if (instance->config.log_query || route->rule->log_query) {
|
||||
od_frontend_log_parse(instance, client, "rewrite parse",
|
||||
machine_msg_data(pmsg),
|
||||
machine_msg_size(pmsg));
|
||||
}
|
||||
|
||||
od_stat_parse(&route->stats);
|
||||
// msg deallocated here
|
||||
od_dbg_printf_on_dvl_lvl(1, "relay %p write msg %c\n", relay,
|
||||
*(char *)machine_msg_data(pmsg));
|
||||
|
||||
od_write(&server->io, pmsg);
|
||||
// advance?
|
||||
// machine_iov_add(relay->iov, pmsg);
|
||||
|
||||
return OD_OK;
|
||||
} else {
|
||||
int *refcnt;
|
||||
refcnt = value_ptr->data;
|
||||
*refcnt = 1 + *refcnt;
|
||||
|
||||
od_stat_parse_reuse(&route->stats);
|
||||
return OD_OK;
|
||||
}
|
||||
}
|
||||
|
||||
static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
||||
char *data, int size)
|
||||
{
|
||||
|
@ -990,14 +1074,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
configuration */
|
||||
od_server_t *server = client->server;
|
||||
assert(server != NULL);
|
||||
assert(server->parse_msg == NULL);
|
||||
|
||||
if (instance->config.log_debug)
|
||||
od_debug(&instance->logger, "remote client", client, server,
|
||||
"%s", kiwi_fe_type_to_string(type));
|
||||
|
||||
od_frontend_status_t retstatus = OD_OK;
|
||||
machine_msg_t *msg;
|
||||
msg = NULL;
|
||||
bool forwarded = 0;
|
||||
switch (type) {
|
||||
case KIWI_FE_COPY_DONE:
|
||||
|
@ -1031,16 +1114,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
return OD_ECLIENT_READ;
|
||||
}
|
||||
if (type == KIWI_FE_DESCRIBE_PORTAL) {
|
||||
break; // skip this, we obly need to rewrite statement
|
||||
break; // skip this, we only need to rewrite statement
|
||||
}
|
||||
|
||||
assert(client->prep_stmt_ids);
|
||||
retstatus = OD_SKIP;
|
||||
int opname_start_offset =
|
||||
kiwi_be_describe_opname_offset(data, size);
|
||||
if (opname_start_offset < 0) {
|
||||
return OD_ECLIENT_READ;
|
||||
}
|
||||
|
||||
od_hashmap_elt_t key;
|
||||
key.len = operator_name_len;
|
||||
|
@ -1088,10 +1166,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
// rewrite msg
|
||||
// allocate prepered statement under name equal to body hash
|
||||
|
||||
msg = kiwi_fe_write_parse_description(
|
||||
machine_msg_t *pmsg;
|
||||
pmsg = kiwi_fe_write_parse_description(
|
||||
NULL, opname, OD_HASH_LEN, desc->data,
|
||||
desc->len);
|
||||
if (msg == NULL) {
|
||||
if (pmsg == NULL) {
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
|
||||
|
@ -1100,20 +1179,17 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
od_frontend_log_parse(
|
||||
instance, client,
|
||||
"rewrite parse",
|
||||
machine_msg_data(msg),
|
||||
machine_msg_size(msg));
|
||||
machine_msg_data(pmsg),
|
||||
machine_msg_size(pmsg));
|
||||
}
|
||||
|
||||
od_stat_parse(&route->stats);
|
||||
// msg deallocated here
|
||||
rc = od_write(&server->io, msg);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger, "describe",
|
||||
NULL, server,
|
||||
"write error: %s",
|
||||
od_io_error(&server->io));
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
|
||||
machine_iov_add(relay->iov, pmsg);
|
||||
od_dbg_printf_on_dvl_lvl(
|
||||
1, "client relay %p advance msg %c\n",
|
||||
relay, *(char *)machine_msg_data(pmsg));
|
||||
|
||||
} else {
|
||||
int *refcnt;
|
||||
|
@ -1123,6 +1199,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
od_stat_parse_reuse(&route->stats);
|
||||
}
|
||||
|
||||
machine_msg_t *msg;
|
||||
msg = kiwi_fe_write_describe(NULL, 'S', opname,
|
||||
OD_HASH_LEN);
|
||||
|
||||
|
@ -1138,14 +1215,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
}
|
||||
|
||||
// msg if deallocated automaictly
|
||||
rc = od_write(&server->io, msg);
|
||||
machine_iov_add(relay->iov, msg);
|
||||
od_dbg_printf_on_dvl_lvl(
|
||||
1, "client relay %p advance msg %c\n", relay,
|
||||
*(char *)machine_msg_data(msg));
|
||||
forwarded = 1;
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger, "describe", NULL,
|
||||
server, "write error: %s",
|
||||
od_io_error(&server->io));
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case KIWI_FE_PARSE:
|
||||
|
@ -1155,7 +1229,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
|
||||
if (route->rule->pool->reserve_prepared_statement) {
|
||||
/* skip client parse msg */
|
||||
retstatus = OD_SKIP;
|
||||
retstatus = OD_REQ_SYNC;
|
||||
kiwi_prepared_statement_t desc;
|
||||
int rc;
|
||||
rc = kiwi_be_read_parse_dest(data, size, &desc);
|
||||
|
@ -1180,17 +1254,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
|
||||
od_hashmap_elt_t *value_ptr = &value;
|
||||
|
||||
int opname_start_offset =
|
||||
kiwi_be_parse_opname_offset(data, size);
|
||||
if (opname_start_offset < 0) {
|
||||
return OD_ECLIENT_READ;
|
||||
server->parse_msg =
|
||||
machine_msg_create(desc.description_len);
|
||||
if (server->parse_msg == NULL) {
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
|
||||
od_hash_t body_hash =
|
||||
od_murmur_hash(data + opname_start_offset +
|
||||
desc.operator_name_len,
|
||||
size - opname_start_offset -
|
||||
desc.operator_name_len);
|
||||
memcpy(machine_msg_data(server->parse_msg),
|
||||
desc.description, desc.description_len);
|
||||
|
||||
assert(client->prep_stmt_ids);
|
||||
if (od_hashmap_insert(client->prep_stmt_ids, keyhash,
|
||||
|
@ -1205,21 +1275,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
}
|
||||
|
||||
machine_msg_t *pmsg;
|
||||
pmsg = kiwi_be_write_parse_complete(NULL);
|
||||
if (pmsg == NULL) {
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
rc = od_write(&client->io, pmsg);
|
||||
forwarded = 1;
|
||||
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger, "parse", client,
|
||||
NULL, "write error: %s",
|
||||
od_io_error(&client->io));
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case KIWI_FE_BIND:
|
||||
|
@ -1240,12 +1295,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
return OD_ECLIENT_READ;
|
||||
}
|
||||
|
||||
int opname_start_offset =
|
||||
kiwi_be_bind_opname_offset(data, size);
|
||||
if (opname_start_offset < 0) {
|
||||
return OD_ECLIENT_READ;
|
||||
}
|
||||
|
||||
od_hashmap_elt_t key;
|
||||
key.len = operator_name_len;
|
||||
key.data = operator_name;
|
||||
|
@ -1291,12 +1340,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
desc->len, desc->data, keyhash);
|
||||
// rewrite msg
|
||||
// allocate prepered statement under name equal to body hash
|
||||
|
||||
msg = kiwi_fe_write_parse_description(
|
||||
machine_msg_t *pmsg;
|
||||
pmsg = kiwi_fe_write_parse_description(
|
||||
NULL, opname, OD_HASH_LEN, desc->data,
|
||||
desc->len);
|
||||
|
||||
if (msg == NULL) {
|
||||
if (pmsg == NULL) {
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
|
||||
|
@ -1305,25 +1354,30 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
od_frontend_log_parse(
|
||||
instance, client,
|
||||
"rewrite parse",
|
||||
machine_msg_data(msg),
|
||||
machine_msg_size(msg));
|
||||
machine_msg_data(pmsg),
|
||||
machine_msg_size(pmsg));
|
||||
}
|
||||
|
||||
od_stat_parse(&route->stats);
|
||||
rc = od_write(&server->io, msg);
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger,
|
||||
"rewrite parse", NULL, server,
|
||||
"write error: %s",
|
||||
od_io_error(&server->io));
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
machine_iov_add(relay->iov, pmsg);
|
||||
|
||||
od_dbg_printf_on_dvl_lvl(
|
||||
1, "client relay %p advance msg %c\n",
|
||||
relay, *(char *)machine_msg_data(pmsg));
|
||||
|
||||
} else {
|
||||
int *refcnt = value_ptr->data;
|
||||
*refcnt = 1 + *refcnt;
|
||||
od_stat_parse_reuse(&route->stats);
|
||||
}
|
||||
|
||||
int opname_start_offset =
|
||||
kiwi_be_bind_opname_offset(data, size);
|
||||
if (opname_start_offset < 0) {
|
||||
return OD_ECLIENT_READ;
|
||||
}
|
||||
|
||||
machine_msg_t *msg;
|
||||
msg = od_frontend_rewrite_msg(data, size,
|
||||
opname_start_offset,
|
||||
operator_name_len,
|
||||
|
@ -1341,15 +1395,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
machine_msg_size(msg));
|
||||
}
|
||||
|
||||
rc = od_write(&server->io, msg);
|
||||
forwarded = 1;
|
||||
machine_iov_add(relay->iov, msg);
|
||||
|
||||
if (rc == -1) {
|
||||
od_error(&instance->logger, "rewrite bind",
|
||||
NULL, server, "write error: %s",
|
||||
od_io_error(&server->io));
|
||||
return OD_ESERVER_WRITE;
|
||||
}
|
||||
od_dbg_printf_on_dvl_lvl(
|
||||
1, "client relay %p advance msg %c\n", relay,
|
||||
*(char *)machine_msg_data(msg));
|
||||
forwarded = 1;
|
||||
}
|
||||
break;
|
||||
case KIWI_FE_EXECUTE:
|
||||
|
@ -1418,12 +1469,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay,
|
|||
}
|
||||
|
||||
/* If the retstatus is not SKIP */
|
||||
if (route->rule->pool->reserve_prepared_statement && forwarded != 1 &&
|
||||
msg != NULL) {
|
||||
msg = kiwi_fe_copy_msg(msg, data, size);
|
||||
od_write(&server->io, msg);
|
||||
retstatus = OD_SKIP;
|
||||
}
|
||||
/* update server stats */
|
||||
od_stat_query_start(&server->stats_state);
|
||||
return retstatus;
|
||||
|
@ -1500,9 +1545,10 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client,
|
|||
}
|
||||
|
||||
static inline od_frontend_status_t
|
||||
od_frontend_remote_process_server(od_server_t *server, od_client_t *client)
|
||||
od_frontend_remote_process_server(od_server_t *server, od_client_t *client,
|
||||
bool await_read)
|
||||
{
|
||||
od_frontend_status_t status = od_relay_step(&server->relay);
|
||||
od_frontend_status_t status = od_relay_step(&server->relay, await_read);
|
||||
int rc;
|
||||
od_instance_t *instance = client->global->instance;
|
||||
|
||||
|
@ -1658,9 +1704,10 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
|
|||
break;
|
||||
|
||||
server = client->server;
|
||||
bool sync_req = 0;
|
||||
|
||||
/* attach */
|
||||
status = od_relay_step(&client->relay);
|
||||
status = od_relay_step(&client->relay, false);
|
||||
if (status == OD_ATTACH) {
|
||||
/* Check for replication lag and reject query if too big */
|
||||
od_frontend_status_t catchup_status =
|
||||
|
@ -1689,6 +1736,8 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
|
|||
|
||||
/* retry read operation after attach */
|
||||
continue;
|
||||
} else if (status == OD_REQ_SYNC) {
|
||||
sync_req = 1;
|
||||
} else if (status != OD_OK) {
|
||||
break;
|
||||
}
|
||||
|
@ -1696,7 +1745,97 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
|
|||
if (server == NULL)
|
||||
continue;
|
||||
|
||||
status = od_frontend_remote_process_server(server, client);
|
||||
status = od_frontend_remote_process_server(server, client,
|
||||
false);
|
||||
if (status != OD_OK) {
|
||||
/* should not return this here */
|
||||
assert(status != OD_REQ_SYNC);
|
||||
break;
|
||||
}
|
||||
|
||||
// are we requested to meet sync point?
|
||||
|
||||
if (sync_req) {
|
||||
od_log(&instance->logger, "sync-point", client, server,
|
||||
"process, %d", od_server_synchronized(server));
|
||||
|
||||
while (1) {
|
||||
if (od_server_synchronized(server)) {
|
||||
break;
|
||||
}
|
||||
// await here
|
||||
|
||||
od_log(&instance->logger, "sync-point", client,
|
||||
server, "process await");
|
||||
status = od_frontend_remote_process_server(
|
||||
server, client, true);
|
||||
|
||||
if (status != OD_OK) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (status != OD_OK) {
|
||||
break;
|
||||
}
|
||||
|
||||
// deploy here
|
||||
|
||||
assert(server->parse_msg != NULL);
|
||||
|
||||
/* fill internals structs in */
|
||||
if (od_frontend_deploy_prepared_stmt(
|
||||
server, &server->relay, "sync-point",
|
||||
server->parse_msg) != OD_OK) {
|
||||
status = OD_ESERVER_WRITE;
|
||||
break;
|
||||
}
|
||||
server->parse_msg = NULL;
|
||||
|
||||
machine_msg_t *msg;
|
||||
msg = kiwi_fe_write_sync(NULL);
|
||||
if (msg == NULL) {
|
||||
status = OD_ESERVER_WRITE;
|
||||
break;
|
||||
}
|
||||
rc = od_write(&server->io, msg);
|
||||
if (rc == -1) {
|
||||
status = OD_ESERVER_WRITE;
|
||||
break;
|
||||
}
|
||||
|
||||
/* enter sync piont mode */
|
||||
server->sync_point = 1;
|
||||
od_server_sync_request(server, 1);
|
||||
|
||||
while (1) {
|
||||
if (od_server_synchronized(server)) {
|
||||
break;
|
||||
}
|
||||
// await here
|
||||
|
||||
od_log(&instance->logger, "sync-point", client,
|
||||
server, "process await");
|
||||
status = od_frontend_remote_process_server(
|
||||
server, client, true);
|
||||
|
||||
if (status != OD_OK) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
server->sync_point = 0;
|
||||
if (status != OD_OK) {
|
||||
break;
|
||||
}
|
||||
|
||||
machine_msg_t *pmsg;
|
||||
pmsg = kiwi_be_write_parse_complete(NULL);
|
||||
if (pmsg == NULL) {
|
||||
return OD_ECLIENT_WRITE;
|
||||
}
|
||||
machine_iov_add(server->relay.iov, pmsg);
|
||||
}
|
||||
if (status != OD_OK) {
|
||||
break;
|
||||
}
|
||||
|
@ -1863,6 +2002,7 @@ static void od_frontend_cleanup(od_client_t *client, char *context,
|
|||
break;
|
||||
case OD_UNDEF:
|
||||
case OD_SKIP:
|
||||
case OD_REQ_SYNC:
|
||||
case OD_ATTACH:
|
||||
/* fallthrough */
|
||||
case OD_DETACH:
|
||||
|
|
|
@ -177,6 +177,8 @@ static inline od_frontend_status_t od_relay_on_packet_msg(od_relay_t *relay,
|
|||
case OD_SKIP:
|
||||
status = OD_OK;
|
||||
/* fallthrough */
|
||||
case OD_REQ_SYNC:
|
||||
/* fallthrough */
|
||||
default:
|
||||
machine_msg_free(msg);
|
||||
break;
|
||||
|
@ -196,9 +198,16 @@ static inline od_frontend_status_t od_relay_on_packet(od_relay_t *relay,
|
|||
/* fallthrough */
|
||||
case OD_DETACH:
|
||||
rc = machine_iov_add_pointer(relay->iov, data, size);
|
||||
|
||||
od_dbg_printf_on_dvl_lvl(1, "relay %p advance msg %c\n", relay,
|
||||
*data);
|
||||
if (rc == -1)
|
||||
return OD_EOOM;
|
||||
break;
|
||||
case OD_REQ_SYNC:
|
||||
/* fallthrough */
|
||||
relay->packet_skip = 1;
|
||||
break;
|
||||
case OD_SKIP:
|
||||
relay->packet_skip = 1;
|
||||
status = OD_OK;
|
||||
|
@ -277,7 +286,10 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size)
|
|||
|
||||
if (relay->packet_skip)
|
||||
return OD_OK;
|
||||
|
||||
rc = machine_iov_add_pointer(relay->iov, data, to_parse);
|
||||
|
||||
od_dbg_printf_on_dvl_lvl(1, "relay %p advance msg %c\n", relay, *data);
|
||||
if (rc == -1)
|
||||
return OD_EOOM;
|
||||
|
||||
|
@ -294,6 +306,9 @@ static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay)
|
|||
rc = od_relay_process(relay, &progress, current, end - current);
|
||||
current += progress;
|
||||
od_readahead_pos_read_advance(&relay->src->readahead, progress);
|
||||
if (rc == OD_REQ_SYNC) {
|
||||
return OD_REQ_SYNC;
|
||||
}
|
||||
if (rc != OD_OK) {
|
||||
if (rc == OD_UNDEF)
|
||||
rc = OD_OK;
|
||||
|
@ -354,11 +369,17 @@ static inline od_frontend_status_t od_relay_write(od_relay_t *relay)
|
|||
return OD_OK;
|
||||
}
|
||||
|
||||
static inline od_frontend_status_t od_relay_step(od_relay_t *relay)
|
||||
static inline od_frontend_status_t od_relay_step(od_relay_t *relay,
|
||||
bool await_read)
|
||||
{
|
||||
/* on read event */
|
||||
od_frontend_status_t retstatus;
|
||||
retstatus = OD_OK;
|
||||
int rc;
|
||||
if (machine_cond_try(relay->src->on_read)) {
|
||||
rc = await_read ?
|
||||
(machine_cond_wait(relay->src->on_read, UINT32_MAX) == 0) :
|
||||
machine_cond_try(relay->src->on_read);
|
||||
if (rc || od_relay_data_pending(relay)) {
|
||||
if (relay->dst == NULL) {
|
||||
/* signal to retry on read logic */
|
||||
machine_cond_signal(relay->src->on_read);
|
||||
|
@ -371,7 +392,9 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay)
|
|||
|
||||
rc = od_relay_pipeline(relay);
|
||||
|
||||
if (rc != OD_OK)
|
||||
if (rc == OD_REQ_SYNC) {
|
||||
retstatus = OD_REQ_SYNC;
|
||||
} else if (rc != OD_OK)
|
||||
return rc;
|
||||
|
||||
if (machine_iov_pending(relay->iov)) {
|
||||
|
@ -383,7 +406,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay)
|
|||
}
|
||||
|
||||
if (relay->dst == NULL)
|
||||
return OD_OK;
|
||||
return retstatus;
|
||||
|
||||
/* on write event */
|
||||
if (machine_cond_try(relay->dst->on_write)) {
|
||||
|
@ -408,7 +431,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay)
|
|||
}
|
||||
}
|
||||
|
||||
return OD_OK;
|
||||
return retstatus;
|
||||
}
|
||||
|
||||
static inline od_frontend_status_t od_relay_flush(od_relay_t *relay)
|
||||
|
|
|
@ -658,6 +658,10 @@ od_router_status_t od_router_attach(od_router_t *router, od_client_t *client,
|
|||
if (server == NULL)
|
||||
return OD_ROUTER_ERROR;
|
||||
od_id_generate(&server->id, "s");
|
||||
od_dbg_printf_on_dvl_lvl(1, "server %s%.*s has relay %p\n",
|
||||
server->id.id_prefix,
|
||||
(signed)sizeof(server->id.id), server->id.id,
|
||||
&server->relay);
|
||||
server->global = client->global;
|
||||
server->route = route;
|
||||
|
||||
|
@ -672,6 +676,8 @@ attach:
|
|||
server->idle_time = 0;
|
||||
server->key_client = client->key;
|
||||
|
||||
assert(od_server_synchronized(server));
|
||||
|
||||
/*
|
||||
* XXX: this logic breaks some external solutions that use
|
||||
* PostgreSQL logical replication. Need to tests this and fix
|
||||
|
@ -706,6 +712,9 @@ void od_router_detach(od_router_t *router, od_client_t *client)
|
|||
|
||||
/* detach from current machine event loop */
|
||||
od_server_t *server = client->server;
|
||||
|
||||
assert(server != NULL);
|
||||
assert(od_server_synchronized(server));
|
||||
od_io_detach(&server->io);
|
||||
|
||||
od_route_lock(route);
|
||||
|
|
|
@ -35,6 +35,8 @@ struct od_server {
|
|||
uint64_t sync_request;
|
||||
uint64_t sync_reply;
|
||||
|
||||
/* to swallow some internal msgs */
|
||||
machine_msg_t *parse_msg;
|
||||
int idle_time;
|
||||
|
||||
kiwi_key_t key;
|
||||
|
@ -52,6 +54,7 @@ struct od_server {
|
|||
|
||||
/* allocated prepared statements ids */
|
||||
od_hashmap_t *prep_stmts;
|
||||
int sync_point;
|
||||
|
||||
od_global_t *global;
|
||||
int offline;
|
||||
|
@ -78,6 +81,8 @@ static inline void od_server_init(od_server_t *server, int reserve_prep_stmts)
|
|||
server->deploy_sync = 0;
|
||||
server->sync_request = 0;
|
||||
server->sync_reply = 0;
|
||||
server->sync_point = 0;
|
||||
server->parse_msg = NULL;
|
||||
server->init_time_us = machine_time_us();
|
||||
server->error_connect = NULL;
|
||||
server->offline = 0;
|
||||
|
@ -140,8 +145,14 @@ static inline int od_server_in_deploy(od_server_t *server)
|
|||
return server->deploy_sync > 0;
|
||||
}
|
||||
|
||||
static inline int od_server_in_sync_point(od_server_t *server)
|
||||
{
|
||||
return server->sync_point > 0;
|
||||
}
|
||||
|
||||
static inline int od_server_synchronized(od_server_t *server)
|
||||
{
|
||||
assert(server->sync_request >= server->sync_reply);
|
||||
return server->sync_request == server->sync_reply;
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ typedef enum {
|
|||
OD_UNDEF,
|
||||
OD_OK,
|
||||
OD_SKIP,
|
||||
OD_REQ_SYNC,
|
||||
OD_ATTACH,
|
||||
OD_DETACH,
|
||||
OD_WAIT_SYNC,
|
||||
|
@ -37,6 +38,8 @@ static inline char *od_frontend_status_to_str(od_frontend_status_t status)
|
|||
return "OD_OK";
|
||||
case OD_SKIP:
|
||||
return "OD_SKIP";
|
||||
case OD_REQ_SYNC:
|
||||
return "OD_REQ_SYNC";
|
||||
case OD_ATTACH:
|
||||
return "OD_ATTACH";
|
||||
case OD_DETACH:
|
||||
|
|
|
@ -94,6 +94,11 @@ static inline void od_system_server(void *arg)
|
|||
continue;
|
||||
}
|
||||
od_id_generate(&client->id, "c");
|
||||
|
||||
od_dbg_printf_on_dvl_lvl(1, "client %s%.*s has relay %p\n",
|
||||
client->id.id_prefix,
|
||||
(signed)sizeof(client->id.id),
|
||||
client->id.id, &client->relay);
|
||||
rc = od_io_prepare(&client->io, client_io,
|
||||
instance->config.readahead);
|
||||
if (rc == -1) {
|
||||
|
|
Loading…
Reference in New Issue