mirror of https://github.com/celery/kombu.git
Added unittets for kombu.entities (Exchange/Binding)
This commit is contained in:
parent
db24c9d1f7
commit
9a4074b568
187
kombu/entity.py
187
kombu/entity.py
|
@ -1,21 +1,47 @@
|
|||
from copy import copy
|
||||
|
||||
TRANSIENT_DELIVERY_MODE = 1
|
||||
PERSISTENT_DELIVERY_MODE = 2
|
||||
DELIVERY_MODES = {"transient": TRANSIENT_DELIVERY_MODE,
|
||||
"persistent": PERSISTENT_DELIVERY_MODE}
|
||||
|
||||
class NotBoundError(Exception):
|
||||
"""Trying to call channel dependent method on unbound entity."""
|
||||
|
||||
|
||||
class Object(object):
|
||||
attrs = ()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
any = lambda v: v
|
||||
for name, type_ in self.attrs:
|
||||
value = kwargs.get(name)
|
||||
if value is not None:
|
||||
setattr(self, name, (type_ or any)(value))
|
||||
|
||||
def __copy__(self):
|
||||
return self.__class__(**dict((name, getattr(self, name))
|
||||
for name, _ in self.attrs))
|
||||
|
||||
|
||||
def assert_is_bound(fun):
|
||||
|
||||
def only_if_bound(self, *args, **kwargs):
|
||||
if self.is_bound:
|
||||
return fun(*args, **kwargs)
|
||||
raise NotBoundError("Can't call %s on unbound %s" % (
|
||||
fun.__name__, self.__class__.__name__))
|
||||
return fun(self, *args, **kwargs)
|
||||
raise NotBoundError(
|
||||
"Can't call %s on %s not bound to a channel" % (
|
||||
fun.__name__,
|
||||
self.__class__.__name__))
|
||||
only_if_bound.__name__ = fun.__name__
|
||||
|
||||
return only_if_bound
|
||||
|
||||
|
||||
class MaybeChannelBound(object):
|
||||
class MaybeChannelBound(Object):
|
||||
"""Mixin for classes that can be bound to an AMQP channel."""
|
||||
channel = None
|
||||
_is_bound = False
|
||||
|
||||
def bind(self, channel):
|
||||
"""Create copy of the instance that is bound to a channel."""
|
||||
|
@ -23,9 +49,10 @@ class MaybeChannelBound(object):
|
|||
|
||||
def maybe_bind(self, channel):
|
||||
"""Bind instance to channel if not already bound."""
|
||||
if not self.is_bound:
|
||||
if not self.is_bound and channel:
|
||||
self.channel = channel
|
||||
self.when_bound()
|
||||
self._is_bound = True
|
||||
return self
|
||||
|
||||
def when_bound(self):
|
||||
|
@ -35,7 +62,7 @@ class MaybeChannelBound(object):
|
|||
@property
|
||||
def is_bound(self):
|
||||
"""Returns ``True`` if the entity is bound."""
|
||||
return self.channel is not None
|
||||
return self._is_bound and self.channel is not None
|
||||
|
||||
def __repr__(self, item=""):
|
||||
if self.is_bound:
|
||||
|
@ -45,35 +72,29 @@ class MaybeChannelBound(object):
|
|||
|
||||
|
||||
class Exchange(MaybeChannelBound):
|
||||
TRANSIENT_DELIVERY_MODE = 1
|
||||
PERSISTENT_DELIVERY_MODE = 2
|
||||
DELIVERY_MODES = {
|
||||
"transient": TRANSIENT_DELIVERY_MODE,
|
||||
"persistent": PERSISTENT_DELIVERY_MODE,
|
||||
}
|
||||
TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
|
||||
PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE
|
||||
name = ""
|
||||
type = "direct"
|
||||
routing_key = ""
|
||||
delivery_mode = PERSISTENT_DELIVERY_MODE
|
||||
durable = True
|
||||
auto_delete = False
|
||||
_init_opts = ("durable", "auto_delete",
|
||||
"delivery_mode", "auto_declare")
|
||||
delivery_mode = PERSISTENT_DELIVERY_MODE
|
||||
|
||||
def __init__(self, name="", type="", routing_key=None, channel=None,
|
||||
**kwargs):
|
||||
attrs = (("name", None),
|
||||
("type", None),
|
||||
("routing_key", None),
|
||||
("channel", None),
|
||||
("durable", bool),
|
||||
("auto_delete", bool),
|
||||
("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m))
|
||||
|
||||
def __init__(self, name="", type="", routing_key="", **kwargs):
|
||||
super(Exchange, self).__init__(**kwargs)
|
||||
self.name = name or self.name
|
||||
self.type = type or self.type
|
||||
self.routing_key = routing_key or self.routing_key
|
||||
self.maybe_bind(channel)
|
||||
|
||||
for opt_name in self._init_opts:
|
||||
opt_value = kwargs.get(opt_name)
|
||||
if opt_value is not None:
|
||||
setattr(self, opt_name, opt_value)
|
||||
|
||||
self.delivery_mode = self.DELIVERY_MODES.get(self.delivery_mode,
|
||||
self.delivery_mode)
|
||||
self.maybe_bind(self.channel)
|
||||
|
||||
@assert_is_bound
|
||||
def declare(self):
|
||||
|
@ -82,15 +103,16 @@ class Exchange(MaybeChannelBound):
|
|||
Creates the exchange on the broker.
|
||||
|
||||
"""
|
||||
self.channel.exchange_declare(exchange=self.name,
|
||||
type=self.type,
|
||||
durable=self.durable,
|
||||
auto_delete=self.auto_delete)
|
||||
return self.channel.exchange_declare(exchange=self.name,
|
||||
type=self.type,
|
||||
durable=self.durable,
|
||||
auto_delete=self.auto_delete)
|
||||
|
||||
@assert_is_bound
|
||||
def create_message(self, message_data, delivery_mode=None,
|
||||
priority=None, content_type=None, content_encoding=None,
|
||||
properties=None):
|
||||
properties = properties or {}
|
||||
properties["delivery_mode"] = delivery_mode or self.delivery_mode
|
||||
return self.channel.prepare_message(message_data,
|
||||
properties=properties,
|
||||
|
@ -99,31 +121,27 @@ class Exchange(MaybeChannelBound):
|
|||
content_encoding=content_encoding)
|
||||
|
||||
@assert_is_bound
|
||||
def publish(self, message, routing_key=None,
|
||||
mandatory=False, immediate=False):
|
||||
def publish(self, message, routing_key=None, mandatory=False,
|
||||
immediate=False, headers=None):
|
||||
if routing_key is None:
|
||||
routing_key = self.routing_key
|
||||
self.channel.basic_publish(message,
|
||||
exchange=self.name,
|
||||
routing_key=routing_key,
|
||||
mandatory=mandatory,
|
||||
immediate=immediate,
|
||||
headers=headers)
|
||||
return self.channel.basic_publish(message,
|
||||
exchange=self.name,
|
||||
routing_key=routing_key,
|
||||
mandatory=mandatory,
|
||||
immediate=immediate,
|
||||
headers=headers)
|
||||
|
||||
def __copy__(self):
|
||||
return self.__class__(name=self.name,
|
||||
type=self.type,
|
||||
routing_key=self.routing_key,
|
||||
channel=self.channel,
|
||||
**dict((name, getattr(self, name))
|
||||
for name in self._init_opts))
|
||||
@assert_is_bound
|
||||
def delete(self, if_unused=False):
|
||||
return self.channel.exchange_delete(self.name, if_unused=if_unused)
|
||||
|
||||
def __repr__(self):
|
||||
super(Exchange, self).__repr__("Exchange %s(%s)" % (self.name,
|
||||
self.type))
|
||||
return super(Exchange, self).__repr__("Exchange %s(%s)" % (self.name,
|
||||
self.type))
|
||||
|
||||
|
||||
class Binding(object):
|
||||
class Binding(MaybeChannelBound):
|
||||
name = ""
|
||||
exchange = None
|
||||
routing_key = ""
|
||||
|
@ -131,27 +149,24 @@ class Binding(object):
|
|||
durable = True
|
||||
exclusive = False
|
||||
auto_delete = False
|
||||
warn_if_exists = False
|
||||
_init_opts = ("durable", "exclusive", "auto_delete",
|
||||
"warn_if_exists")
|
||||
|
||||
def __init__(self, name=None, exchange=None, routing_key=None,
|
||||
channel=None, **kwargs):
|
||||
# Binding.
|
||||
attrs = (("name", None),
|
||||
("exchange", None),
|
||||
("routing_key", None),
|
||||
("channel", None),
|
||||
("durable", bool),
|
||||
("exclusive", bool),
|
||||
("auto_delete", bool))
|
||||
|
||||
def __init__(self, name="", exchange=None, routing_key="", **kwargs):
|
||||
super(Binding, self).__init__(**kwargs)
|
||||
self.name = name or self.name
|
||||
self.exchange = exchange or self.exchange
|
||||
self.routing_key = routing_key or self.routing_key
|
||||
self.maybe_bind(channel)
|
||||
|
||||
# Options
|
||||
for opt_name in self._init_opts:
|
||||
opt_value = kwargs.get(opt_name)
|
||||
if opt_value is not None:
|
||||
setattr(self, opt_name, opt_value)
|
||||
|
||||
# exclusive implies auto-delete.
|
||||
if self.exclusive:
|
||||
self.auto_delete = True
|
||||
self.maybe_bind(self.channel)
|
||||
|
||||
def when_bound(self):
|
||||
self.exchange = self.exchange.bind(self.channel)
|
||||
|
@ -160,16 +175,16 @@ class Binding(object):
|
|||
def declare(self):
|
||||
"""Declares the queue, the exchange and binds the queue to
|
||||
the exchange."""
|
||||
if self.exchange:
|
||||
self.exchange.declare()
|
||||
if self.name:
|
||||
self.channel.queue_declare(queue=self.name,
|
||||
durable=self.durable,
|
||||
exclusive=self.exclusive,
|
||||
auto_delete=self.auto_delete)
|
||||
self.channel.queue_bind(queue=self.name,
|
||||
exchange=self.exchange.name,
|
||||
routing_key=self.routing_key)
|
||||
chan = self.channel
|
||||
return (self.exchange and self.exchange.declare(),
|
||||
self.name and chan.queue_declare(queue=self.name,
|
||||
durable=self.durable,
|
||||
exclusive=self.exclusive,
|
||||
auto_delete=self.auto_delete),
|
||||
self.name and chan.queue_bind(queue=self.name,
|
||||
exchange=self.exchange.name,
|
||||
routing_key=self.routing_key))
|
||||
|
||||
|
||||
@assert_is_bound
|
||||
def get(self, no_ack=None):
|
||||
|
@ -183,26 +198,18 @@ class Binding(object):
|
|||
|
||||
@assert_is_bound
|
||||
def consume(self, consumer_tag, callback, no_ack=None, nowait=True):
|
||||
return self.channel.consume(queue=self.name,
|
||||
no_ack=no_ack,
|
||||
consumer_tag=consumer_tag,
|
||||
callback=callback,
|
||||
nowait=nowait)
|
||||
return self.channel.basic_consume(queue=self.name,
|
||||
no_ack=no_ack,
|
||||
consumer_tag=consumer_tag,
|
||||
callback=callback,
|
||||
nowait=nowait)
|
||||
|
||||
@assert_is_bound
|
||||
def cancel(self, consumer_tag):
|
||||
self.channel.basic_cancel(consumer_tag)
|
||||
|
||||
def __copy__(self):
|
||||
return self.__class__(name=self.name,
|
||||
exchange=self.exchange,
|
||||
routing_key=self.routing_key,
|
||||
channel=self.channel,
|
||||
**dict((name, getattr(self, name)
|
||||
for name in self._init_opts)))
|
||||
return self.channel.basic_cancel(consumer_tag)
|
||||
|
||||
def __repr__(self):
|
||||
super(Binding, self).__repr__(
|
||||
"Binding %s -> %s -> %s" % (self.name,
|
||||
self.exchange,
|
||||
self.routing_key))
|
||||
return super(Binding, self).__repr__(
|
||||
"Binding %s -> %s -> %s" % (self.name,
|
||||
self.exchange,
|
||||
self.routing_key))
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
import unittest2 as unittest
|
||||
|
||||
from kombu.entity import Exchange, Binding, NotBoundError
|
||||
|
||||
|
||||
class Channel(object):
|
||||
|
||||
def __init__(self):
|
||||
self.called = []
|
||||
|
||||
def _called(self, name):
|
||||
self.called.append(name)
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.called
|
||||
|
||||
def exchange_declare(self, *args, **kwargs):
|
||||
self._called("exchange_declare")
|
||||
|
||||
def prepare_message(self, *args, **kwargs):
|
||||
self._called("prepare_message")
|
||||
|
||||
def basic_publish(self, *args, **kwargs):
|
||||
self._called("basic_publish")
|
||||
|
||||
def exchange_delete(self, *args, **kwargs):
|
||||
self._called("exchange_delete")
|
||||
|
||||
def queue_declare(self, *args, **kwargs):
|
||||
self._called("queue_declare")
|
||||
|
||||
def queue_bind(self, *args, **kwargs):
|
||||
self._called("queue_bind")
|
||||
|
||||
def basic_get(self, *args, **kwargs):
|
||||
self._called("basic_get")
|
||||
|
||||
def queue_purge(self, *args, **kwargs):
|
||||
self._called("queue_purge")
|
||||
|
||||
def basic_consume(self, *args, **kwargs):
|
||||
self._called("basic_consume")
|
||||
|
||||
def basic_cancel(self, *args, **kwargs):
|
||||
self._called("basic_cancel")
|
||||
|
||||
|
||||
class test_Exchange(unittest.TestCase):
|
||||
|
||||
def test_bound(self):
|
||||
exchange = Exchange("foo", "direct")
|
||||
self.assertFalse(exchange.is_bound)
|
||||
self.assertIn("<unbound", repr(exchange))
|
||||
|
||||
chan = Channel()
|
||||
bound = exchange.bind(chan)
|
||||
self.assertTrue(bound.is_bound)
|
||||
self.assertIs(bound.channel, chan)
|
||||
self.assertIn("<bound", repr(bound))
|
||||
|
||||
def test_assert_is_bound(self):
|
||||
exchange = Exchange("foo", "direct")
|
||||
self.assertRaises(NotBoundError, exchange.declare)
|
||||
|
||||
chan = Channel()
|
||||
exchange.bind(chan).declare()
|
||||
self.assertIn("exchange_declare", chan)
|
||||
|
||||
def test_set_transient_delivery_mode(self):
|
||||
exc = Exchange("foo", "direct", delivery_mode="transient")
|
||||
self.assertEqual(exc.delivery_mode, Exchange.TRANSIENT_DELIVERY_MODE)
|
||||
|
||||
def test_set_persistent_delivery_mode(self):
|
||||
exc = Exchange("foo", "direct", delivery_mode="persistent")
|
||||
self.assertEqual(exc.delivery_mode, Exchange.PERSISTENT_DELIVERY_MODE)
|
||||
|
||||
def test_bind_at_instantiation(self):
|
||||
self.assertTrue(Exchange("foo", channel=Channel()).is_bound)
|
||||
|
||||
def test_create_message(self):
|
||||
chan = Channel()
|
||||
Exchange("foo", channel=chan).create_message({"foo": "bar"})
|
||||
self.assertIn("prepare_message", chan)
|
||||
|
||||
def test_publish(self):
|
||||
chan = Channel()
|
||||
Exchange("foo", channel=chan).publish("the quick brown fox")
|
||||
self.assertIn("basic_publish", chan)
|
||||
|
||||
def test_delete(self):
|
||||
chan = Channel()
|
||||
Exchange("foo", channel=chan).delete()
|
||||
self.assertIn("exchange_delete", chan)
|
||||
|
||||
def test__repr__(self):
|
||||
b = Exchange("foo", "topic")
|
||||
self.assertIn("foo(topic)", repr(b))
|
||||
self.assertIn("Exchange", repr(b))
|
||||
|
||||
|
||||
|
||||
class test_Binding(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.exchange = Exchange("foo", "direct")
|
||||
|
||||
def test_exclusive_implies_auto_delete(self):
|
||||
self.assertTrue(
|
||||
Binding("foo", self.exchange, exclusive=True).auto_delete)
|
||||
|
||||
def test_binds_at_instantiation(self):
|
||||
self.assertTrue(
|
||||
Binding("foo", self.exchange, channel=Channel()).is_bound)
|
||||
|
||||
def test_also_binds_exchange(self):
|
||||
chan = Channel()
|
||||
b = Binding("foo", self.exchange)
|
||||
self.assertFalse(b.is_bound)
|
||||
self.assertFalse(b.exchange.is_bound)
|
||||
b = b.bind(chan)
|
||||
self.assertTrue(b.is_bound)
|
||||
self.assertTrue(b.exchange.is_bound)
|
||||
self.assertIs(b.channel, b.exchange.channel)
|
||||
self.assertIsNot(b.exchange, self.exchange)
|
||||
|
||||
def test_declare(self):
|
||||
chan = Channel()
|
||||
b = Binding("foo", self.exchange, "foo", channel=chan)
|
||||
self.assertTrue(b.is_bound)
|
||||
b.declare()
|
||||
self.assertIn("exchange_declare", chan)
|
||||
self.assertIn("queue_declare", chan)
|
||||
self.assertIn("queue_bind", chan)
|
||||
|
||||
def test_get(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b.get()
|
||||
self.assertIn("basic_get", b.channel)
|
||||
|
||||
def test_purge(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b.purge()
|
||||
self.assertIn("queue_purge", b.channel)
|
||||
|
||||
def test_consume(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b.consume("fifafo", None)
|
||||
self.assertIn("basic_consume", b.channel)
|
||||
|
||||
def test_cancel(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b.cancel("fifafo")
|
||||
self.assertIn("basic_cancel", b.channel)
|
||||
|
||||
def test__repr__(self):
|
||||
b = Binding("foo", self.exchange, "foo")
|
||||
self.assertIn("foo", repr(b))
|
||||
self.assertIn("Binding", repr(b))
|
||||
|
Loading…
Reference in New Issue