diff --git a/kombu/entity.py b/kombu/entity.py index ee3af0f3..a9b20a6d 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -21,6 +21,8 @@ DELIVERY_MODES = {'transient': TRANSIENT_DELIVERY_MODE, __all__ = ['Exchange', 'Queue', 'binding', 'maybe_delivery_mode'] +INTERNAL_EXCHANGE_PREFIX = ('amq.',) + def _reprstr(s): s = repr(s) @@ -50,6 +52,7 @@ class Exchange(MaybeChannelBound): :keyword auto_delete: See :attr:`auto_delete`. :keyword delivery_mode: See :attr:`delivery_mode`. :keyword arguments: See :attr:`arguments`. + :keyword no_declare: See :attr:`no_declare` .. attribute:: name @@ -136,6 +139,10 @@ class Exchange(MaybeChannelBound): Additional arguments to specify when the exchange is declared. + .. attribute:: no_declare + + Never declare this exchange (:meth:`declare` does nothing). + """ TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE @@ -146,6 +153,7 @@ class Exchange(MaybeChannelBound): auto_delete = False passive = False delivery_mode = None + no_declare = False attrs = ( ('name', None), @@ -155,6 +163,7 @@ class Exchange(MaybeChannelBound): ('passive', bool), ('auto_delete', bool), ('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m), + ('no_declare', bool), ) def __init__(self, name='', type='', channel=None, **kwargs): @@ -166,6 +175,11 @@ class Exchange(MaybeChannelBound): def __hash__(self): return hash('E|%s' % (self.name,)) + def _can_declare(self): + return not self.no_declare and ( + self.name and not self.name.startswith( + INTERNAL_EXCHANGE_PREFIX)) + def declare(self, nowait=False, passive=None): """Declare the exchange. @@ -175,8 +189,8 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ - passive = self.passive if passive is None else passive - if self.name: + if self._can_declare(): + passive = self.passive if passive is None else passive return self.channel.exchange_declare( exchange=self.name, type=self.type, durable=self.durable, auto_delete=self.auto_delete, arguments=self.arguments, @@ -373,6 +387,7 @@ class Queue(MaybeChannelBound): :keyword queue_arguments: See :attr:`queue_arguments`. :keyword binding_arguments: See :attr:`binding_arguments`. :keyword consumer_arguments: See :attr:`consumer_arguments`. + :keyword no_declare: See :attr:`no_declare` :keyword on_declared: See :attr:`on_declared` .. attribute:: name @@ -472,6 +487,11 @@ class Queue(MaybeChannelBound): This must be a function with a signature that accepts at least 3 positional arguments: ``(name, messages, consumers)``. + .. attribute:: no_declare + + Never declare this queue, nor related entities (:meth:`declare` does + nothing). + """ ContentDisallowed = ContentDisallowed @@ -497,6 +517,7 @@ class Queue(MaybeChannelBound): ('no_ack', None), ('alias', None), ('bindings', list), + ('no_declare', bool), ) def __init__(self, name='', exchange=None, routing_key='', @@ -536,19 +557,26 @@ class Queue(MaybeChannelBound): def declare(self, nowait=False): """Declares the queue, the exchange and binds the queue to the exchange.""" - # - declare main binding. + if not self.no_declare: + # - declare main binding. + self._create_exchange(nowait=nowait) + self._create_queue(nowait=nowait) + self._create_bindings(nowait=nowait) + return self.name + + def _create_exchange(self, nowait=False): if self.exchange: self.exchange.declare(nowait) - self.queue_declare(nowait, passive=False) + def _create_queue(self, nowait=False): + self.queue_declare(nowait, passive=False) if self.exchange and self.exchange.name: self.queue_bind(nowait) - # - declare extra/multi-bindings. + def _create_bindings(self, nowait=False): for B in self.bindings: B.declare(self.channel) B.bind(self, nowait=nowait) - return self.name def queue_declare(self, nowait=False, passive=False): """Declare queue on the server. diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index c55e3281..ba241b4a 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -190,6 +190,24 @@ class test_Exchange(Case): foo(chan).unbind_from('bar') self.assertIn('exchange_unbind', chan) + def test_declare__no_declare(self): + chan = get_conn().channel() + foo = Exchange('foo', 'topic', no_declare=True) + foo(chan).declare() + self.assertNotIn('exchange_declare', chan) + + def test_declare__internal_exchange(self): + chan = get_conn().channel() + foo = Exchange('amq.rabbitmq.trace', 'topic') + foo(chan).declare() + self.assertNotIn('exchange_declare', chan) + + def test_declare(self): + chan = get_conn().channel() + foo = Exchange('foo', 'topic', no_declare=False) + foo(chan).declare() + self.assertIn('exchange_declare', chan) + class test_Queue(Case): @@ -257,6 +275,16 @@ class test_Queue(Case): q.declare() q.queue_declare.assert_called_with(False, passive=False) + def test_declare__no_declare(self): + q = Queue('a', no_declare=True) + q.queue_declare = Mock() + q.queue_bind = Mock() + q.exchange = None + + q.declare() + self.assertFalse(q.queue_declare.called) + self.assertFalse(q.queue_bind.called) + def test_bind_to_when_name(self): chan = Mock() q = Queue('a')