From f78a5f08c6973496deded69de2edc0e12f9c97ba Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 10 Aug 2019 23:40:36 +0000 Subject: [PATCH] issue #605: ansible: share a sem_t instead of a pthread_mutex_t The previous version quite reliably causes worker deadlocks within 10 minutes running: # 100 times: - import_playbook: integration/async/runner_one_job.yml # 100 times: - import_playbook: integration/module_utils/adjacent_to_playbook.yml via .ci/soak/mitogen.sh with PLAYBOOK= set to the above playbook. Attaching to the worker with gdb reveals it in an instruction immediately following a futex() call, which likely returned EINTR due to attaching gdb. Examining the pthread_mutex_t state reveals it to be completely unlocked. pthread_mutex_t on Linux should have zero trouble living in shmem, so it's not clear how this deadlock is happening. Meanwhile POSIX semaphores are explicitly designed for cross-process use and have a completely different internal implementation, so try those instead. 1 hour of soaking reveals no deadlock. This is about avoiding managing a lockable temporary file on disk to contain our counter, and somehow communicating a reference to it into subprocesses (despite the subprocess module closing inherited fds, etc), somehow deleting it reliably at exit, and somehow avoiding concurrent Ansible runs stepping on the same file. For now ctypes is still less pain. A final possibility would be to abandon a shared counter and instead pick a CPU based on the hash of e.g. the new child's process ID. That would likely balance equally well, and might be worth exploring when making this code work on BSD. --- ansible_mitogen/affinity.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/ansible_mitogen/affinity.py b/ansible_mitogen/affinity.py index 9eb6597a..67e16d8a 100644 --- a/ansible_mitogen/affinity.py +++ b/ansible_mitogen/affinity.py @@ -92,37 +92,37 @@ try: _libc = ctypes.CDLL(None, use_errno=True) _strerror = _libc.strerror _strerror.restype = ctypes.c_char_p - _pthread_mutex_init = _libc.pthread_mutex_init - _pthread_mutex_lock = _libc.pthread_mutex_lock - _pthread_mutex_unlock = _libc.pthread_mutex_unlock + _sem_init = _libc.sem_init + _sem_wait = _libc.sem_wait + _sem_post = _libc.sem_post _sched_setaffinity = _libc.sched_setaffinity except (OSError, AttributeError): _libc = None _strerror = None - _pthread_mutex_init = None - _pthread_mutex_lock = None - _pthread_mutex_unlock = None + _sem_init = None + _sem_wait = None + _sem_post = None _sched_setaffinity = None -class pthread_mutex_t(ctypes.Structure): +class sem_t(ctypes.Structure): """ - Wrap pthread_mutex_t to allow storing a lock in shared memory. + Wrap sem_t to allow storing a lock in shared memory. """ _fields_ = [ - ('data', ctypes.c_uint8 * 512), + ('data', ctypes.c_uint8 * 128), ] def init(self): - if _pthread_mutex_init(self.data, 0): + if _sem_init(self.data, 1, 1): raise Exception(_strerror(ctypes.get_errno())) def acquire(self): - if _pthread_mutex_lock(self.data): + if _sem_wait(self.data): raise Exception(_strerror(ctypes.get_errno())) def release(self): - if _pthread_mutex_unlock(self.data): + if _sem_post(self.data): raise Exception(_strerror(ctypes.get_errno())) @@ -133,7 +133,7 @@ class State(ctypes.Structure): the context of the new child process. """ _fields_ = [ - ('lock', pthread_mutex_t), + ('lock', sem_t), ('counter', ctypes.c_uint8), ]