diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py index e8091746..a8c3124f 100644 --- a/kombu/asynchronous/hub.py +++ b/kombu/asynchronous/hub.py @@ -3,7 +3,6 @@ from __future__ import absolute_import, unicode_literals import errno -import itertools from contextlib import contextmanager from time import sleep from types import GeneratorType as generator # noqa @@ -279,24 +278,15 @@ class Hub(object): consolidate_callback = self.consolidate_callback on_tick = self.on_tick propagate = self.propagate_errors - todo = self._ready while 1: + todo = self._ready + self._ready = set() + for tick_callback in on_tick: tick_callback() - # To avoid infinite loop where one of the callables adds items - # to self._ready (via call_soon or otherwise), we take pop only - # N items from the ready set. - # N represents the current number of items on the set. - # That way if a todo adds another one to the ready set, - # we will break early and allow execution of readers and writers. - current_todos = len(todo) - for _ in itertools.repeat(None, current_todos): - if not todo: - break - - item = todo.pop() + for item in todo: if item: item() diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py index 40b3ce5d..6659a163 100644 --- a/t/unit/asynchronous/test_hub.py +++ b/t/unit/asynchronous/test_hub.py @@ -508,30 +508,27 @@ class test_Hub: assert list(hub.scheduler), [1, 2 == 3] def test_loop__tick_callbacks(self): - self.hub._ready = Mock(name='_ready') - self.hub._ready.__len__ = Mock(name="_ready.__len__") - self.hub._ready.__len__.side_effect = RuntimeError() ticks = [Mock(name='cb1'), Mock(name='cb2')] self.hub.on_tick = list(ticks) - with pytest.raises(RuntimeError): - next(self.hub.loop) + next(self.hub.loop) ticks[0].assert_called_once_with() ticks[1].assert_called_once_with() def test_loop__todo(self): - self.hub.fire_timers = Mock(name='fire_timers') - self.hub.fire_timers.side_effect = RuntimeError() - self.hub.timer = Mock(name='timer') + deferred = Mock(name='cb_deferred') - callbacks = [Mock(name='cb1'), Mock(name='cb2')] + def defer(): + self.hub.call_soon(deferred) + + callbacks = [Mock(name='cb1', wraps=defer), Mock(name='cb2')] for cb in callbacks: self.hub.call_soon(cb) self.hub._ready.add(None) - with pytest.raises(RuntimeError): - next(self.hub.loop) + next(self.hub.loop) callbacks[0].assert_called_once_with() callbacks[1].assert_called_once_with() + deferred.assert_not_called()