diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 479dcb03..5c878e80 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -13,7 +13,6 @@ from __future__ import with_statement from bisect import bisect from contextlib import contextmanager -from itertools import cycle, islice from time import time from Queue import Empty @@ -321,7 +320,7 @@ class Channel(virtual.Channel): super_ = super(Channel, self) super_.__init__(*args, **kwargs) - self._queue_cycle = cycle([]) + self._queue_cycle = [] self.Client = self._get_client() self.ResponseError = self._get_response_error() self.active_fanout_queues = set() @@ -450,6 +449,7 @@ class Channel(virtual.Channel): if dest__item: dest, item = dest__item dest = dest.rsplit(self.sep, 1)[0] + self._rotate_cycle(dest) return loads(item), dest else: raise Empty() @@ -654,13 +654,25 @@ class Channel(virtual.Channel): each queue is equally likely to be consumed from, so that a very busy queue will not block others. + This works by using Redis's `BRPOP` command and + by rotating the most recently used queue to the + and of the list. See Kombu github issue #166 for + more discussion of this method. + """ - self._queue_cycle = cycle(self.active_queues) + self._queue_cycle = list(self.active_queues) def _consume_cycle(self): """Get a fresh list of queues from the queue cycle.""" active = len(self.active_queues) - return list(islice(self._queue_cycle, 0, active + 1))[:active] + return self._queue_cycle[0:active] + + def _rotate_cycle(self, used): + """ + Move most recently used queue to end of list + """ + index = self._queue_cycle.index(used) + self._queue_cycle.append(self._queue_cycle.pop(index)) def _get_response_error(self): from redis import exceptions