diff --git a/kombu/entity.py b/kombu/entity.py index 01c06744..df93101e 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -1,21 +1,47 @@ from copy import copy +TRANSIENT_DELIVERY_MODE = 1 +PERSISTENT_DELIVERY_MODE = 2 +DELIVERY_MODES = {"transient": TRANSIENT_DELIVERY_MODE, + "persistent": PERSISTENT_DELIVERY_MODE} + +class NotBoundError(Exception): + """Trying to call channel dependent method on unbound entity.""" + + +class Object(object): + attrs = () + + def __init__(self, *args, **kwargs): + any = lambda v: v + for name, type_ in self.attrs: + value = kwargs.get(name) + if value is not None: + setattr(self, name, (type_ or any)(value)) + + def __copy__(self): + return self.__class__(**dict((name, getattr(self, name)) + for name, _ in self.attrs)) + def assert_is_bound(fun): def only_if_bound(self, *args, **kwargs): if self.is_bound: - return fun(*args, **kwargs) - raise NotBoundError("Can't call %s on unbound %s" % ( - fun.__name__, self.__class__.__name__)) + return fun(self, *args, **kwargs) + raise NotBoundError( + "Can't call %s on %s not bound to a channel" % ( + fun.__name__, + self.__class__.__name__)) only_if_bound.__name__ = fun.__name__ return only_if_bound -class MaybeChannelBound(object): +class MaybeChannelBound(Object): """Mixin for classes that can be bound to an AMQP channel.""" channel = None + _is_bound = False def bind(self, channel): """Create copy of the instance that is bound to a channel.""" @@ -23,9 +49,10 @@ class MaybeChannelBound(object): def maybe_bind(self, channel): """Bind instance to channel if not already bound.""" - if not self.is_bound: + if not self.is_bound and channel: self.channel = channel self.when_bound() + self._is_bound = True return self def when_bound(self): @@ -35,7 +62,7 @@ class MaybeChannelBound(object): @property def is_bound(self): """Returns ``True`` if the entity is bound.""" - return self.channel is not None + return self._is_bound and self.channel is not None def __repr__(self, item=""): if self.is_bound: @@ -45,35 +72,29 @@ class MaybeChannelBound(object): class Exchange(MaybeChannelBound): - TRANSIENT_DELIVERY_MODE = 1 - PERSISTENT_DELIVERY_MODE = 2 - DELIVERY_MODES = { - "transient": TRANSIENT_DELIVERY_MODE, - "persistent": PERSISTENT_DELIVERY_MODE, - } + TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE + PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE name = "" type = "direct" routing_key = "" - delivery_mode = PERSISTENT_DELIVERY_MODE durable = True auto_delete = False - _init_opts = ("durable", "auto_delete", - "delivery_mode", "auto_declare") + delivery_mode = PERSISTENT_DELIVERY_MODE - def __init__(self, name="", type="", routing_key=None, channel=None, - **kwargs): + attrs = (("name", None), + ("type", None), + ("routing_key", None), + ("channel", None), + ("durable", bool), + ("auto_delete", bool), + ("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m)) + + def __init__(self, name="", type="", routing_key="", **kwargs): + super(Exchange, self).__init__(**kwargs) self.name = name or self.name self.type = type or self.type self.routing_key = routing_key or self.routing_key - self.maybe_bind(channel) - - for opt_name in self._init_opts: - opt_value = kwargs.get(opt_name) - if opt_value is not None: - setattr(self, opt_name, opt_value) - - self.delivery_mode = self.DELIVERY_MODES.get(self.delivery_mode, - self.delivery_mode) + self.maybe_bind(self.channel) @assert_is_bound def declare(self): @@ -82,15 +103,16 @@ class Exchange(MaybeChannelBound): Creates the exchange on the broker. """ - self.channel.exchange_declare(exchange=self.name, - type=self.type, - durable=self.durable, - auto_delete=self.auto_delete) + return self.channel.exchange_declare(exchange=self.name, + type=self.type, + durable=self.durable, + auto_delete=self.auto_delete) @assert_is_bound def create_message(self, message_data, delivery_mode=None, priority=None, content_type=None, content_encoding=None, properties=None): + properties = properties or {} properties["delivery_mode"] = delivery_mode or self.delivery_mode return self.channel.prepare_message(message_data, properties=properties, @@ -99,31 +121,27 @@ class Exchange(MaybeChannelBound): content_encoding=content_encoding) @assert_is_bound - def publish(self, message, routing_key=None, - mandatory=False, immediate=False): + def publish(self, message, routing_key=None, mandatory=False, + immediate=False, headers=None): if routing_key is None: routing_key = self.routing_key - self.channel.basic_publish(message, - exchange=self.name, - routing_key=routing_key, - mandatory=mandatory, - immediate=immediate, - headers=headers) + return self.channel.basic_publish(message, + exchange=self.name, + routing_key=routing_key, + mandatory=mandatory, + immediate=immediate, + headers=headers) - def __copy__(self): - return self.__class__(name=self.name, - type=self.type, - routing_key=self.routing_key, - channel=self.channel, - **dict((name, getattr(self, name)) - for name in self._init_opts)) + @assert_is_bound + def delete(self, if_unused=False): + return self.channel.exchange_delete(self.name, if_unused=if_unused) def __repr__(self): - super(Exchange, self).__repr__("Exchange %s(%s)" % (self.name, - self.type)) + return super(Exchange, self).__repr__("Exchange %s(%s)" % (self.name, + self.type)) -class Binding(object): +class Binding(MaybeChannelBound): name = "" exchange = None routing_key = "" @@ -131,27 +149,24 @@ class Binding(object): durable = True exclusive = False auto_delete = False - warn_if_exists = False - _init_opts = ("durable", "exclusive", "auto_delete", - "warn_if_exists") - def __init__(self, name=None, exchange=None, routing_key=None, - channel=None, **kwargs): - # Binding. + attrs = (("name", None), + ("exchange", None), + ("routing_key", None), + ("channel", None), + ("durable", bool), + ("exclusive", bool), + ("auto_delete", bool)) + + def __init__(self, name="", exchange=None, routing_key="", **kwargs): + super(Binding, self).__init__(**kwargs) self.name = name or self.name self.exchange = exchange or self.exchange self.routing_key = routing_key or self.routing_key - self.maybe_bind(channel) - - # Options - for opt_name in self._init_opts: - opt_value = kwargs.get(opt_name) - if opt_value is not None: - setattr(self, opt_name, opt_value) - # exclusive implies auto-delete. if self.exclusive: self.auto_delete = True + self.maybe_bind(self.channel) def when_bound(self): self.exchange = self.exchange.bind(self.channel) @@ -160,16 +175,16 @@ class Binding(object): def declare(self): """Declares the queue, the exchange and binds the queue to the exchange.""" - if self.exchange: - self.exchange.declare() - if self.name: - self.channel.queue_declare(queue=self.name, - durable=self.durable, - exclusive=self.exclusive, - auto_delete=self.auto_delete) - self.channel.queue_bind(queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key) + chan = self.channel + return (self.exchange and self.exchange.declare(), + self.name and chan.queue_declare(queue=self.name, + durable=self.durable, + exclusive=self.exclusive, + auto_delete=self.auto_delete), + self.name and chan.queue_bind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key)) + @assert_is_bound def get(self, no_ack=None): @@ -183,26 +198,18 @@ class Binding(object): @assert_is_bound def consume(self, consumer_tag, callback, no_ack=None, nowait=True): - return self.channel.consume(queue=self.name, - no_ack=no_ack, - consumer_tag=consumer_tag, - callback=callback, - nowait=nowait) + return self.channel.basic_consume(queue=self.name, + no_ack=no_ack, + consumer_tag=consumer_tag, + callback=callback, + nowait=nowait) @assert_is_bound def cancel(self, consumer_tag): - self.channel.basic_cancel(consumer_tag) - - def __copy__(self): - return self.__class__(name=self.name, - exchange=self.exchange, - routing_key=self.routing_key, - channel=self.channel, - **dict((name, getattr(self, name) - for name in self._init_opts))) + return self.channel.basic_cancel(consumer_tag) def __repr__(self): - super(Binding, self).__repr__( - "Binding %s -> %s -> %s" % (self.name, - self.exchange, - self.routing_key)) + return super(Binding, self).__repr__( + "Binding %s -> %s -> %s" % (self.name, + self.exchange, + self.routing_key)) diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py new file mode 100644 index 00000000..9b09bd1d --- /dev/null +++ b/kombu/tests/test_entities.py @@ -0,0 +1,159 @@ +import unittest2 as unittest + +from kombu.entity import Exchange, Binding, NotBoundError + + +class Channel(object): + + def __init__(self): + self.called = [] + + def _called(self, name): + self.called.append(name) + + def __contains__(self, key): + return key in self.called + + def exchange_declare(self, *args, **kwargs): + self._called("exchange_declare") + + def prepare_message(self, *args, **kwargs): + self._called("prepare_message") + + def basic_publish(self, *args, **kwargs): + self._called("basic_publish") + + def exchange_delete(self, *args, **kwargs): + self._called("exchange_delete") + + def queue_declare(self, *args, **kwargs): + self._called("queue_declare") + + def queue_bind(self, *args, **kwargs): + self._called("queue_bind") + + def basic_get(self, *args, **kwargs): + self._called("basic_get") + + def queue_purge(self, *args, **kwargs): + self._called("queue_purge") + + def basic_consume(self, *args, **kwargs): + self._called("basic_consume") + + def basic_cancel(self, *args, **kwargs): + self._called("basic_cancel") + + +class test_Exchange(unittest.TestCase): + + def test_bound(self): + exchange = Exchange("foo", "direct") + self.assertFalse(exchange.is_bound) + self.assertIn("