diff --git a/sources/atomic.h b/sources/atomic.h index ca2c721f..506d1226 100644 --- a/sources/atomic.h +++ b/sources/atomic.h @@ -75,4 +75,16 @@ static inline uint64_t od_atomic_u64_sub(od_atomic_u64_t *atomic, return __sync_sub_and_fetch(atomic, value); } +static inline uint32_t od_atomic_u32_cas(od_atomic_u32_t *atomic, + uint32_t compValue, uint32_t exchValue) +{ + return __sync_val_compare_and_swap(atomic, compValue, exchValue); +} + +static inline uint64_t od_atomic_u64_cas(od_atomic_u64_t *atomic, + uint64_t compValue, uint64_t exchValue) +{ + return __sync_val_compare_and_swap(atomic, compValue, exchValue); +} + #endif /* ODYSSEY_ATOMIC_H */ diff --git a/sources/system.c b/sources/system.c index 035e5e2e..a52e30d3 100644 --- a/sources/system.c +++ b/sources/system.c @@ -500,7 +500,7 @@ static inline void od_system(void *arg) /* start worker threads */ od_worker_pool_t *worker_pool = system->global->worker_pool; rc = od_worker_pool_start(worker_pool, system->global, - instance->config.workers); + (uint32_t)instance->config.workers); if (rc == -1) return; diff --git a/sources/worker_pool.h b/sources/worker_pool.h index c88f243a..651a3940 100644 --- a/sources/worker_pool.h +++ b/sources/worker_pool.h @@ -13,8 +13,8 @@ typedef struct od_worker_pool od_worker_pool_t; struct od_worker_pool { od_worker_t *pool; - int round_robin; - int count; + od_atomic_u32_t round_robin; + uint32_t count; }; static inline void od_worker_pool_init(od_worker_pool_t *pool) @@ -24,28 +24,29 @@ static inline void od_worker_pool_init(od_worker_pool_t *pool) pool->pool = NULL; } -static inline int od_worker_pool_start(od_worker_pool_t *pool, - od_global_t *global, int count) +static inline od_retcode_t od_worker_pool_start(od_worker_pool_t *pool, + od_global_t *global, + uint32_t count) { pool->pool = malloc(sizeof(od_worker_t) * count); if (pool->pool == NULL) return -1; pool->count = count; - int i; + uint32_t i; for (i = 0; i < count; i++) { od_worker_t *worker = &pool->pool[i]; od_worker_init(worker, global, i); int rc; rc = od_worker_start(worker); if (rc == -1) - return -1; + return NOT_OK_RESPONSE; } return 0; } static inline void od_worker_pool_stop(od_worker_pool_t *pool) { - for (int i = 0; i < pool->count; i++) { + for (uint32_t i = 0; i < pool->count; i++) { od_worker_t *worker = &pool->pool[i]; machine_stop(worker->machine); } @@ -63,7 +64,7 @@ od_worker_pool_wait_gracefully_shutdown(od_worker_pool_t *pool) // waiting // // No new TLS handshakes should be initiated, so, just wait a bit. // machine_sleep(1); - for (int i = 0; i < pool->count; i++) { + for (uint32_t i = 0; i < pool->count; i++) { od_worker_t *worker = &pool->pool[i]; int rc = machine_wait(worker->machine); if (rc != MM_OK_RETCODE) @@ -74,12 +75,17 @@ od_worker_pool_wait_gracefully_shutdown(od_worker_pool_t *pool) static inline void od_worker_pool_feed(od_worker_pool_t *pool, machine_msg_t *msg) { - int next = pool->round_robin; - if (pool->round_robin >= pool->count) { - pool->round_robin = 0; - next = 0; + uint32_t next; + uint32_t oldValue; + + while (1) { + oldValue = od_atomic_u32_of(&pool->round_robin); + next = oldValue + 1 == pool->count ? 0 : oldValue; + + if (od_atomic_u32_cas(&pool->round_robin, oldValue, next) == + oldValue) + break; } - pool->round_robin++; od_worker_t *worker; worker = &pool->pool[next];