diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py index a4fdd15d..280f07dc 100644 --- a/kombu/tests/transport/virtual/test_exchange.py +++ b/kombu/tests/transport/virtual/test_exchange.py @@ -114,6 +114,45 @@ class test_Topic(ExchangeCase): self.assertListEqual(self.e.channel._put.call_args_list, expected) +class test_TopicMultibind(ExchangeCase): + # Testing message delivery in case of multiple overlapping + # bindings for the same queue. As AMQP states, in case of + # overlapping bindings, a message must be delivered once to + # each matching queue. + type = exchange.TopicExchange + table = [ + ('stock', None, 'rFoo'), + ('stock.#', None, 'rFoo'), + ('stock.us.*', None, 'rFoo'), + ('#', None, 'rFoo'), + ] + + def setup(self): + super(test_TopicMultibind, self).setup() + self.table = [(rkey, self.e.key_to_pattern(rkey), queue) + for rkey, _, queue in self.table] + + def test_lookup(self): + self.assertListEqual( + self.e.lookup(self.table, 'eFoo', 'stock.us.nasdaq', None), + ['rFoo'], + ) + self.assertTrue(self.e._compiled) + self.assertListEqual( + self.e.lookup(self.table, 'eFoo', 'stock.europe.OSE', None), + ['rFoo'], + ) + self.assertListEqual( + self.e.lookup(self.table, 'eFoo', 'stockxeuropexOSE', None), + ['rFoo'], + ) + self.assertListEqual( + self.e.lookup(self.table, 'eFoo', + 'candy.schleckpulver.snap_crackle', None), + ['rFoo'], + ) + + class test_ExchangeType(ExchangeCase): type = exchange.ExchangeType diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index dfd185c7..98ad6ed2 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -11,6 +11,7 @@ from __future__ import absolute_import, unicode_literals from kombu.utils import escape_regex import re +import collections class ExchangeType(object): @@ -51,8 +52,14 @@ class DirectExchange(ExchangeType): type = 'direct' def lookup(self, table, exchange, routing_key, default): - return [queue for rkey, _, queue in table - if rkey == routing_key] + # Using an OrderedDict to purge queue duplicates while + # keeping the same order in which they appear in the table + return list( + collections.OrderedDict.fromkeys( + queue for rkey, _, queue in table + if rkey == routing_key + ) + ) def deliver(self, message, exchange, routing_key, **kwargs): _lookup = self.channel._lookup @@ -75,8 +82,14 @@ class TopicExchange(ExchangeType): _compiled = {} def lookup(self, table, exchange, routing_key, default): - return [queue for rkey, pattern, queue in table - if self._match(pattern, routing_key)] + # Using an OrderedDict to purge queue duplicates while + # keeping the same order in which they appear in the table + return list( + collections.OrderedDict.fromkeys( + queue for rkey, pattern, queue in table + if self._match(pattern, routing_key) + ) + ) def deliver(self, message, exchange, routing_key, **kwargs): _lookup = self.channel._lookup @@ -120,7 +133,13 @@ class FanoutExchange(ExchangeType): type = 'fanout' def lookup(self, table, exchange, routing_key, default): - return [queue for _, _, queue in table] + # Using an OrderedDict to purge queue duplicates while + # keeping the same order in which they appear in the table + return list( + collections.OrderedDict.fromkeys( + queue for _, _, queue in table + ) + ) def deliver(self, message, exchange, routing_key, **kwargs): if self.channel.supports_fanout: