diff --git a/kombu/backends/emulation.py b/kombu/backends/emulation.py index 90595ab5..b19c9ebd 100644 --- a/kombu/backends/emulation.py +++ b/kombu/backends/emulation.py @@ -173,20 +173,20 @@ class Channel(object): delivery_info["routing_key"]), message) - def _poll(self, resource): - while True: - if self.qos_manager.can_consume(): - try: - return resource.get() - except QueueEmpty: - pass + def _poll(self, queues): + return Consume(self._get, queues, QueueEmpty).get() def drain_events(self, timeout=None): if self.qos_manager.can_consume(): - queues = [_consumers[tag] for tag in self._consumers] - return Consume(self._get, queues, QueueEmpty).get() + if hasattr(self, "_get_many"): + return self._get_many(self._active_queues, timeout=timeout) + return self._poll(self._active_queues) raise QueueEmpty() + @property + def _active_queues(self): + return [_consumers[tag] for tag in self._consumers] + def exchange_declare(self, exchange, type="direct", durable=False, auto_delete=False, arguments=None): if exchange not in _exchanges: @@ -327,7 +327,7 @@ class EmulationBase(BaseBackend): channel.close() def _drain_channel(self, channel): - return channel.drain_events() + return channel.drain_events(timeout=self.interval) def drain_events(self, timeout=None): consumer = Consume(self._drain_channel, self._channels, QueueEmpty) diff --git a/kombu/backends/pyredis.py b/kombu/backends/pyredis.py index e75928b4..2a335f12 100644 --- a/kombu/backends/pyredis.py +++ b/kombu/backends/pyredis.py @@ -12,7 +12,6 @@ DEFAULT_DB = 0 class RedisChannel(emulation.Channel): queues = {} - do_restore = False _client = None def _new_queue(self, queue, **kwargs): @@ -27,11 +26,11 @@ class RedisChannel(emulation.Channel): def _size(self, queue): return self.client.llen(queue) - def _get_many(self, queue, timeout=None): + def _get_many(self, queues, timeout=None): dest__item = self.client.brpop(queues, timeout=timeout) if dest__item: dest, item = dest__item - return deserialize(dest), item + return deserialize(item), dest raise Empty() def _put(self, queue, message):