mirror of https://github.com/celery/kombu.git
Make Queue Cycling in Redis More Fair
The previous method of cycling queues with the Redis transport was not fair if there were a lot of empty queues. Queues waiting behind lots of empty queues and at least one very full queue will be forced to wait until the queue in front of it is rotated, and then will only get the chance to be consumed once. This changes the behavior to rotate the most recently used queue to the back of the list, elmininating the problem. Closes #166
This commit is contained in:
parent
734317ce68
commit
10319aa7e1
|
@ -13,7 +13,6 @@ from __future__ import with_statement
|
|||
|
||||
from bisect import bisect
|
||||
from contextlib import contextmanager
|
||||
from itertools import cycle, islice
|
||||
from time import time
|
||||
from Queue import Empty
|
||||
|
||||
|
@ -321,7 +320,7 @@ class Channel(virtual.Channel):
|
|||
super_ = super(Channel, self)
|
||||
super_.__init__(*args, **kwargs)
|
||||
|
||||
self._queue_cycle = cycle([])
|
||||
self._queue_cycle = []
|
||||
self.Client = self._get_client()
|
||||
self.ResponseError = self._get_response_error()
|
||||
self.active_fanout_queues = set()
|
||||
|
@ -450,6 +449,7 @@ class Channel(virtual.Channel):
|
|||
if dest__item:
|
||||
dest, item = dest__item
|
||||
dest = dest.rsplit(self.sep, 1)[0]
|
||||
self._rotate_cycle(dest)
|
||||
return loads(item), dest
|
||||
else:
|
||||
raise Empty()
|
||||
|
@ -654,13 +654,25 @@ class Channel(virtual.Channel):
|
|||
each queue is equally likely to be consumed from,
|
||||
so that a very busy queue will not block others.
|
||||
|
||||
This works by using Redis's `BRPOP` command and
|
||||
by rotating the most recently used queue to the
|
||||
and of the list. See Kombu github issue #166 for
|
||||
more discussion of this method.
|
||||
|
||||
"""
|
||||
self._queue_cycle = cycle(self.active_queues)
|
||||
self._queue_cycle = list(self.active_queues)
|
||||
|
||||
def _consume_cycle(self):
|
||||
"""Get a fresh list of queues from the queue cycle."""
|
||||
active = len(self.active_queues)
|
||||
return list(islice(self._queue_cycle, 0, active + 1))[:active]
|
||||
return self._queue_cycle[0:active]
|
||||
|
||||
def _rotate_cycle(self, used):
|
||||
"""
|
||||
Move most recently used queue to end of list
|
||||
"""
|
||||
index = self._queue_cycle.index(used)
|
||||
self._queue_cycle.append(self._queue_cycle.pop(index))
|
||||
|
||||
def _get_response_error(self):
|
||||
from redis import exceptions
|
||||
|
|
Loading…
Reference in New Issue