Revert "Issue #1087 redis fix (#1089)" (#1106)

This reverts commit 2f6f5f6a5d.
This commit is contained in:
Asif Saif Uddin 2019-11-02 23:43:08 +06:00 committed by GitHub
parent 19528addd7
commit 597d675ca1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 301 additions and 675 deletions

View File

@ -90,7 +90,6 @@ Marcin Lulek (ergo) <info@webreactor.eu>
Marcin Puhacz <marcin.puhacz@gmail.com>
Mark Lavin <mlavin@caktusgroup.com>
markow <markow@red-sky.pl>
Matt Davis <matteius@gmail.com>
Matt Wise <wise@wiredgeek.net>
Maxime Rouyrre <rouyrre+git@gmail.com>
mdk <luc.mdk@gmail.com>

View File

@ -3,12 +3,12 @@ from __future__ import absolute_import, unicode_literals
import numbers
import socket
import warnings
from bisect import bisect
from collections import namedtuple
from contextlib import contextmanager
from time import time
from vine import promise
from kombu.exceptions import InconsistencyError, VersionMismatch
@ -25,7 +25,6 @@ from kombu.utils.uuid import uuid
from kombu.utils.compat import _detect_environment
from . import virtual
from .virtual.base import UndeliverableWarning, UNDELIVERABLE_FMT
try:
import redis
@ -147,8 +146,12 @@ class QoS(virtual.QoS):
def append(self, message, delivery_tag):
delivery = message.delivery_info
EX, RK = delivery['exchange'], delivery['routing_key']
# Redis-py changed the format of zadd args in v3.0.0 to be like this
zadd_args = [{delivery_tag: time()}]
# TODO: Remove this once we soley on Redis-py 3.0.0+
if redis.VERSION[0] >= 3:
# Redis-py changed the format of zadd args in v3.0.0
zadd_args = [{delivery_tag: time()}]
else:
zadd_args = [time(), delivery_tag]
with self.pipe_or_acquire() as pipe:
pipe.zadd(self.unacked_index_key, *zadd_args) \
@ -717,8 +720,7 @@ class Channel(virtual.Channel):
queues = self._queue_cycle.consume(len(self.active_queues))
if not queues:
return
_q_for_pri = self._queue_for_priority
keys = [_q_for_pri(queue, pri) for pri in self.priority_steps
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)
@ -754,8 +756,7 @@ class Channel(virtual.Channel):
def _get(self, queue):
with self.conn_or_acquire() as client:
for pri in self.priority_steps:
queue_name = self._queue_for_priority(queue, pri)
item = client.rpop(queue_name)
item = client.rpop(self._q_for_pri(queue, pri))
if item:
return loads(bytes_to_str(item))
raise Empty()
@ -764,21 +765,14 @@ class Channel(virtual.Channel):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
queue_name = self._queue_for_priority(queue, pri)
pipe = pipe.llen(queue_name)
pipe = pipe.llen(self._q_for_pri(queue, pri))
sizes = pipe.execute()
size = sum(size for size in sizes
return sum(size for size in sizes
if isinstance(size, numbers.Integral))
return size
def _queue_for_priority(self, queue, pri):
def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
if pri:
queue_args = (queue, self.sep, pri)
else:
queue_args = (queue, '', '')
priority_queue_name = '%s%s%s' % queue_args
return priority_queue_name
return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', ''))
def priority(self, n):
steps = self.priority_steps
@ -789,7 +783,7 @@ class Channel(virtual.Channel):
pri = self._get_message_priority(message, reverse=False)
with self.conn_or_acquire() as client:
client.lpush(self._queue_for_priority(queue, pri), dumps(message))
client.lpush(self._q_for_pri(queue, pri), dumps(message))
def _put_fanout(self, exchange, message, routing_key, **kwargs):
"""Deliver fanout message."""
@ -824,14 +818,14 @@ class Channel(virtual.Channel):
queue or '']))
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.delete(self._queue_for_priority(queue, pri))
pipe = pipe.delete(self._q_for_pri(queue, pri))
pipe.execute()
def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.exists(self._queue_for_priority(queue, pri))
pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())
def get_table(self, exchange):
@ -842,55 +836,30 @@ class Channel(virtual.Channel):
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
def _lookup(self, exchange, routing_key, default=None):
"""Find all queues matching `routing_key` for the given `exchange`.
def _lookup_direct(self, exchange, routing_key):
if not exchange:
return [routing_key]
Returns:
str: queue name -- must return the string `default`
if no queues matched.
"""
key = self.keyprefix_queue % exchange
pattern = ''
result = []
if default is None:
default = self.deadletter_queue
if not exchange: # anon exchange
return [routing_key or default]
queue = routing_key
redis_key = self.keyprefix_queue % exchange
try:
queue_bind = self.sep.join([
routing_key or '',
pattern,
queue or '',
])
with self.conn_or_acquire() as client:
if not client.scard(redis_key):
pass # Do not check if its a member because set is empty
elif client.sismember(redis_key, queue_bind):
result = [queue]
except KeyError:
pass
queue_bind = self.sep.join([
routing_key or '',
pattern,
queue or '',
])
with self.conn_or_acquire() as client:
if client.sismember(key, queue_bind):
return [queue]
if not result:
if default is not None:
warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
exchange=exchange, routing_key=routing_key)),
)
self._new_queue(default)
result = [default]
else:
raise InconsistencyError(NO_ROUTE_ERROR.format(
exchange, redis_key))
return result
return []
def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
priority_queue = self._queue_for_priority(queue, pri)
pipe = pipe.llen(priority_queue).delete(priority_queue)
priq = self._q_for_pri(queue, pri)
pipe = pipe.llen(priq).delete(priq)
sizes = pipe.execute()
return sum(sizes[::2])

View File

@ -710,20 +710,36 @@ class Channel(AbstractChannel, base.StdChannel):
return [routing_key or default]
try:
result = self.typeof(exchange).lookup(
R = self.typeof(exchange).lookup(
self.get_table(exchange),
exchange, routing_key, default,
)
except KeyError:
result = []
R = []
if not result and default is not None:
if not R and default is not None:
warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
exchange=exchange, routing_key=routing_key)),
)
self._new_queue(default)
result = [default]
return result
R = [default]
return R
def _lookup_direct(self, exchange, routing_key):
"""Find queue matching `routing_key` for given direct `exchange`.
Returns:
str: queue name
"""
if not exchange:
return [routing_key]
return self.exchange_types['direct'].lookup(
table=self.get_table(exchange),
exchange=exchange,
routing_key=routing_key,
default=None,
)
def _restore(self, message):
"""Redeliver message to its original destination."""

View File

@ -65,7 +65,7 @@ class DirectExchange(ExchangeType):
}
def deliver(self, message, exchange, routing_key, **kwargs):
_lookup = self.channel._lookup
_lookup = self.channel._lookup_direct
_put = self.channel._put
for queue in _lookup(exchange, routing_key):
_put(queue, message, **kwargs)

View File

@ -3,4 +3,3 @@ case>=1.5.2
pytest
pytest-sugar
Pyro4
fakeredis==1.0.4

File diff suppressed because it is too large Load Diff