mirror of https://github.com/celery/kombu.git
Fix infinity loop in create_loop (#923)
This commit is contained in:
parent
62087a67cf
commit
b3dc920883
|
@ -3,7 +3,6 @@
|
||||||
from __future__ import absolute_import, unicode_literals
|
from __future__ import absolute_import, unicode_literals
|
||||||
|
|
||||||
import errno
|
import errno
|
||||||
import itertools
|
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from types import GeneratorType as generator # noqa
|
from types import GeneratorType as generator # noqa
|
||||||
|
@ -279,24 +278,15 @@ class Hub(object):
|
||||||
consolidate_callback = self.consolidate_callback
|
consolidate_callback = self.consolidate_callback
|
||||||
on_tick = self.on_tick
|
on_tick = self.on_tick
|
||||||
propagate = self.propagate_errors
|
propagate = self.propagate_errors
|
||||||
todo = self._ready
|
|
||||||
|
|
||||||
while 1:
|
while 1:
|
||||||
|
todo = self._ready
|
||||||
|
self._ready = set()
|
||||||
|
|
||||||
for tick_callback in on_tick:
|
for tick_callback in on_tick:
|
||||||
tick_callback()
|
tick_callback()
|
||||||
|
|
||||||
# To avoid infinite loop where one of the callables adds items
|
for item in todo:
|
||||||
# to self._ready (via call_soon or otherwise), we take pop only
|
|
||||||
# N items from the ready set.
|
|
||||||
# N represents the current number of items on the set.
|
|
||||||
# That way if a todo adds another one to the ready set,
|
|
||||||
# we will break early and allow execution of readers and writers.
|
|
||||||
current_todos = len(todo)
|
|
||||||
for _ in itertools.repeat(None, current_todos):
|
|
||||||
if not todo:
|
|
||||||
break
|
|
||||||
|
|
||||||
item = todo.pop()
|
|
||||||
if item:
|
if item:
|
||||||
item()
|
item()
|
||||||
|
|
||||||
|
|
|
@ -508,30 +508,27 @@ class test_Hub:
|
||||||
assert list(hub.scheduler), [1, 2 == 3]
|
assert list(hub.scheduler), [1, 2 == 3]
|
||||||
|
|
||||||
def test_loop__tick_callbacks(self):
|
def test_loop__tick_callbacks(self):
|
||||||
self.hub._ready = Mock(name='_ready')
|
|
||||||
self.hub._ready.__len__ = Mock(name="_ready.__len__")
|
|
||||||
self.hub._ready.__len__.side_effect = RuntimeError()
|
|
||||||
ticks = [Mock(name='cb1'), Mock(name='cb2')]
|
ticks = [Mock(name='cb1'), Mock(name='cb2')]
|
||||||
self.hub.on_tick = list(ticks)
|
self.hub.on_tick = list(ticks)
|
||||||
|
|
||||||
with pytest.raises(RuntimeError):
|
next(self.hub.loop)
|
||||||
next(self.hub.loop)
|
|
||||||
|
|
||||||
ticks[0].assert_called_once_with()
|
ticks[0].assert_called_once_with()
|
||||||
ticks[1].assert_called_once_with()
|
ticks[1].assert_called_once_with()
|
||||||
|
|
||||||
def test_loop__todo(self):
|
def test_loop__todo(self):
|
||||||
self.hub.fire_timers = Mock(name='fire_timers')
|
deferred = Mock(name='cb_deferred')
|
||||||
self.hub.fire_timers.side_effect = RuntimeError()
|
|
||||||
self.hub.timer = Mock(name='timer')
|
|
||||||
|
|
||||||
callbacks = [Mock(name='cb1'), Mock(name='cb2')]
|
def defer():
|
||||||
|
self.hub.call_soon(deferred)
|
||||||
|
|
||||||
|
callbacks = [Mock(name='cb1', wraps=defer), Mock(name='cb2')]
|
||||||
for cb in callbacks:
|
for cb in callbacks:
|
||||||
self.hub.call_soon(cb)
|
self.hub.call_soon(cb)
|
||||||
self.hub._ready.add(None)
|
self.hub._ready.add(None)
|
||||||
|
|
||||||
with pytest.raises(RuntimeError):
|
next(self.hub.loop)
|
||||||
next(self.hub.loop)
|
|
||||||
|
|
||||||
callbacks[0].assert_called_once_with()
|
callbacks[0].assert_called_once_with()
|
||||||
callbacks[1].assert_called_once_with()
|
callbacks[1].assert_called_once_with()
|
||||||
|
deferred.assert_not_called()
|
||||||
|
|
Loading…
Reference in New Issue