From ff5b6563108d26fa12967bef249c054f9fe5b69b Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 5 Mar 2024 13:29:11 +0500 Subject: [PATCH] Sync relay extended proto messages in tx pool. (#586) * Fix relay in tx pool + prep stmt try 2 * fmt * fix --- docker/prep_stmts/pstmts.conf | 2 +- sources/backend.c | 10 +- sources/client.h | 2 +- sources/frontend.c | 330 ++++++++++++++++++++++++---------- sources/relay.h | 33 +++- sources/router.c | 9 + sources/server.h | 11 ++ sources/status.h | 3 + sources/system.c | 5 + 9 files changed, 299 insertions(+), 106 deletions(-) diff --git a/docker/prep_stmts/pstmts.conf b/docker/prep_stmts/pstmts.conf index c38f404c..95752440 100644 --- a/docker/prep_stmts/pstmts.conf +++ b/docker/prep_stmts/pstmts.conf @@ -60,7 +60,7 @@ database "db1" { pool_timeout 0 pool_ttl 60 pool_discard no - pool_smart_discard yes + pool_smart_discard no pool_cancel yes pool_rollback yes pool_reserve_prepared_statement yes diff --git a/sources/backend.c b/sources/backend.c index 4f570eb1..e4555bbc 100644 --- a/sources/backend.c +++ b/sources/backend.c @@ -100,6 +100,7 @@ int od_backend_ready(od_server_t *server, char *data, uint32_t size) } /* update server sync reply state */ + od_server_sync_reply(server); return 0; } @@ -171,6 +172,7 @@ static inline int od_backend_startup(od_server_t *server, /* update request count and sync state */ od_server_sync_request(server, 1); + assert(server->client); while (1) { msg = od_read(&server->io, UINT32_MAX); @@ -702,7 +704,7 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count, int query_rc; query_rc = 0; - for (;;) { + for (; !od_server_synchronized(server);) { machine_msg_t *msg; msg = od_read(&server->io, time_ms); if (msg == NULL) { @@ -741,13 +743,12 @@ int od_backend_ready_wait(od_server_t *server, char *context, int count, machine_msg_size(msg)); machine_msg_free(msg); ready++; - if (ready == count) { - return query_rc; - } } else { machine_msg_free(msg); } } + + return query_rc; /* never reached */ } @@ -777,6 +778,7 @@ od_retcode_t od_backend_query_send(od_server_t *server, char *context, /* update server sync state */ od_server_sync_request(server, 1); + assert(server->client); return OK_RESPONSE; } diff --git a/sources/client.h b/sources/client.h index 7735f68e..1252d3b7 100644 --- a/sources/client.h +++ b/sources/client.h @@ -54,7 +54,7 @@ struct od_client { void *route; char peer[OD_CLIENT_MAX_PEERLEN]; - // desc preparet statements ids + /* desc preparet statements ids */ od_hashmap_t *prep_stmt_ids; /* passwd from config rule */ diff --git a/sources/frontend.c b/sources/frontend.c index c6783224..7bba5a60 100644 --- a/sources/frontend.c +++ b/sources/frontend.c @@ -229,6 +229,8 @@ od_frontend_attach(od_client_t *client, char *context, server->id.id_prefix, (int)sizeof(server->id.id_prefix), server->id.id); + assert(od_server_synchronized(server)); + /* connect to server, if necessary */ if (server->io.io) { return OD_OK; @@ -279,7 +281,7 @@ od_frontend_attach_and_deploy(od_client_t *client, char *context) return OD_ESERVER_WRITE; /* set number of replies to discard */ - client->server->deploy_sync = rc; + server->deploy_sync = rc; od_server_sync_request(server, server->deploy_sync); return OD_OK; @@ -726,6 +728,9 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, od_server_t *server = client->server; od_route_t *route = client->route; od_instance_t *instance = client->global->instance; + od_frontend_status_t retstatus; + + retstatus = OD_OK; kiwi_be_type_t type = *data; if (instance->config.log_debug) @@ -764,6 +769,11 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, is_ready_for_query = 1; od_backend_ready(server, data, size); + /* exactly one RFQ! */ + if (od_server_in_sync_point(server)) { + retstatus = OD_SKIP; + } + if (is_deploy) server->deploy_sync--; @@ -786,14 +796,14 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, case KIWI_BE_PARSE_COMPLETE: if (route->rule->pool->reserve_prepared_statement) { // skip msg - is_deploy = 1; + retstatus = OD_SKIP; } default: break; } /* discard replies during configuration deploy */ - if (is_deploy) + if (is_deploy || retstatus == OD_SKIP) return OD_SKIP; if (route->id.physical_rep || route->id.logical_rep) { @@ -803,7 +813,8 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, return OD_DETACH; } } else { - if (is_ready_for_query && od_server_synchronized(server)) { + if (is_ready_for_query && od_server_synchronized(server) && + server->parse_msg == NULL) { switch (route->rule->pool->pool) { case OD_RULE_POOL_STATEMENT: return OD_DETACH; @@ -822,7 +833,7 @@ static od_frontend_status_t od_frontend_remote_server(od_relay_t *relay, } } - return OD_OK; + return retstatus; } static inline od_retcode_t od_frontend_log_query(od_instance_t *instance, @@ -973,6 +984,79 @@ static inline machine_msg_t *od_frontend_rewrite_msg(char *data, int size, return msg; } +static od_frontend_status_t od_frontend_deploy_prepared_stmt( + od_server_t *server, od_relay_t *relay, char *ctx, + machine_msg_t *msg /* to adcance or to write? */ +) +{ + od_route_t *route = server->route; + od_instance_t *instance = server->global->instance; + od_client_t *client = server->client; + char *data = machine_msg_data(msg); + int size = machine_msg_size(msg); + + od_hash_t body_hash = od_murmur_hash(data, size); + + od_hashmap_elt_t desc; + desc.data = data; + desc.len = size; + + od_debug(&instance->logger, ctx, client, server, + "statement: %.*s, hash: %08x", desc.len, desc.data, body_hash); + + char opname[OD_HASH_LEN]; + od_snprintf(opname, OD_HASH_LEN, "%08x", body_hash); + + int refcnt = 0; + od_hashmap_elt_t value; + value.data = &refcnt; + value.len = sizeof(int); + od_hashmap_elt_t *value_ptr = &value; + + // send parse msg if needed + if (od_hashmap_insert(server->prep_stmts, body_hash, &desc, + &value_ptr) == 0) { + od_debug(&instance->logger, ctx, client, server, + "deploy %.*s operator %s to server", desc.len, + desc.data, opname); + // rewrite msg + // allocate prepered statement under name equal to body hash + + od_stat_parse(&route->stats); + + machine_msg_t *pmsg; + pmsg = kiwi_fe_write_parse_description( + NULL, opname, OD_HASH_LEN, desc.data, desc.len); + if (pmsg == NULL) { + return OD_ESERVER_WRITE; + } + + if (instance->config.log_query || route->rule->log_query) { + od_frontend_log_parse(instance, client, "rewrite parse", + machine_msg_data(pmsg), + machine_msg_size(pmsg)); + } + + od_stat_parse(&route->stats); + // msg deallocated here + od_dbg_printf_on_dvl_lvl(1, "relay %p write msg %c\n", relay, + *(char *)machine_msg_data(pmsg)); + + od_write(&server->io, pmsg); + // advance? + // machine_iov_add(relay->iov, pmsg); + + return OD_OK; + } else { + int *refcnt; + refcnt = value_ptr->data; + *refcnt = 1 + *refcnt; + + od_stat_parse_reuse(&route->stats); + return OD_OK; + } +} + static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, char *data, int size) { @@ -990,14 +1074,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, configuration */ od_server_t *server = client->server; assert(server != NULL); + assert(server->parse_msg == NULL); if (instance->config.log_debug) od_debug(&instance->logger, "remote client", client, server, "%s", kiwi_fe_type_to_string(type)); od_frontend_status_t retstatus = OD_OK; - machine_msg_t *msg; - msg = NULL; bool forwarded = 0; switch (type) { case KIWI_FE_COPY_DONE: @@ -1031,16 +1114,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ECLIENT_READ; } if (type == KIWI_FE_DESCRIBE_PORTAL) { - break; // skip this, we obly need to rewrite statement + break; // skip this, we only need to rewrite statement } assert(client->prep_stmt_ids); retstatus = OD_SKIP; - int opname_start_offset = - kiwi_be_describe_opname_offset(data, size); - if (opname_start_offset < 0) { - return OD_ECLIENT_READ; - } od_hashmap_elt_t key; key.len = operator_name_len; @@ -1088,10 +1166,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, // rewrite msg // allocate prepered statement under name equal to body hash - msg = kiwi_fe_write_parse_description( + machine_msg_t *pmsg; + pmsg = kiwi_fe_write_parse_description( NULL, opname, OD_HASH_LEN, desc->data, desc->len); - if (msg == NULL) { + if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1100,20 +1179,17 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_frontend_log_parse( instance, client, "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); + machine_msg_data(pmsg), + machine_msg_size(pmsg)); } od_stat_parse(&route->stats); // msg deallocated here - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, "describe", - NULL, server, - "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } + + machine_iov_add(relay->iov, pmsg); + od_dbg_printf_on_dvl_lvl( + 1, "client relay %p advance msg %c\n", + relay, *(char *)machine_msg_data(pmsg)); } else { int *refcnt; @@ -1123,6 +1199,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_stat_parse_reuse(&route->stats); } + machine_msg_t *msg; msg = kiwi_fe_write_describe(NULL, 'S', opname, OD_HASH_LEN); @@ -1138,14 +1215,11 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, } // msg if deallocated automaictly - rc = od_write(&server->io, msg); + machine_iov_add(relay->iov, msg); + od_dbg_printf_on_dvl_lvl( + 1, "client relay %p advance msg %c\n", relay, + *(char *)machine_msg_data(msg)); forwarded = 1; - if (rc == -1) { - od_error(&instance->logger, "describe", NULL, - server, "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } } break; case KIWI_FE_PARSE: @@ -1155,7 +1229,7 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, if (route->rule->pool->reserve_prepared_statement) { /* skip client parse msg */ - retstatus = OD_SKIP; + retstatus = OD_REQ_SYNC; kiwi_prepared_statement_t desc; int rc; rc = kiwi_be_read_parse_dest(data, size, &desc); @@ -1180,17 +1254,13 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_hashmap_elt_t *value_ptr = &value; - int opname_start_offset = - kiwi_be_parse_opname_offset(data, size); - if (opname_start_offset < 0) { - return OD_ECLIENT_READ; + server->parse_msg = + machine_msg_create(desc.description_len); + if (server->parse_msg == NULL) { + return OD_ESERVER_WRITE; } - - od_hash_t body_hash = - od_murmur_hash(data + opname_start_offset + - desc.operator_name_len, - size - opname_start_offset - - desc.operator_name_len); + memcpy(machine_msg_data(server->parse_msg), + desc.description, desc.description_len); assert(client->prep_stmt_ids); if (od_hashmap_insert(client->prep_stmt_ids, keyhash, @@ -1205,21 +1275,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ESERVER_WRITE; } } - - machine_msg_t *pmsg; - pmsg = kiwi_be_write_parse_complete(NULL); - if (pmsg == NULL) { - return OD_ESERVER_WRITE; - } - rc = od_write(&client->io, pmsg); - forwarded = 1; - - if (rc == -1) { - od_error(&instance->logger, "parse", client, - NULL, "write error: %s", - od_io_error(&client->io)); - return OD_ESERVER_WRITE; - } } break; case KIWI_FE_BIND: @@ -1240,12 +1295,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, return OD_ECLIENT_READ; } - int opname_start_offset = - kiwi_be_bind_opname_offset(data, size); - if (opname_start_offset < 0) { - return OD_ECLIENT_READ; - } - od_hashmap_elt_t key; key.len = operator_name_len; key.data = operator_name; @@ -1291,12 +1340,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, desc->len, desc->data, keyhash); // rewrite msg // allocate prepered statement under name equal to body hash - - msg = kiwi_fe_write_parse_description( + machine_msg_t *pmsg; + pmsg = kiwi_fe_write_parse_description( NULL, opname, OD_HASH_LEN, desc->data, desc->len); - if (msg == NULL) { + if (pmsg == NULL) { return OD_ESERVER_WRITE; } @@ -1305,25 +1354,30 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, od_frontend_log_parse( instance, client, "rewrite parse", - machine_msg_data(msg), - machine_msg_size(msg)); + machine_msg_data(pmsg), + machine_msg_size(pmsg)); } od_stat_parse(&route->stats); - rc = od_write(&server->io, msg); - if (rc == -1) { - od_error(&instance->logger, - "rewrite parse", NULL, server, - "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } + machine_iov_add(relay->iov, pmsg); + + od_dbg_printf_on_dvl_lvl( + 1, "client relay %p advance msg %c\n", + relay, *(char *)machine_msg_data(pmsg)); + } else { int *refcnt = value_ptr->data; *refcnt = 1 + *refcnt; od_stat_parse_reuse(&route->stats); } + int opname_start_offset = + kiwi_be_bind_opname_offset(data, size); + if (opname_start_offset < 0) { + return OD_ECLIENT_READ; + } + + machine_msg_t *msg; msg = od_frontend_rewrite_msg(data, size, opname_start_offset, operator_name_len, @@ -1341,15 +1395,12 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, machine_msg_size(msg)); } - rc = od_write(&server->io, msg); - forwarded = 1; + machine_iov_add(relay->iov, msg); - if (rc == -1) { - od_error(&instance->logger, "rewrite bind", - NULL, server, "write error: %s", - od_io_error(&server->io)); - return OD_ESERVER_WRITE; - } + od_dbg_printf_on_dvl_lvl( + 1, "client relay %p advance msg %c\n", relay, + *(char *)machine_msg_data(msg)); + forwarded = 1; } break; case KIWI_FE_EXECUTE: @@ -1418,12 +1469,6 @@ static od_frontend_status_t od_frontend_remote_client(od_relay_t *relay, } /* If the retstatus is not SKIP */ - if (route->rule->pool->reserve_prepared_statement && forwarded != 1 && - msg != NULL) { - msg = kiwi_fe_copy_msg(msg, data, size); - od_write(&server->io, msg); - retstatus = OD_SKIP; - } /* update server stats */ od_stat_query_start(&server->stats_state); return retstatus; @@ -1500,9 +1545,10 @@ static inline od_frontend_status_t od_frontend_poll_catchup(od_client_t *client, } static inline od_frontend_status_t -od_frontend_remote_process_server(od_server_t *server, od_client_t *client) +od_frontend_remote_process_server(od_server_t *server, od_client_t *client, + bool await_read) { - od_frontend_status_t status = od_relay_step(&server->relay); + od_frontend_status_t status = od_relay_step(&server->relay, await_read); int rc; od_instance_t *instance = client->global->instance; @@ -1658,9 +1704,10 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) break; server = client->server; + bool sync_req = 0; /* attach */ - status = od_relay_step(&client->relay); + status = od_relay_step(&client->relay, false); if (status == OD_ATTACH) { /* Check for replication lag and reject query if too big */ od_frontend_status_t catchup_status = @@ -1689,6 +1736,8 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) /* retry read operation after attach */ continue; + } else if (status == OD_REQ_SYNC) { + sync_req = 1; } else if (status != OD_OK) { break; } @@ -1696,7 +1745,97 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client) if (server == NULL) continue; - status = od_frontend_remote_process_server(server, client); + status = od_frontend_remote_process_server(server, client, + false); + if (status != OD_OK) { + /* should not return this here */ + assert(status != OD_REQ_SYNC); + break; + } + + // are we requested to meet sync point? + + if (sync_req) { + od_log(&instance->logger, "sync-point", client, server, + "process, %d", od_server_synchronized(server)); + + while (1) { + if (od_server_synchronized(server)) { + break; + } + // await here + + od_log(&instance->logger, "sync-point", client, + server, "process await"); + status = od_frontend_remote_process_server( + server, client, true); + + if (status != OD_OK) { + break; + } + } + + if (status != OD_OK) { + break; + } + + // deploy here + + assert(server->parse_msg != NULL); + + /* fill internals structs in */ + if (od_frontend_deploy_prepared_stmt( + server, &server->relay, "sync-point", + server->parse_msg) != OD_OK) { + status = OD_ESERVER_WRITE; + break; + } + server->parse_msg = NULL; + + machine_msg_t *msg; + msg = kiwi_fe_write_sync(NULL); + if (msg == NULL) { + status = OD_ESERVER_WRITE; + break; + } + rc = od_write(&server->io, msg); + if (rc == -1) { + status = OD_ESERVER_WRITE; + break; + } + + /* enter sync piont mode */ + server->sync_point = 1; + od_server_sync_request(server, 1); + + while (1) { + if (od_server_synchronized(server)) { + break; + } + // await here + + od_log(&instance->logger, "sync-point", client, + server, "process await"); + status = od_frontend_remote_process_server( + server, client, true); + + if (status != OD_OK) { + break; + } + } + + server->sync_point = 0; + if (status != OD_OK) { + break; + } + + machine_msg_t *pmsg; + pmsg = kiwi_be_write_parse_complete(NULL); + if (pmsg == NULL) { + return OD_ECLIENT_WRITE; + } + machine_iov_add(server->relay.iov, pmsg); + } if (status != OD_OK) { break; } @@ -1863,6 +2002,7 @@ static void od_frontend_cleanup(od_client_t *client, char *context, break; case OD_UNDEF: case OD_SKIP: + case OD_REQ_SYNC: case OD_ATTACH: /* fallthrough */ case OD_DETACH: diff --git a/sources/relay.h b/sources/relay.h index 61c66b1c..7bee8243 100644 --- a/sources/relay.h +++ b/sources/relay.h @@ -177,6 +177,8 @@ static inline od_frontend_status_t od_relay_on_packet_msg(od_relay_t *relay, case OD_SKIP: status = OD_OK; /* fallthrough */ + case OD_REQ_SYNC: + /* fallthrough */ default: machine_msg_free(msg); break; @@ -196,9 +198,16 @@ static inline od_frontend_status_t od_relay_on_packet(od_relay_t *relay, /* fallthrough */ case OD_DETACH: rc = machine_iov_add_pointer(relay->iov, data, size); + + od_dbg_printf_on_dvl_lvl(1, "relay %p advance msg %c\n", relay, + *data); if (rc == -1) return OD_EOOM; break; + case OD_REQ_SYNC: + /* fallthrough */ + relay->packet_skip = 1; + break; case OD_SKIP: relay->packet_skip = 1; status = OD_OK; @@ -277,7 +286,10 @@ od_relay_process(od_relay_t *relay, int *progress, char *data, int size) if (relay->packet_skip) return OD_OK; + rc = machine_iov_add_pointer(relay->iov, data, to_parse); + + od_dbg_printf_on_dvl_lvl(1, "relay %p advance msg %c\n", relay, *data); if (rc == -1) return OD_EOOM; @@ -294,6 +306,9 @@ static inline od_frontend_status_t od_relay_pipeline(od_relay_t *relay) rc = od_relay_process(relay, &progress, current, end - current); current += progress; od_readahead_pos_read_advance(&relay->src->readahead, progress); + if (rc == OD_REQ_SYNC) { + return OD_REQ_SYNC; + } if (rc != OD_OK) { if (rc == OD_UNDEF) rc = OD_OK; @@ -354,11 +369,17 @@ static inline od_frontend_status_t od_relay_write(od_relay_t *relay) return OD_OK; } -static inline od_frontend_status_t od_relay_step(od_relay_t *relay) +static inline od_frontend_status_t od_relay_step(od_relay_t *relay, + bool await_read) { /* on read event */ + od_frontend_status_t retstatus; + retstatus = OD_OK; int rc; - if (machine_cond_try(relay->src->on_read)) { + rc = await_read ? + (machine_cond_wait(relay->src->on_read, UINT32_MAX) == 0) : + machine_cond_try(relay->src->on_read); + if (rc || od_relay_data_pending(relay)) { if (relay->dst == NULL) { /* signal to retry on read logic */ machine_cond_signal(relay->src->on_read); @@ -371,7 +392,9 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) rc = od_relay_pipeline(relay); - if (rc != OD_OK) + if (rc == OD_REQ_SYNC) { + retstatus = OD_REQ_SYNC; + } else if (rc != OD_OK) return rc; if (machine_iov_pending(relay->iov)) { @@ -383,7 +406,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } if (relay->dst == NULL) - return OD_OK; + return retstatus; /* on write event */ if (machine_cond_try(relay->dst->on_write)) { @@ -408,7 +431,7 @@ static inline od_frontend_status_t od_relay_step(od_relay_t *relay) } } - return OD_OK; + return retstatus; } static inline od_frontend_status_t od_relay_flush(od_relay_t *relay) diff --git a/sources/router.c b/sources/router.c index 2f32dd7e..3b1a0b6e 100644 --- a/sources/router.c +++ b/sources/router.c @@ -658,6 +658,10 @@ od_router_status_t od_router_attach(od_router_t *router, od_client_t *client, if (server == NULL) return OD_ROUTER_ERROR; od_id_generate(&server->id, "s"); + od_dbg_printf_on_dvl_lvl(1, "server %s%.*s has relay %p\n", + server->id.id_prefix, + (signed)sizeof(server->id.id), server->id.id, + &server->relay); server->global = client->global; server->route = route; @@ -672,6 +676,8 @@ attach: server->idle_time = 0; server->key_client = client->key; + assert(od_server_synchronized(server)); + /* * XXX: this logic breaks some external solutions that use * PostgreSQL logical replication. Need to tests this and fix @@ -706,6 +712,9 @@ void od_router_detach(od_router_t *router, od_client_t *client) /* detach from current machine event loop */ od_server_t *server = client->server; + + assert(server != NULL); + assert(od_server_synchronized(server)); od_io_detach(&server->io); od_route_lock(route); diff --git a/sources/server.h b/sources/server.h index c5bc5123..86cfa053 100644 --- a/sources/server.h +++ b/sources/server.h @@ -35,6 +35,8 @@ struct od_server { uint64_t sync_request; uint64_t sync_reply; + /* to swallow some internal msgs */ + machine_msg_t *parse_msg; int idle_time; kiwi_key_t key; @@ -52,6 +54,7 @@ struct od_server { /* allocated prepared statements ids */ od_hashmap_t *prep_stmts; + int sync_point; od_global_t *global; int offline; @@ -78,6 +81,8 @@ static inline void od_server_init(od_server_t *server, int reserve_prep_stmts) server->deploy_sync = 0; server->sync_request = 0; server->sync_reply = 0; + server->sync_point = 0; + server->parse_msg = NULL; server->init_time_us = machine_time_us(); server->error_connect = NULL; server->offline = 0; @@ -140,8 +145,14 @@ static inline int od_server_in_deploy(od_server_t *server) return server->deploy_sync > 0; } +static inline int od_server_in_sync_point(od_server_t *server) +{ + return server->sync_point > 0; +} + static inline int od_server_synchronized(od_server_t *server) { + assert(server->sync_request >= server->sync_reply); return server->sync_request == server->sync_reply; } diff --git a/sources/status.h b/sources/status.h index ac4f29a6..4e404aaa 100644 --- a/sources/status.h +++ b/sources/status.h @@ -11,6 +11,7 @@ typedef enum { OD_UNDEF, OD_OK, OD_SKIP, + OD_REQ_SYNC, OD_ATTACH, OD_DETACH, OD_WAIT_SYNC, @@ -37,6 +38,8 @@ static inline char *od_frontend_status_to_str(od_frontend_status_t status) return "OD_OK"; case OD_SKIP: return "OD_SKIP"; + case OD_REQ_SYNC: + return "OD_REQ_SYNC"; case OD_ATTACH: return "OD_ATTACH"; case OD_DETACH: diff --git a/sources/system.c b/sources/system.c index 8c5cedec..8e81e883 100644 --- a/sources/system.c +++ b/sources/system.c @@ -94,6 +94,11 @@ static inline void od_system_server(void *arg) continue; } od_id_generate(&client->id, "c"); + + od_dbg_printf_on_dvl_lvl(1, "client %s%.*s has relay %p\n", + client->id.id_prefix, + (signed)sizeof(client->id.id), + client->id.id, &client->relay); rc = od_io_prepare(&client->io, client_io, instance->config.readahead); if (rc == -1) {