mirror of https://github.com/celery/kombu.git
no_declare option added to Queue/Exchange (also enabled for internal amq. exchanges). Closes #565
This commit is contained in:
parent
4f4c2c9a04
commit
9598b876ac
|
@ -21,6 +21,8 @@ DELIVERY_MODES = {'transient': TRANSIENT_DELIVERY_MODE,
|
|||
|
||||
__all__ = ['Exchange', 'Queue', 'binding', 'maybe_delivery_mode']
|
||||
|
||||
INTERNAL_EXCHANGE_PREFIX = ('amq.',)
|
||||
|
||||
|
||||
def _reprstr(s):
|
||||
s = repr(s)
|
||||
|
@ -50,6 +52,7 @@ class Exchange(MaybeChannelBound):
|
|||
:keyword auto_delete: See :attr:`auto_delete`.
|
||||
:keyword delivery_mode: See :attr:`delivery_mode`.
|
||||
:keyword arguments: See :attr:`arguments`.
|
||||
:keyword no_declare: See :attr:`no_declare`
|
||||
|
||||
.. attribute:: name
|
||||
|
||||
|
@ -136,6 +139,10 @@ class Exchange(MaybeChannelBound):
|
|||
|
||||
Additional arguments to specify when the exchange is declared.
|
||||
|
||||
.. attribute:: no_declare
|
||||
|
||||
Never declare this exchange (:meth:`declare` does nothing).
|
||||
|
||||
"""
|
||||
TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
|
||||
PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE
|
||||
|
@ -146,6 +153,7 @@ class Exchange(MaybeChannelBound):
|
|||
auto_delete = False
|
||||
passive = False
|
||||
delivery_mode = None
|
||||
no_declare = False
|
||||
|
||||
attrs = (
|
||||
('name', None),
|
||||
|
@ -155,6 +163,7 @@ class Exchange(MaybeChannelBound):
|
|||
('passive', bool),
|
||||
('auto_delete', bool),
|
||||
('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m),
|
||||
('no_declare', bool),
|
||||
)
|
||||
|
||||
def __init__(self, name='', type='', channel=None, **kwargs):
|
||||
|
@ -166,6 +175,11 @@ class Exchange(MaybeChannelBound):
|
|||
def __hash__(self):
|
||||
return hash('E|%s' % (self.name,))
|
||||
|
||||
def _can_declare(self):
|
||||
return not self.no_declare and (
|
||||
self.name and not self.name.startswith(
|
||||
INTERNAL_EXCHANGE_PREFIX))
|
||||
|
||||
def declare(self, nowait=False, passive=None):
|
||||
"""Declare the exchange.
|
||||
|
||||
|
@ -175,8 +189,8 @@ class Exchange(MaybeChannelBound):
|
|||
response will not be waited for. Default is :const:`False`.
|
||||
|
||||
"""
|
||||
passive = self.passive if passive is None else passive
|
||||
if self.name:
|
||||
if self._can_declare():
|
||||
passive = self.passive if passive is None else passive
|
||||
return self.channel.exchange_declare(
|
||||
exchange=self.name, type=self.type, durable=self.durable,
|
||||
auto_delete=self.auto_delete, arguments=self.arguments,
|
||||
|
@ -373,6 +387,7 @@ class Queue(MaybeChannelBound):
|
|||
:keyword queue_arguments: See :attr:`queue_arguments`.
|
||||
:keyword binding_arguments: See :attr:`binding_arguments`.
|
||||
:keyword consumer_arguments: See :attr:`consumer_arguments`.
|
||||
:keyword no_declare: See :attr:`no_declare`
|
||||
:keyword on_declared: See :attr:`on_declared`
|
||||
|
||||
.. attribute:: name
|
||||
|
@ -472,6 +487,11 @@ class Queue(MaybeChannelBound):
|
|||
This must be a function with a signature that accepts at least 3
|
||||
positional arguments: ``(name, messages, consumers)``.
|
||||
|
||||
.. attribute:: no_declare
|
||||
|
||||
Never declare this queue, nor related entities (:meth:`declare` does
|
||||
nothing).
|
||||
|
||||
"""
|
||||
ContentDisallowed = ContentDisallowed
|
||||
|
||||
|
@ -497,6 +517,7 @@ class Queue(MaybeChannelBound):
|
|||
('no_ack', None),
|
||||
('alias', None),
|
||||
('bindings', list),
|
||||
('no_declare', bool),
|
||||
)
|
||||
|
||||
def __init__(self, name='', exchange=None, routing_key='',
|
||||
|
@ -536,19 +557,26 @@ class Queue(MaybeChannelBound):
|
|||
def declare(self, nowait=False):
|
||||
"""Declares the queue, the exchange and binds the queue to
|
||||
the exchange."""
|
||||
# - declare main binding.
|
||||
if not self.no_declare:
|
||||
# - declare main binding.
|
||||
self._create_exchange(nowait=nowait)
|
||||
self._create_queue(nowait=nowait)
|
||||
self._create_bindings(nowait=nowait)
|
||||
return self.name
|
||||
|
||||
def _create_exchange(self, nowait=False):
|
||||
if self.exchange:
|
||||
self.exchange.declare(nowait)
|
||||
self.queue_declare(nowait, passive=False)
|
||||
|
||||
def _create_queue(self, nowait=False):
|
||||
self.queue_declare(nowait, passive=False)
|
||||
if self.exchange and self.exchange.name:
|
||||
self.queue_bind(nowait)
|
||||
|
||||
# - declare extra/multi-bindings.
|
||||
def _create_bindings(self, nowait=False):
|
||||
for B in self.bindings:
|
||||
B.declare(self.channel)
|
||||
B.bind(self, nowait=nowait)
|
||||
return self.name
|
||||
|
||||
def queue_declare(self, nowait=False, passive=False):
|
||||
"""Declare queue on the server.
|
||||
|
|
|
@ -190,6 +190,24 @@ class test_Exchange(Case):
|
|||
foo(chan).unbind_from('bar')
|
||||
self.assertIn('exchange_unbind', chan)
|
||||
|
||||
def test_declare__no_declare(self):
|
||||
chan = get_conn().channel()
|
||||
foo = Exchange('foo', 'topic', no_declare=True)
|
||||
foo(chan).declare()
|
||||
self.assertNotIn('exchange_declare', chan)
|
||||
|
||||
def test_declare__internal_exchange(self):
|
||||
chan = get_conn().channel()
|
||||
foo = Exchange('amq.rabbitmq.trace', 'topic')
|
||||
foo(chan).declare()
|
||||
self.assertNotIn('exchange_declare', chan)
|
||||
|
||||
def test_declare(self):
|
||||
chan = get_conn().channel()
|
||||
foo = Exchange('foo', 'topic', no_declare=False)
|
||||
foo(chan).declare()
|
||||
self.assertIn('exchange_declare', chan)
|
||||
|
||||
|
||||
class test_Queue(Case):
|
||||
|
||||
|
@ -257,6 +275,16 @@ class test_Queue(Case):
|
|||
q.declare()
|
||||
q.queue_declare.assert_called_with(False, passive=False)
|
||||
|
||||
def test_declare__no_declare(self):
|
||||
q = Queue('a', no_declare=True)
|
||||
q.queue_declare = Mock()
|
||||
q.queue_bind = Mock()
|
||||
q.exchange = None
|
||||
|
||||
q.declare()
|
||||
self.assertFalse(q.queue_declare.called)
|
||||
self.assertFalse(q.queue_bind.called)
|
||||
|
||||
def test_bind_to_when_name(self):
|
||||
chan = Mock()
|
||||
q = Queue('a')
|
||||
|
|
Loading…
Reference in New Issue