mirror of https://github.com/celery/kombu.git
backends.virtual.Consumer renamed to FairCycle
This commit is contained in:
parent
ba262e6f70
commit
0e4afa2f29
|
@ -11,7 +11,7 @@ from kombu.backends.base import BaseBackend, BaseMessage
|
|||
from kombu.utils import OrderedDict
|
||||
|
||||
|
||||
class Consume(object):
|
||||
class FairCycle(object):
|
||||
"""Consume from a set of resources, where each resource gets
|
||||
an equal chance to be consumed from."""
|
||||
|
||||
|
@ -174,7 +174,7 @@ class Channel(object):
|
|||
message)
|
||||
|
||||
def _poll(self, queues):
|
||||
return Consume(self._get, queues, QueueEmpty).get()
|
||||
return FairCycle(self._get, queues, QueueEmpty).get()
|
||||
|
||||
def drain_events(self, timeout=None):
|
||||
if self.qos_manager.can_consume():
|
||||
|
@ -330,10 +330,10 @@ class VirtualBaseBackend(BaseBackend):
|
|||
return channel.drain_events(timeout=self.interval)
|
||||
|
||||
def drain_events(self, connection, timeout=None):
|
||||
consumer = Consume(self._drain_channel, self._channels, QueueEmpty)
|
||||
cycle = FairCycle(self._drain_channel, self._channels, QueueEmpty)
|
||||
while True:
|
||||
try:
|
||||
item, channel = consumer.get()
|
||||
item, channel = cycle.get()
|
||||
break
|
||||
except QueueEmpty:
|
||||
sleep(self.interval)
|
||||
|
|
Loading…
Reference in New Issue