mirror of https://github.com/celery/kombu.git
Redis backend working with _get_many
This commit is contained in:
parent
128ea635e8
commit
e74742473c
|
@ -173,20 +173,20 @@ class Channel(object):
|
||||||
delivery_info["routing_key"]),
|
delivery_info["routing_key"]),
|
||||||
message)
|
message)
|
||||||
|
|
||||||
def _poll(self, resource):
|
def _poll(self, queues):
|
||||||
while True:
|
return Consume(self._get, queues, QueueEmpty).get()
|
||||||
if self.qos_manager.can_consume():
|
|
||||||
try:
|
|
||||||
return resource.get()
|
|
||||||
except QueueEmpty:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def drain_events(self, timeout=None):
|
def drain_events(self, timeout=None):
|
||||||
if self.qos_manager.can_consume():
|
if self.qos_manager.can_consume():
|
||||||
queues = [_consumers[tag] for tag in self._consumers]
|
if hasattr(self, "_get_many"):
|
||||||
return Consume(self._get, queues, QueueEmpty).get()
|
return self._get_many(self._active_queues, timeout=timeout)
|
||||||
|
return self._poll(self._active_queues)
|
||||||
raise QueueEmpty()
|
raise QueueEmpty()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _active_queues(self):
|
||||||
|
return [_consumers[tag] for tag in self._consumers]
|
||||||
|
|
||||||
def exchange_declare(self, exchange, type="direct", durable=False,
|
def exchange_declare(self, exchange, type="direct", durable=False,
|
||||||
auto_delete=False, arguments=None):
|
auto_delete=False, arguments=None):
|
||||||
if exchange not in _exchanges:
|
if exchange not in _exchanges:
|
||||||
|
@ -327,7 +327,7 @@ class EmulationBase(BaseBackend):
|
||||||
channel.close()
|
channel.close()
|
||||||
|
|
||||||
def _drain_channel(self, channel):
|
def _drain_channel(self, channel):
|
||||||
return channel.drain_events()
|
return channel.drain_events(timeout=self.interval)
|
||||||
|
|
||||||
def drain_events(self, timeout=None):
|
def drain_events(self, timeout=None):
|
||||||
consumer = Consume(self._drain_channel, self._channels, QueueEmpty)
|
consumer = Consume(self._drain_channel, self._channels, QueueEmpty)
|
||||||
|
|
|
@ -12,7 +12,6 @@ DEFAULT_DB = 0
|
||||||
|
|
||||||
class RedisChannel(emulation.Channel):
|
class RedisChannel(emulation.Channel):
|
||||||
queues = {}
|
queues = {}
|
||||||
do_restore = False
|
|
||||||
_client = None
|
_client = None
|
||||||
|
|
||||||
def _new_queue(self, queue, **kwargs):
|
def _new_queue(self, queue, **kwargs):
|
||||||
|
@ -27,11 +26,11 @@ class RedisChannel(emulation.Channel):
|
||||||
def _size(self, queue):
|
def _size(self, queue):
|
||||||
return self.client.llen(queue)
|
return self.client.llen(queue)
|
||||||
|
|
||||||
def _get_many(self, queue, timeout=None):
|
def _get_many(self, queues, timeout=None):
|
||||||
dest__item = self.client.brpop(queues, timeout=timeout)
|
dest__item = self.client.brpop(queues, timeout=timeout)
|
||||||
if dest__item:
|
if dest__item:
|
||||||
dest, item = dest__item
|
dest, item = dest__item
|
||||||
return deserialize(dest), item
|
return deserialize(item), dest
|
||||||
raise Empty()
|
raise Empty()
|
||||||
|
|
||||||
def _put(self, queue, message):
|
def _put(self, queue, message):
|
||||||
|
|
Loading…
Reference in New Issue