From aab2588c4ab2f8fd67c6d1c6be3243504b028504 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oliver=20Nem=C4=8Dek?= Date: Thu, 24 Mar 2022 15:49:17 +0100 Subject: [PATCH] Protect set of ready tasks by lock to avoid concurrent updates. (#1489) When there is no locking then there is a possibility that multiple threads manipulate with the same object at the same time. The issue is manifested as: ``` RuntimeError: Set changed size during iteration ``` See: https://github.com/celery/celery/issues/7162 --- kombu/asynchronous/hub.py | 17 ++++++++++++----- t/unit/asynchronous/test_hub.py | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py index b1f7e241..2d479db7 100644 --- a/kombu/asynchronous/hub.py +++ b/kombu/asynchronous/hub.py @@ -1,6 +1,7 @@ """Event loop implementation.""" import errno +import threading from contextlib import contextmanager from queue import Empty from time import sleep @@ -78,6 +79,7 @@ class Hub: self.on_tick = set() self.on_close = set() self._ready = set() + self._ready_lock = threading.Lock() self._running = False self._loop = None @@ -198,7 +200,8 @@ class Hub: def call_soon(self, callback, *args): if not isinstance(callback, Thenable): callback = promise(callback, args) - self._ready.add(callback) + with self._ready_lock: + self._ready.add(callback) return callback def call_later(self, delay, callback, *args): @@ -242,6 +245,12 @@ class Hub: except (AttributeError, KeyError, OSError): pass + def _pop_ready(self): + with self._ready_lock: + ready = self._ready + self._ready = set() + return ready + def close(self, *args): [self._unregister(fd) for fd in self.readers] self.readers.clear() @@ -257,8 +266,7 @@ class Hub: # To avoid infinite loop where one of the callables adds items # to self._ready (via call_soon or otherwise). # we create new list with current self._ready - todos = list(self._ready) - self._ready = set() + todos = self._pop_ready() for item in todos: item() @@ -288,8 +296,7 @@ class Hub: propagate = self.propagate_errors while 1: - todo = self._ready - self._ready = set() + todo = self._pop_ready() for tick_callback in on_tick: tick_callback() diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py index eae25357..0cf14194 100644 --- a/t/unit/asynchronous/test_hub.py +++ b/t/unit/asynchronous/test_hub.py @@ -187,6 +187,12 @@ class test_Hub: assert promise() in self.hub._ready assert ret is promise() + def test_call_soon_uses_lock(self): + callback = Mock(name='callback') + with patch.object(self.hub, '_ready_lock', autospec=True) as lock: + self.hub.call_soon(callback) + assert lock.__enter__.called_once() + def test_call_soon__promise_argument(self): callback = promise(Mock(name='callback'), (1, 2, 3)) ret = self.hub.call_soon(callback) @@ -533,3 +539,14 @@ class test_Hub: callbacks[0].assert_called_once_with() callbacks[1].assert_called_once_with() deferred.assert_not_called() + + def test__pop_ready_pops_ready_items(self): + self.hub._ready.add(None) + ret = self.hub._pop_ready() + assert ret == {None} + assert self.hub._ready == set() + + def test__pop_ready_uses_lock(self): + with patch.object(self.hub, '_ready_lock', autospec=True) as lock: + self.hub._pop_ready() + assert lock.__enter__.called_once()