mirror of https://github.com/celery/kombu.git
fixed queue lookup in virtual transports with multiple overlapping bindings
This commit is contained in:
parent
6c6d898636
commit
91f25df052
|
@ -114,6 +114,45 @@ class test_Topic(ExchangeCase):
|
|||
self.assertListEqual(self.e.channel._put.call_args_list, expected)
|
||||
|
||||
|
||||
class test_TopicMultibind(ExchangeCase):
|
||||
# Testing message delivery in case of multiple overlapping
|
||||
# bindings for the same queue. As AMQP states, in case of
|
||||
# overlapping bindings, a message must be delivered once to
|
||||
# each matching queue.
|
||||
type = exchange.TopicExchange
|
||||
table = [
|
||||
('stock', None, 'rFoo'),
|
||||
('stock.#', None, 'rFoo'),
|
||||
('stock.us.*', None, 'rFoo'),
|
||||
('#', None, 'rFoo'),
|
||||
]
|
||||
|
||||
def setup(self):
|
||||
super(test_TopicMultibind, self).setup()
|
||||
self.table = [(rkey, self.e.key_to_pattern(rkey), queue)
|
||||
for rkey, _, queue in self.table]
|
||||
|
||||
def test_lookup(self):
|
||||
self.assertListEqual(
|
||||
self.e.lookup(self.table, 'eFoo', 'stock.us.nasdaq', None),
|
||||
['rFoo'],
|
||||
)
|
||||
self.assertTrue(self.e._compiled)
|
||||
self.assertListEqual(
|
||||
self.e.lookup(self.table, 'eFoo', 'stock.europe.OSE', None),
|
||||
['rFoo'],
|
||||
)
|
||||
self.assertListEqual(
|
||||
self.e.lookup(self.table, 'eFoo', 'stockxeuropexOSE', None),
|
||||
['rFoo'],
|
||||
)
|
||||
self.assertListEqual(
|
||||
self.e.lookup(self.table, 'eFoo',
|
||||
'candy.schleckpulver.snap_crackle', None),
|
||||
['rFoo'],
|
||||
)
|
||||
|
||||
|
||||
class test_ExchangeType(ExchangeCase):
|
||||
type = exchange.ExchangeType
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ from __future__ import absolute_import, unicode_literals
|
|||
from kombu.utils import escape_regex
|
||||
|
||||
import re
|
||||
import collections
|
||||
|
||||
|
||||
class ExchangeType(object):
|
||||
|
@ -51,8 +52,14 @@ class DirectExchange(ExchangeType):
|
|||
type = 'direct'
|
||||
|
||||
def lookup(self, table, exchange, routing_key, default):
|
||||
return [queue for rkey, _, queue in table
|
||||
if rkey == routing_key]
|
||||
# Using an OrderedDict to purge queue duplicates while
|
||||
# keeping the same order in which they appear in the table
|
||||
return list(
|
||||
collections.OrderedDict.fromkeys(
|
||||
queue for rkey, _, queue in table
|
||||
if rkey == routing_key
|
||||
)
|
||||
)
|
||||
|
||||
def deliver(self, message, exchange, routing_key, **kwargs):
|
||||
_lookup = self.channel._lookup
|
||||
|
@ -75,8 +82,14 @@ class TopicExchange(ExchangeType):
|
|||
_compiled = {}
|
||||
|
||||
def lookup(self, table, exchange, routing_key, default):
|
||||
return [queue for rkey, pattern, queue in table
|
||||
if self._match(pattern, routing_key)]
|
||||
# Using an OrderedDict to purge queue duplicates while
|
||||
# keeping the same order in which they appear in the table
|
||||
return list(
|
||||
collections.OrderedDict.fromkeys(
|
||||
queue for rkey, pattern, queue in table
|
||||
if self._match(pattern, routing_key)
|
||||
)
|
||||
)
|
||||
|
||||
def deliver(self, message, exchange, routing_key, **kwargs):
|
||||
_lookup = self.channel._lookup
|
||||
|
@ -120,7 +133,13 @@ class FanoutExchange(ExchangeType):
|
|||
type = 'fanout'
|
||||
|
||||
def lookup(self, table, exchange, routing_key, default):
|
||||
return [queue for _, _, queue in table]
|
||||
# Using an OrderedDict to purge queue duplicates while
|
||||
# keeping the same order in which they appear in the table
|
||||
return list(
|
||||
collections.OrderedDict.fromkeys(
|
||||
queue for _, _, queue in table
|
||||
)
|
||||
)
|
||||
|
||||
def deliver(self, message, exchange, routing_key, **kwargs):
|
||||
if self.channel.supports_fanout:
|
||||
|
|
Loading…
Reference in New Issue