mirror of https://github.com/yandex/odyssey.git
Fix undefined behaviour in od_worker_pool_feed function (#457)
Currently od_worker_pool_feed function uses non-atomic access to pool->round_robin variable. But od_worker_pool_feed function can be called multiple time simultaneously from od_system_server, as odyssey supports multiple listen addresses feauture, and no synchronization is performed between concurrent listen coroutines. This entails data race, which leads undefined behaviour in C/C++
This commit is contained in:
parent
1710d40246
commit
4041401816
|
@ -75,4 +75,16 @@ static inline uint64_t od_atomic_u64_sub(od_atomic_u64_t *atomic,
|
|||
return __sync_sub_and_fetch(atomic, value);
|
||||
}
|
||||
|
||||
static inline uint32_t od_atomic_u32_cas(od_atomic_u32_t *atomic,
|
||||
uint32_t compValue, uint32_t exchValue)
|
||||
{
|
||||
return __sync_val_compare_and_swap(atomic, compValue, exchValue);
|
||||
}
|
||||
|
||||
static inline uint64_t od_atomic_u64_cas(od_atomic_u64_t *atomic,
|
||||
uint64_t compValue, uint64_t exchValue)
|
||||
{
|
||||
return __sync_val_compare_and_swap(atomic, compValue, exchValue);
|
||||
}
|
||||
|
||||
#endif /* ODYSSEY_ATOMIC_H */
|
||||
|
|
|
@ -500,7 +500,7 @@ static inline void od_system(void *arg)
|
|||
/* start worker threads */
|
||||
od_worker_pool_t *worker_pool = system->global->worker_pool;
|
||||
rc = od_worker_pool_start(worker_pool, system->global,
|
||||
instance->config.workers);
|
||||
(uint32_t)instance->config.workers);
|
||||
if (rc == -1)
|
||||
return;
|
||||
|
||||
|
|
|
@ -13,8 +13,8 @@ typedef struct od_worker_pool od_worker_pool_t;
|
|||
|
||||
struct od_worker_pool {
|
||||
od_worker_t *pool;
|
||||
int round_robin;
|
||||
int count;
|
||||
od_atomic_u32_t round_robin;
|
||||
uint32_t count;
|
||||
};
|
||||
|
||||
static inline void od_worker_pool_init(od_worker_pool_t *pool)
|
||||
|
@ -24,28 +24,29 @@ static inline void od_worker_pool_init(od_worker_pool_t *pool)
|
|||
pool->pool = NULL;
|
||||
}
|
||||
|
||||
static inline int od_worker_pool_start(od_worker_pool_t *pool,
|
||||
od_global_t *global, int count)
|
||||
static inline od_retcode_t od_worker_pool_start(od_worker_pool_t *pool,
|
||||
od_global_t *global,
|
||||
uint32_t count)
|
||||
{
|
||||
pool->pool = malloc(sizeof(od_worker_t) * count);
|
||||
if (pool->pool == NULL)
|
||||
return -1;
|
||||
pool->count = count;
|
||||
int i;
|
||||
uint32_t i;
|
||||
for (i = 0; i < count; i++) {
|
||||
od_worker_t *worker = &pool->pool[i];
|
||||
od_worker_init(worker, global, i);
|
||||
int rc;
|
||||
rc = od_worker_start(worker);
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
return NOT_OK_RESPONSE;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline void od_worker_pool_stop(od_worker_pool_t *pool)
|
||||
{
|
||||
for (int i = 0; i < pool->count; i++) {
|
||||
for (uint32_t i = 0; i < pool->count; i++) {
|
||||
od_worker_t *worker = &pool->pool[i];
|
||||
machine_stop(worker->machine);
|
||||
}
|
||||
|
@ -63,7 +64,7 @@ od_worker_pool_wait_gracefully_shutdown(od_worker_pool_t *pool)
|
|||
// waiting
|
||||
// // No new TLS handshakes should be initiated, so, just wait a bit.
|
||||
// machine_sleep(1);
|
||||
for (int i = 0; i < pool->count; i++) {
|
||||
for (uint32_t i = 0; i < pool->count; i++) {
|
||||
od_worker_t *worker = &pool->pool[i];
|
||||
int rc = machine_wait(worker->machine);
|
||||
if (rc != MM_OK_RETCODE)
|
||||
|
@ -74,12 +75,17 @@ od_worker_pool_wait_gracefully_shutdown(od_worker_pool_t *pool)
|
|||
static inline void od_worker_pool_feed(od_worker_pool_t *pool,
|
||||
machine_msg_t *msg)
|
||||
{
|
||||
int next = pool->round_robin;
|
||||
if (pool->round_robin >= pool->count) {
|
||||
pool->round_robin = 0;
|
||||
next = 0;
|
||||
uint32_t next;
|
||||
uint32_t oldValue;
|
||||
|
||||
while (1) {
|
||||
oldValue = od_atomic_u32_of(&pool->round_robin);
|
||||
next = oldValue + 1 == pool->count ? 0 : oldValue;
|
||||
|
||||
if (od_atomic_u32_cas(&pool->round_robin, oldValue, next) ==
|
||||
oldValue)
|
||||
break;
|
||||
}
|
||||
pool->round_robin++;
|
||||
|
||||
od_worker_t *worker;
|
||||
worker = &pool->pool[next];
|
||||
|
|
Loading…
Reference in New Issue