mirror of https://github.com/celery/kombu.git
Use SIMEMBERS instead of SMEMBERS to check for queue (redis broker) (#1041)
* Add `_lookup_direct` method to virtual channel. (#994) Add possibility to optimize lookup for queue in direct exchange set. * Add `_lookup_direct` method to redis virtual channel. (#994) Use `SISMEMBER` instead of `SMEMBERS` command to check if queue exists in a set. Time complexity is increased from O(N) to O(1) where N is the set cardinality.
This commit is contained in:
parent
d4eab78ab4
commit
73d2219887
|
@ -829,6 +829,24 @@ class Channel(virtual.Channel):
|
|||
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
|
||||
return [tuple(bytes_to_str(val).split(self.sep)) for val in values]
|
||||
|
||||
def _lookup_direct(self, exchange, routing_key):
|
||||
if not exchange:
|
||||
return [routing_key]
|
||||
|
||||
key = self.keyprefix_queue % exchange
|
||||
pattern = ''
|
||||
queue = routing_key
|
||||
queue_bind = self.sep.join([
|
||||
routing_key or '',
|
||||
pattern,
|
||||
queue or '',
|
||||
])
|
||||
with self.conn_or_acquire() as client:
|
||||
if client.sismember(key, queue_bind):
|
||||
return [queue]
|
||||
|
||||
return []
|
||||
|
||||
def _purge(self, queue):
|
||||
with self.conn_or_acquire() as client:
|
||||
with client.pipeline() as pipe:
|
||||
|
|
|
@ -725,6 +725,22 @@ class Channel(AbstractChannel, base.StdChannel):
|
|||
R = [default]
|
||||
return R
|
||||
|
||||
def _lookup_direct(self, exchange, routing_key):
|
||||
"""Find queue matching `routing_key` for given direct `exchange`.
|
||||
|
||||
Returns:
|
||||
str: queue name
|
||||
"""
|
||||
if not exchange:
|
||||
return [routing_key]
|
||||
|
||||
return self.exchange_types['direct'].lookup(
|
||||
table=self.get_table(exchange),
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
default=None,
|
||||
)
|
||||
|
||||
def _restore(self, message):
|
||||
"""Redeliver message to its original destination."""
|
||||
delivery_info = message.delivery_info
|
||||
|
|
|
@ -65,7 +65,7 @@ class DirectExchange(ExchangeType):
|
|||
}
|
||||
|
||||
def deliver(self, message, exchange, routing_key, **kwargs):
|
||||
_lookup = self.channel._lookup
|
||||
_lookup = self.channel._lookup_direct
|
||||
_put = self.channel._put
|
||||
for queue in _lookup(exchange, routing_key):
|
||||
_put(queue, message, **kwargs)
|
||||
|
|
|
@ -88,6 +88,9 @@ class Client(object):
|
|||
def smembers(self, key):
|
||||
return self.sets.get(key, set())
|
||||
|
||||
def sismember(self, name, value):
|
||||
return value in self.sets.get(name, set())
|
||||
|
||||
def ping(self, *args, **kwargs):
|
||||
return True
|
||||
|
||||
|
|
Loading…
Reference in New Issue