diff --git a/README.rst b/README.rst index fc306bea..869588eb 100644 --- a/README.rst +++ b/README.rst @@ -11,10 +11,10 @@ Carrot will be discontinued in favor of Kombu. Proposed API:: from kombu.connection BrokerConnection - from kombu.messaging import Exchange, Binding, Consumer, Producer + from kombu.messaging import Exchange, Queue, Consumer, Producer media_exchange = Exchange("media", "direct", durable=True) - video_binding = Binding("video", exchange=media_exchange, key="video") + video_queue = Queue("video", exchange=media_exchange, key="video") # connections/channels connection = BrokerConnection("localhost", "guest", "guest", "/") @@ -25,7 +25,7 @@ Proposed API:: producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013}) # consume - consumer = Consumer(channel, video_binding) + consumer = Consumer(channel, video_queue) consumer.register_callback(process_media) consumer.consume() @@ -34,10 +34,10 @@ Proposed API:: # consumerset: - video_binding = Binding("video", exchange=media_exchange, key="video") - image_binding = Binding("image", exchange=media_exchange, key="image") + video_queue = Queue("video", exchange=media_exchange, key="video") + image_queue = Queue("image", exchange=media_exchange, key="image") - consumer = Consumer(channel, [video_binding, image_binding]) + consumer = Consumer(channel, [video_queue, image_queue]) consumer.consume() while True: @@ -45,7 +45,7 @@ Proposed API:: -Exchanges/Bindings can be bound to a channel:: +Exchanges/Queue can be bound to a channel:: >>> exchange = Exchange("tasks", "direct") @@ -210,13 +210,13 @@ First we open up a Python shell and start a message consumer. This consumer declares a queue named ``"feed"``, receiving messages with the routing key ``"importer"`` from the ``"feed"`` exchange. - >>> from kombu import Exchange, Binding, Consumer + >>> from kombu import Exchange, Queue, Consumer >>> feed_exchange = Exchange("feed", type="direct") - >>> feed_binding = Binding("feed", feed_exchange, "importer") + >>> feed_queue = Queue("feed", feed_exchange, "importer") >>> channel = connection.channel() - >>> consumer = Consumer(channel, [feed_binding]) + >>> consumer = Consumer(channel, [feed_queue]) >>> def import_feed_callback(message_data, message) ... feed_url = message_data["import_feed"] @@ -347,7 +347,7 @@ This method returns a ``Message`` object, from where you can get the message body, de-serialize the body to get the data, acknowledge, reject or re-queue the message. - >>> consumer = Consumer(channel, bindings) + >>> consumer = Consumer(channel, queues) >>> message = consumer.get() >>> if message: ... message_data = message.payload @@ -371,7 +371,7 @@ can define the above producer and consumer like so: ... "feed_url": feed_url}) >>> class FeedConsumer(Consumer): - ... bindings = bindings + ... queues = queues ... ... def receive(self, message_data, message): ... action = message_data["action"] diff --git a/kombu/__init__.py b/kombu/__init__.py index a3df6f5d..e8ffde46 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -9,5 +9,5 @@ __docformat__ = "restructuredtext" import os if not os.environ.get("KOMBU_NO_EVAL", False): from kombu.connection import BrokerConnection - from kombu.entity import Exchange, Binding + from kombu.entity import Exchange, Queue from kombu.messaging import Consumer, Producer diff --git a/kombu/compat.py b/kombu/compat.py index 224f2e33..50406348 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -12,7 +12,7 @@ def iterconsume(connection, consumer, no_ack=False, limit=None): yield connection.drain_events() -def entry_to_binding(queue, **options): +def entry_to_queue(queue, **options): binding_key = options.get("binding_key") or options.get("routing_key") e_durable = options.get("exchange_durable") or options.get("durable") e_auto_delete = options.get("exchange_auto_delete") or \ @@ -30,14 +30,14 @@ def entry_to_binding(queue, **options): durable=e_durable, auto_delete=e_auto_delete, arguments=e_arguments) - return entity.Binding(queue, - exchange=exchange, - routing_key=binding_key, - durable=q_durable, - exclusive=options.get("exclusive"), - auto_delete=q_auto_delete, - queue_arguments=q_arguments, - binding_arguments=b_arguments) + return entity.Queue(queue, + exchange=exchange, + routing_key=binding_key, + durable=q_durable, + exclusive=options.get("exclusive"), + auto_delete=q_auto_delete, + queue_arguments=q_arguments, + binding_arguments=b_arguments) class Publisher(messaging.Producer): @@ -120,13 +120,13 @@ class Consumer(messaging.Consumer): routing_key=self.routing_key, auto_delete=self.auto_delete, durable=self.durable) - binding = entity.Binding(self.queue, - exchange=exchange, - routing_key=self.routing_key, - durable=self.durable, - exclusive=self.exclusive, - auto_delete=self.auto_delete) - super(Consumer, self).__init__(self.backend, binding, **kwargs) + queue = entity.Queue(self.queue, + exchange=exchange, + routing_key=self.routing_key, + durable=self.durable, + exclusive=self.exclusive, + auto_delete=self.auto_delete) + super(Consumer, self).__init__(self.backend, queue, **kwargs) def close(self): self.cancel() @@ -145,7 +145,7 @@ class Consumer(messaging.Consumer): def fetch(self, no_ack=None, enable_callbacks=False): if no_ack is None: no_ack = self.no_ack - message = self.bindings[0].get(no_ack) + message = self.queues[0].get(no_ack) if message: if enable_callbacks: self.receive(message.payload, message) @@ -190,10 +190,10 @@ class _CSet(messaging.Consumer): return self.purge() def add_consumer_from_dict(self, queue, **options): - self.bindings.append(entry_to_binding(queue, **options)) + self.queues.append(entry_to_queue(queue, **options)) def add_consumer(self, consumer): - self.bindings.extend(consumer.bindings) + self.queues.extend(consumer.queues) def close(self): self.cancel() @@ -203,12 +203,12 @@ class _CSet(messaging.Consumer): def ConsumerSet(connection, from_dict=None, consumers=None, callbacks=None, **kwargs): - bindings = [] + queues = [] if consumers: for consumer in consumers: - map(bindings.extend, consumer.bindings) + map(queues.extend, consumer.queues) if from_dict: for queue_name, queue_options in from_dict.items(): - bindings.append(entry_to_binding(queue_name, **queue_options)) + queues.append(entry_to_queue(queue_name, **queue_options)) - return _CSet(connection, bindings, **kwargs) + return _CSet(connection, queues, **kwargs) diff --git a/kombu/entity.py b/kombu/entity.py index cad25677..e533be28 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -36,10 +36,10 @@ class Exchange(MaybeChannelBound): * ``topic`` Wildcard match between the routing key and the routing pattern - specified in the binding. The routing key is treated as zero - or more words delimited by ``"."`` and supports special - wildcard characters. ``"*"`` matches a single word and ``"#"`` - matches zero or more words. + specified in the exchange/queue binding. The routing key is + treated as zero or more words delimited by ``"."`` and + supports special wildcard characters. ``"*"`` matches a + single word and ``"#"`` matches zero or more words. * ``fanout`` @@ -233,8 +233,8 @@ class Exchange(MaybeChannelBound): self.type)) -class Binding(MaybeChannelBound): - """A Queue declaration and its binding. +class Queue(MaybeChannelBound): + """A Queue declaration. :keyword name: See :attr:`name`. :keyword exchange: See :attr:`exchange`. @@ -283,7 +283,7 @@ class Binding(MaybeChannelBound): .. attribute:: channel - The channel the Binding is bound to (if bound). + The channel the Queue is bound to (if bound). .. attribute:: durable @@ -322,19 +322,20 @@ class Binding(MaybeChannelBound): **Usage** - Example creating a binding for our exchange in the :class:`Exchange` + Example creating a queue using our exchange in the :class:`Exchange` example:: - >>> science_news = Binding("science_news", - ... exchange=news_exchange, - ... routing_key="news.science") + >>> science_news = Queue("science_news", + ... exchange=news_exchange, + ... routing_key="news.science") For now ``science_news`` is just a declaration, you can't perform - actions on it. It just describes the name and options for the binding. + actions on it. It just describes the name and options for the queue. - The binding can be bound or unbound. Bound means the binding is + The queue can be bound or unbound. Bound means the queue is associated with a channel and operations can be performed on it. - To bind the binding you call the binding with the channel as argument:: + To bind the queue you call the queue instance with the channel as + an argument:: >>> bound_science_news = science_news(channel) @@ -364,7 +365,7 @@ class Binding(MaybeChannelBound): def __init__(self, name="", exchange=None, routing_key="", channel=None, **kwargs): - super(Binding, self).__init__(**kwargs) + super(Queue, self).__init__(**kwargs) self.name = name or self.name self.exchange = exchange or self.exchange self.routing_key = routing_key or self.routing_key @@ -492,7 +493,9 @@ class Binding(MaybeChannelBound): arguments=self.binding_arguments) def __repr__(self): - return super(Binding, self).__repr__( - "Binding %s -> %s -> %s" % (self.name, - self.exchange, + return super(Queue, self).__repr__( + "Queue %s -> %s -> %s" % (self.name, + self.exchange, self.routing_key)) + +Binding = Queue diff --git a/kombu/messaging.py b/kombu/messaging.py index f83b5d8e..43b9bde9 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -2,7 +2,8 @@ from itertools import count from kombu import serialization from kombu.compression import compress -from kombu.entity import Exchange, Binding +from kombu.entity import Exchange, Queue +from kombu.entity import Binding # TODO Remove from kombu.utils import maybe_list @@ -132,7 +133,7 @@ class Consumer(object): """Message consumer. :param channel: see :attr:`channel`. - :param bindings: see :attr:`bindings`. + :param queues see :attr:`queues`. :keyword no_ack: see :attr:`no_ack`. :keyword auto_declare: see :attr:`auto_declare` :keyword callbacks: see :attr:`callbacks`. @@ -142,9 +143,10 @@ class Consumer(object): The connection channel to use. - .. attribute:: bindings + .. attribute:: queues - A single binding, or a list of bindings to consume from. + A single :class:`~kombu.entity.Queue`, or a list of queues to + consume from. .. attribute:: auto_declare @@ -176,10 +178,10 @@ class Consumer(object): _next_tag = count(1).next # global _consuming = False - def __init__(self, channel, bindings, no_ack=None, auto_declare=None, + def __init__(self, channel, queues, no_ack=None, auto_declare=None, callbacks=None, on_decode_error=None): self.channel = channel - self.bindings = bindings + self.queues = queues if no_ack is not None: self.no_ack = no_ack if auto_declare is not None: @@ -193,21 +195,21 @@ class Consumer(object): self.callbacks = [] self._active_tags = {} - self.bindings = [binding(self.channel) - for binding in maybe_list(self.bindings)] + self.queues = [queue(self.channel) + for queue in maybe_list(self.queues)] if self.auto_declare: self.declare() def declare(self): - """Declare queues, bindings and exchanges. + """Declare queues, exchanges and bindings. This is done automatically at instantiation if :attr:`auto_declare` is set. """ - for binding in self.bindings: - binding.declare() + for queue in self.queues: + queue.declare() def consume(self, delivery_tag=None): """Register consumer on server. @@ -217,9 +219,9 @@ class Consumer(object): """ if not self._consuming: - H, T = self.bindings[:-1], self.bindings[-1] - for binding in H: - binding.consume(self._add_tag(binding, delivery_tag), + H, T = self.queues[:-1], self.queues[-1] + for queue in H: + queue.consume(self._add_tag(queue, delivery_tag), self._receive_callback, self.no_ack, nowait=True) @@ -265,7 +267,7 @@ class Consumer(object): undo operation available. """ - return sum(binding.purge() for binding in self.bindings) + return sum(queue.purge() for queue in self.queues) def cancel(self): """End all active queue consumers. @@ -274,8 +276,8 @@ class Consumer(object): mean the server will not send any more messages for this consumer. """ - for binding, tag in self._active_tags.items(): - binding.cancel(tag) + for queue, tag in self._active_tags.items(): + queue.cancel(tag) self._active_tags.clear() self._consuming = False @@ -336,9 +338,9 @@ class Consumer(object): """ return self.channel.basic_recover(requeue=requeue) - def _add_tag(self, binding, delivery_tag=None): + def _add_tag(self, queue, delivery_tag=None): tag = delivery_tag or str(self._next_tag()) - self._active_tags[binding] = tag + self._active_tags[queue] = tag return tag def _receive_callback(self, raw_message): diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 12302ec4..473c1d4c 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -1,6 +1,6 @@ import unittest2 as unittest -from kombu.entity import Exchange, Binding +from kombu.entity import Exchange, Queue from kombu.exceptions import NotBoundError from kombu.tests.mocks import Channel @@ -59,22 +59,22 @@ class test_Exchange(unittest.TestCase): self.assertIn("Exchange", repr(b)) -class test_Binding(unittest.TestCase): +class test_Queue(unittest.TestCase): def setUp(self): self.exchange = Exchange("foo", "direct") def test_exclusive_implies_auto_delete(self): self.assertTrue( - Binding("foo", self.exchange, exclusive=True).auto_delete) + Queue("foo", self.exchange, exclusive=True).auto_delete) def test_binds_at_instantiation(self): self.assertTrue( - Binding("foo", self.exchange, channel=Channel()).is_bound) + Queue("foo", self.exchange, channel=Channel()).is_bound) def test_also_binds_exchange(self): chan = Channel() - b = Binding("foo", self.exchange) + b = Queue("foo", self.exchange) self.assertFalse(b.is_bound) self.assertFalse(b.exchange.is_bound) b = b.bind(chan) @@ -85,7 +85,7 @@ class test_Binding(unittest.TestCase): def test_declare(self): chan = Channel() - b = Binding("foo", self.exchange, "foo", channel=chan) + b = Queue("foo", self.exchange, "foo", channel=chan) self.assertTrue(b.is_bound) b.declare() self.assertIn("exchange_declare", chan) @@ -93,31 +93,31 @@ class test_Binding(unittest.TestCase): self.assertIn("queue_bind", chan) def test_get(self): - b = Binding("foo", self.exchange, "foo", channel=Channel()) + b = Queue("foo", self.exchange, "foo", channel=Channel()) b.get() self.assertIn("basic_get", b.channel) def test_purge(self): - b = Binding("foo", self.exchange, "foo", channel=Channel()) + b = Queue("foo", self.exchange, "foo", channel=Channel()) b.purge() self.assertIn("queue_purge", b.channel) def test_consume(self): - b = Binding("foo", self.exchange, "foo", channel=Channel()) + b = Queue("foo", self.exchange, "foo", channel=Channel()) b.consume("fifafo", None) self.assertIn("basic_consume", b.channel) def test_cancel(self): - b = Binding("foo", self.exchange, "foo", channel=Channel()) + b = Queue("foo", self.exchange, "foo", channel=Channel()) b.cancel("fifafo") self.assertIn("basic_cancel", b.channel) def test_delete(self): - b = Binding("foo", self.exchange, "foo", channel=Channel()) + b = Queue("foo", self.exchange, "foo", channel=Channel()) b.delete() self.assertIn("queue_delete", b.channel) def test__repr__(self): - b = Binding("foo", self.exchange, "foo") + b = Queue("foo", self.exchange, "foo") self.assertIn("foo", repr(b)) - self.assertIn("Binding", repr(b)) + self.assertIn("Queue", repr(b)) diff --git a/kombu/tests/test_functional/disabled_pika.py b/kombu/tests/test_functional/disabled_pika.py index 484345e2..97df73d3 100644 --- a/kombu/tests/test_functional/disabled_pika.py +++ b/kombu/tests/test_functional/disabled_pika.py @@ -5,7 +5,7 @@ import unittest2 as unittest from nose import SkipTest from kombu import BrokerConnection -from kombu import Producer, Consumer, Exchange, Binding +from kombu import Producer, Consumer, Exchange, Queue def consumeN(conn, consumer, n=1): @@ -41,7 +41,7 @@ class test_pika(unittest.TestCase): else: self.connected = True self.exchange = Exchange("tamqplib", "direct") - self.binding = Binding("tamqplib", self.exchange, "tamqplib") + self.queue = Queue("tamqplib", self.exchange, "tamqplib") def test_produce__consume(self): if not self.connected: @@ -53,7 +53,7 @@ class test_pika(unittest.TestCase): chan1.close() chan2 = self.connection.channel() - consumer = Consumer(chan2, self.binding) + consumer = Consumer(chan2, self.queue) message = consumeN(self.connection, consumer) self.assertDictEqual(message[0], {"foo": "bar"}) chan2.close() @@ -64,9 +64,9 @@ class test_pika(unittest.TestCase): raise SkipTest("Broker not running.") chan1 = self.connection.channel() producer = Producer(chan1, self.exchange) - b1 = Binding("pyamqplib.b1", self.exchange, "b1") - b2 = Binding("pyamqplib.b2", self.exchange, "b2") - b3 = Binding("pyamqplib.b3", self.exchange, "b3") + b1 = Queue("pyamqplib.b1", self.exchange, "b1") + b2 = Queue("pyamqplib.b2", self.exchange, "b2") + b3 = Queue("pyamqplib.b3", self.exchange, "b3") producer.publish("b1", routing_key="b1") producer.publish("b2", routing_key="b2") @@ -84,8 +84,8 @@ class test_pika(unittest.TestCase): if not self.connected: raise SkipTest("Broker not running.") chan = self.connection.channel() - self.purge([self.binding.name]) - consumer = Consumer(chan, self.binding) + self.purge([self.queue.name]) + consumer = Consumer(chan, self.queue) self.assertRaises(socket.timeout, self.connection.drain_events, timeout=0.3) consumer.cancel() @@ -97,11 +97,11 @@ class test_pika(unittest.TestCase): chan1.close() chan2 = self.connection.channel() - binding = Binding("amqplib_basic_get", self.exchange, "basic_get") - binding = binding(chan2) - binding.declare() + queue = Queue("amqplib_basic_get", self.exchange, "basic_get") + queue = queue(chan2) + queue.declare() for i in range(50): - m = binding.get() + m = queue.get() if m: break time.sleep(0.1) diff --git a/kombu/tests/test_functional/test_amqplib.py b/kombu/tests/test_functional/test_amqplib.py index 2df7bac1..e15a6d92 100644 --- a/kombu/tests/test_functional/test_amqplib.py +++ b/kombu/tests/test_functional/test_amqplib.py @@ -5,7 +5,7 @@ import unittest2 as unittest from nose import SkipTest from kombu import BrokerConnection -from kombu import Producer, Consumer, Exchange, Binding +from kombu import Producer, Consumer, Exchange, Queue def consumeN(conn, consumer, n=1): @@ -41,7 +41,7 @@ class test_amqplib(unittest.TestCase): else: self.connected = True self.exchange = Exchange("tamqplib", "direct") - self.binding = Binding("tamqplib", self.exchange, "tamqplib") + self.queue = Queue("tamqplib", self.exchange, "tamqplib") def test_produce__consume(self): if not self.connected: @@ -53,7 +53,7 @@ class test_amqplib(unittest.TestCase): chan1.close() chan2 = self.connection.channel() - consumer = Consumer(chan2, self.binding) + consumer = Consumer(chan2, self.queue) message = consumeN(self.connection, consumer) self.assertDictEqual(message[0], {"foo": "bar"}) chan2.close() @@ -64,9 +64,9 @@ class test_amqplib(unittest.TestCase): raise SkipTest("Broker not running.") chan1 = self.connection.channel() producer = Producer(chan1, self.exchange) - b1 = Binding("pyamqplib.b1", self.exchange, "b1") - b2 = Binding("pyamqplib.b2", self.exchange, "b2") - b3 = Binding("pyamqplib.b3", self.exchange, "b3") + b1 = Queue("pyamqplib.b1", self.exchange, "b1") + b2 = Queue("pyamqplib.b2", self.exchange, "b2") + b3 = Queue("pyamqplib.b3", self.exchange, "b3") producer.publish("b1", routing_key="b1") producer.publish("b2", routing_key="b2") @@ -84,8 +84,8 @@ class test_amqplib(unittest.TestCase): if not self.connected: raise SkipTest("Broker not running.") chan = self.connection.channel() - self.purge([self.binding.name]) - consumer = Consumer(chan, self.binding) + self.purge([self.queue.name]) + consumer = Consumer(chan, self.queue) self.assertRaises(socket.timeout, self.connection.drain_events, timeout=0.3) consumer.cancel() @@ -97,11 +97,11 @@ class test_amqplib(unittest.TestCase): chan1.close() chan2 = self.connection.channel() - binding = Binding("amqplib_basic_get", self.exchange, "basic_get") - binding = binding(chan2) - binding.declare() + queue = Queue("amqplib_basic_get", self.exchange, "basic_get") + queue = queue(chan2) + queue.declare() for i in range(50): - m = binding.get() + m = queue.get() if m: break time.sleep(0.1) diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index b9a76cc8..6cfb33da 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -5,7 +5,7 @@ import simplejson from kombu.connection import BrokerConnection from kombu.exceptions import MessageStateError from kombu.messaging import Consumer, Producer -from kombu.entity import Exchange, Binding +from kombu.entity import Exchange, Queue from kombu.tests.mocks import Backend @@ -120,28 +120,28 @@ class test_Consumer(unittest.TestCase): def test_set_no_ack(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") - consumer = Consumer(channel, binding, auto_declare=True, no_ack=True) + queue = Queue("qname", self.exchange, "rkey") + consumer = Consumer(channel, queue, auto_declare=True, no_ack=True) self.assertTrue(consumer.no_ack) def test_set_callbacks(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") + queue = Queue("qname", self.exchange, "rkey") callbacks = [lambda x, y: x, lambda x, y: x] - consumer = Consumer(channel, binding, auto_declare=True, + consumer = Consumer(channel, queue, auto_declare=True, callbacks=callbacks) self.assertEqual(consumer.callbacks, callbacks) def test_auto_declare(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") - consumer = Consumer(channel, binding, auto_declare=True) + queue = Queue("qname", self.exchange, "rkey") + consumer = Consumer(channel, queue, auto_declare=True) consumer.consume() - self.assertIsNot(consumer.bindings[0], binding) - self.assertTrue(consumer.bindings[0].is_bound) - self.assertTrue(consumer.bindings[0].exchange.is_bound) - self.assertIsNot(consumer.bindings[0].exchange, self.exchange) + self.assertIsNot(consumer.queues[0], queue) + self.assertTrue(consumer.queues[0].is_bound) + self.assertTrue(consumer.queues[0].exchange.is_bound) + self.assertIsNot(consumer.queues[0].exchange, self.exchange) for meth in ("exchange_declare", "queue_declare", @@ -153,12 +153,12 @@ class test_Consumer(unittest.TestCase): def test_manual_declare(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") - consumer = Consumer(channel, binding, auto_declare=False) - self.assertIsNot(consumer.bindings[0], binding) - self.assertTrue(consumer.bindings[0].is_bound) - self.assertTrue(consumer.bindings[0].exchange.is_bound) - self.assertIsNot(consumer.bindings[0].exchange, self.exchange) + queue = Queue("qname", self.exchange, "rkey") + consumer = Consumer(channel, queue, auto_declare=False) + self.assertIsNot(consumer.queues[0], queue) + self.assertTrue(consumer.queues[0].is_bound) + self.assertTrue(consumer.queues[0].exchange.is_bound) + self.assertIsNot(consumer.queues[0].exchange, self.exchange) for meth in ("exchange_declare", "queue_declare", @@ -177,8 +177,8 @@ class test_Consumer(unittest.TestCase): def test_consume__cancel(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") - consumer = Consumer(channel, binding, auto_declare=True) + queue = Queue("qname", self.exchange, "rkey") + consumer = Consumer(channel, queue, auto_declare=True) consumer.consume() consumer.cancel() self.assertIn("basic_cancel", channel) @@ -186,8 +186,8 @@ class test_Consumer(unittest.TestCase): def test___enter____exit__(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") - consumer = Consumer(channel, binding, auto_declare=True) + queue = Queue("qname", self.exchange, "rkey") + consumer = Consumer(channel, queue, auto_declare=True) context = consumer.__enter__() self.assertIs(context, consumer) self.assertTrue(consumer._active_tags) @@ -197,34 +197,34 @@ class test_Consumer(unittest.TestCase): def test_flow(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") - consumer = Consumer(channel, binding, auto_declare=True) + queue = Queue("qname", self.exchange, "rkey") + consumer = Consumer(channel, queue, auto_declare=True) consumer.flow(False) self.assertIn("flow", channel) def test_qos(self): channel = self.connection.channel() - binding = Binding("qname", self.exchange, "rkey") - consumer = Consumer(channel, binding, auto_declare=True) + queue = Queue("qname", self.exchange, "rkey") + consumer = Consumer(channel, queue, auto_declare=True) consumer.qos(30, 10, False) self.assertIn("basic_qos", channel) def test_purge(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") - b2 = Binding("qname2", self.exchange, "rkey") - b3 = Binding("qname3", self.exchange, "rkey") - b4 = Binding("qname4", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") + b2 = Queue("qname2", self.exchange, "rkey") + b3 = Queue("qname3", self.exchange, "rkey") + b4 = Queue("qname4", self.exchange, "rkey") consumer = Consumer(channel, [b1, b2, b3, b4], auto_declare=True) consumer.purge() self.assertEqual(channel.called.count("queue_purge"), 4) - def test_multiple_bindings(self): + def test_multiple_queues(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") - b2 = Binding("qname2", self.exchange, "rkey") - b3 = Binding("qname3", self.exchange, "rkey") - b4 = Binding("qname4", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") + b2 = Queue("qname2", self.exchange, "rkey") + b3 = Queue("qname3", self.exchange, "rkey") + b4 = Queue("qname4", self.exchange, "rkey") consumer = Consumer(channel, [b1, b2, b3, b4]) consumer.consume() self.assertEqual(channel.called.count("exchange_declare"), 4) @@ -238,7 +238,7 @@ class test_Consumer(unittest.TestCase): def test_receive_callback(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) received = [] @@ -256,7 +256,7 @@ class test_Consumer(unittest.TestCase): def test_basic_ack_twice(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -269,7 +269,7 @@ class test_Consumer(unittest.TestCase): def test_basic_reject(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -281,7 +281,7 @@ class test_Consumer(unittest.TestCase): def test_basic_reject_twice(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -295,7 +295,7 @@ class test_Consumer(unittest.TestCase): def test_basic_reject__requeue(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -307,7 +307,7 @@ class test_Consumer(unittest.TestCase): def test_basic_reject__requeue_twice(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -321,13 +321,13 @@ class test_Consumer(unittest.TestCase): def test_receive_without_callbacks_raises(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) self.assertRaises(NotImplementedError, consumer.receive, 1, 2) def test_decode_error(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) consumer.channel.throw_decode_error = True @@ -336,7 +336,7 @@ class test_Consumer(unittest.TestCase): def test_on_decode_error_callback(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) consumer.channel.throw_decode_error = True @@ -354,7 +354,7 @@ class test_Consumer(unittest.TestCase): def test_recover(self): channel = self.connection.channel() - b1 = Binding("qname1", self.exchange, "rkey") + b1 = Queue("qname1", self.exchange, "rkey") consumer = Consumer(channel, [b1]) consumer.recover() self.assertIn("basic_recover", channel)