mirror of https://github.com/celery/kombu.git
Protect set of ready tasks by lock to avoid concurrent updates. (#1489)
When there is no locking then there is a possibility that multiple threads manipulate with the same object at the same time. The issue is manifested as: ``` RuntimeError: Set changed size during iteration ``` See: https://github.com/celery/celery/issues/7162
This commit is contained in:
parent
0282e1419f
commit
aab2588c4a
|
@ -1,6 +1,7 @@
|
|||
"""Event loop implementation."""
|
||||
|
||||
import errno
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
from queue import Empty
|
||||
from time import sleep
|
||||
|
@ -78,6 +79,7 @@ class Hub:
|
|||
self.on_tick = set()
|
||||
self.on_close = set()
|
||||
self._ready = set()
|
||||
self._ready_lock = threading.Lock()
|
||||
|
||||
self._running = False
|
||||
self._loop = None
|
||||
|
@ -198,7 +200,8 @@ class Hub:
|
|||
def call_soon(self, callback, *args):
|
||||
if not isinstance(callback, Thenable):
|
||||
callback = promise(callback, args)
|
||||
self._ready.add(callback)
|
||||
with self._ready_lock:
|
||||
self._ready.add(callback)
|
||||
return callback
|
||||
|
||||
def call_later(self, delay, callback, *args):
|
||||
|
@ -242,6 +245,12 @@ class Hub:
|
|||
except (AttributeError, KeyError, OSError):
|
||||
pass
|
||||
|
||||
def _pop_ready(self):
|
||||
with self._ready_lock:
|
||||
ready = self._ready
|
||||
self._ready = set()
|
||||
return ready
|
||||
|
||||
def close(self, *args):
|
||||
[self._unregister(fd) for fd in self.readers]
|
||||
self.readers.clear()
|
||||
|
@ -257,8 +266,7 @@ class Hub:
|
|||
# To avoid infinite loop where one of the callables adds items
|
||||
# to self._ready (via call_soon or otherwise).
|
||||
# we create new list with current self._ready
|
||||
todos = list(self._ready)
|
||||
self._ready = set()
|
||||
todos = self._pop_ready()
|
||||
for item in todos:
|
||||
item()
|
||||
|
||||
|
@ -288,8 +296,7 @@ class Hub:
|
|||
propagate = self.propagate_errors
|
||||
|
||||
while 1:
|
||||
todo = self._ready
|
||||
self._ready = set()
|
||||
todo = self._pop_ready()
|
||||
|
||||
for tick_callback in on_tick:
|
||||
tick_callback()
|
||||
|
|
|
@ -187,6 +187,12 @@ class test_Hub:
|
|||
assert promise() in self.hub._ready
|
||||
assert ret is promise()
|
||||
|
||||
def test_call_soon_uses_lock(self):
|
||||
callback = Mock(name='callback')
|
||||
with patch.object(self.hub, '_ready_lock', autospec=True) as lock:
|
||||
self.hub.call_soon(callback)
|
||||
assert lock.__enter__.called_once()
|
||||
|
||||
def test_call_soon__promise_argument(self):
|
||||
callback = promise(Mock(name='callback'), (1, 2, 3))
|
||||
ret = self.hub.call_soon(callback)
|
||||
|
@ -533,3 +539,14 @@ class test_Hub:
|
|||
callbacks[0].assert_called_once_with()
|
||||
callbacks[1].assert_called_once_with()
|
||||
deferred.assert_not_called()
|
||||
|
||||
def test__pop_ready_pops_ready_items(self):
|
||||
self.hub._ready.add(None)
|
||||
ret = self.hub._pop_ready()
|
||||
assert ret == {None}
|
||||
assert self.hub._ready == set()
|
||||
|
||||
def test__pop_ready_uses_lock(self):
|
||||
with patch.object(self.hub, '_ready_lock', autospec=True) as lock:
|
||||
self.hub._pop_ready()
|
||||
assert lock.__enter__.called_once()
|
||||
|
|
Loading…
Reference in New Issue