mirror of https://github.com/celery/kombu.git
kombu.Binding has been renamed to kombu.Queue
This commit is contained in:
parent
3f4248a80c
commit
3c96d0affe
24
README.rst
24
README.rst
|
@ -11,10 +11,10 @@ Carrot will be discontinued in favor of Kombu.
|
|||
Proposed API::
|
||||
|
||||
from kombu.connection BrokerConnection
|
||||
from kombu.messaging import Exchange, Binding, Consumer, Producer
|
||||
from kombu.messaging import Exchange, Queue, Consumer, Producer
|
||||
|
||||
media_exchange = Exchange("media", "direct", durable=True)
|
||||
video_binding = Binding("video", exchange=media_exchange, key="video")
|
||||
video_queue = Queue("video", exchange=media_exchange, key="video")
|
||||
|
||||
# connections/channels
|
||||
connection = BrokerConnection("localhost", "guest", "guest", "/")
|
||||
|
@ -25,7 +25,7 @@ Proposed API::
|
|||
producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013})
|
||||
|
||||
# consume
|
||||
consumer = Consumer(channel, video_binding)
|
||||
consumer = Consumer(channel, video_queue)
|
||||
consumer.register_callback(process_media)
|
||||
consumer.consume()
|
||||
|
||||
|
@ -34,10 +34,10 @@ Proposed API::
|
|||
|
||||
|
||||
# consumerset:
|
||||
video_binding = Binding("video", exchange=media_exchange, key="video")
|
||||
image_binding = Binding("image", exchange=media_exchange, key="image")
|
||||
video_queue = Queue("video", exchange=media_exchange, key="video")
|
||||
image_queue = Queue("image", exchange=media_exchange, key="image")
|
||||
|
||||
consumer = Consumer(channel, [video_binding, image_binding])
|
||||
consumer = Consumer(channel, [video_queue, image_queue])
|
||||
consumer.consume()
|
||||
|
||||
while True:
|
||||
|
@ -45,7 +45,7 @@ Proposed API::
|
|||
|
||||
|
||||
|
||||
Exchanges/Bindings can be bound to a channel::
|
||||
Exchanges/Queue can be bound to a channel::
|
||||
|
||||
>>> exchange = Exchange("tasks", "direct")
|
||||
|
||||
|
@ -210,13 +210,13 @@ First we open up a Python shell and start a message consumer.
|
|||
This consumer declares a queue named ``"feed"``, receiving messages with
|
||||
the routing key ``"importer"`` from the ``"feed"`` exchange.
|
||||
|
||||
>>> from kombu import Exchange, Binding, Consumer
|
||||
>>> from kombu import Exchange, Queue, Consumer
|
||||
|
||||
>>> feed_exchange = Exchange("feed", type="direct")
|
||||
>>> feed_binding = Binding("feed", feed_exchange, "importer")
|
||||
>>> feed_queue = Queue("feed", feed_exchange, "importer")
|
||||
|
||||
>>> channel = connection.channel()
|
||||
>>> consumer = Consumer(channel, [feed_binding])
|
||||
>>> consumer = Consumer(channel, [feed_queue])
|
||||
|
||||
>>> def import_feed_callback(message_data, message)
|
||||
... feed_url = message_data["import_feed"]
|
||||
|
@ -347,7 +347,7 @@ This method returns a ``Message`` object, from where you can get the
|
|||
message body, de-serialize the body to get the data, acknowledge, reject or
|
||||
re-queue the message.
|
||||
|
||||
>>> consumer = Consumer(channel, bindings)
|
||||
>>> consumer = Consumer(channel, queues)
|
||||
>>> message = consumer.get()
|
||||
>>> if message:
|
||||
... message_data = message.payload
|
||||
|
@ -371,7 +371,7 @@ can define the above producer and consumer like so:
|
|||
... "feed_url": feed_url})
|
||||
|
||||
>>> class FeedConsumer(Consumer):
|
||||
... bindings = bindings
|
||||
... queues = queues
|
||||
...
|
||||
... def receive(self, message_data, message):
|
||||
... action = message_data["action"]
|
||||
|
|
|
@ -9,5 +9,5 @@ __docformat__ = "restructuredtext"
|
|||
import os
|
||||
if not os.environ.get("KOMBU_NO_EVAL", False):
|
||||
from kombu.connection import BrokerConnection
|
||||
from kombu.entity import Exchange, Binding
|
||||
from kombu.entity import Exchange, Queue
|
||||
from kombu.messaging import Consumer, Producer
|
||||
|
|
|
@ -12,7 +12,7 @@ def iterconsume(connection, consumer, no_ack=False, limit=None):
|
|||
yield connection.drain_events()
|
||||
|
||||
|
||||
def entry_to_binding(queue, **options):
|
||||
def entry_to_queue(queue, **options):
|
||||
binding_key = options.get("binding_key") or options.get("routing_key")
|
||||
e_durable = options.get("exchange_durable") or options.get("durable")
|
||||
e_auto_delete = options.get("exchange_auto_delete") or \
|
||||
|
@ -30,14 +30,14 @@ def entry_to_binding(queue, **options):
|
|||
durable=e_durable,
|
||||
auto_delete=e_auto_delete,
|
||||
arguments=e_arguments)
|
||||
return entity.Binding(queue,
|
||||
exchange=exchange,
|
||||
routing_key=binding_key,
|
||||
durable=q_durable,
|
||||
exclusive=options.get("exclusive"),
|
||||
auto_delete=q_auto_delete,
|
||||
queue_arguments=q_arguments,
|
||||
binding_arguments=b_arguments)
|
||||
return entity.Queue(queue,
|
||||
exchange=exchange,
|
||||
routing_key=binding_key,
|
||||
durable=q_durable,
|
||||
exclusive=options.get("exclusive"),
|
||||
auto_delete=q_auto_delete,
|
||||
queue_arguments=q_arguments,
|
||||
binding_arguments=b_arguments)
|
||||
|
||||
|
||||
class Publisher(messaging.Producer):
|
||||
|
@ -120,13 +120,13 @@ class Consumer(messaging.Consumer):
|
|||
routing_key=self.routing_key,
|
||||
auto_delete=self.auto_delete,
|
||||
durable=self.durable)
|
||||
binding = entity.Binding(self.queue,
|
||||
exchange=exchange,
|
||||
routing_key=self.routing_key,
|
||||
durable=self.durable,
|
||||
exclusive=self.exclusive,
|
||||
auto_delete=self.auto_delete)
|
||||
super(Consumer, self).__init__(self.backend, binding, **kwargs)
|
||||
queue = entity.Queue(self.queue,
|
||||
exchange=exchange,
|
||||
routing_key=self.routing_key,
|
||||
durable=self.durable,
|
||||
exclusive=self.exclusive,
|
||||
auto_delete=self.auto_delete)
|
||||
super(Consumer, self).__init__(self.backend, queue, **kwargs)
|
||||
|
||||
def close(self):
|
||||
self.cancel()
|
||||
|
@ -145,7 +145,7 @@ class Consumer(messaging.Consumer):
|
|||
def fetch(self, no_ack=None, enable_callbacks=False):
|
||||
if no_ack is None:
|
||||
no_ack = self.no_ack
|
||||
message = self.bindings[0].get(no_ack)
|
||||
message = self.queues[0].get(no_ack)
|
||||
if message:
|
||||
if enable_callbacks:
|
||||
self.receive(message.payload, message)
|
||||
|
@ -190,10 +190,10 @@ class _CSet(messaging.Consumer):
|
|||
return self.purge()
|
||||
|
||||
def add_consumer_from_dict(self, queue, **options):
|
||||
self.bindings.append(entry_to_binding(queue, **options))
|
||||
self.queues.append(entry_to_queue(queue, **options))
|
||||
|
||||
def add_consumer(self, consumer):
|
||||
self.bindings.extend(consumer.bindings)
|
||||
self.queues.extend(consumer.queues)
|
||||
|
||||
def close(self):
|
||||
self.cancel()
|
||||
|
@ -203,12 +203,12 @@ class _CSet(messaging.Consumer):
|
|||
def ConsumerSet(connection, from_dict=None, consumers=None,
|
||||
callbacks=None, **kwargs):
|
||||
|
||||
bindings = []
|
||||
queues = []
|
||||
if consumers:
|
||||
for consumer in consumers:
|
||||
map(bindings.extend, consumer.bindings)
|
||||
map(queues.extend, consumer.queues)
|
||||
if from_dict:
|
||||
for queue_name, queue_options in from_dict.items():
|
||||
bindings.append(entry_to_binding(queue_name, **queue_options))
|
||||
queues.append(entry_to_queue(queue_name, **queue_options))
|
||||
|
||||
return _CSet(connection, bindings, **kwargs)
|
||||
return _CSet(connection, queues, **kwargs)
|
||||
|
|
|
@ -36,10 +36,10 @@ class Exchange(MaybeChannelBound):
|
|||
* ``topic``
|
||||
|
||||
Wildcard match between the routing key and the routing pattern
|
||||
specified in the binding. The routing key is treated as zero
|
||||
or more words delimited by ``"."`` and supports special
|
||||
wildcard characters. ``"*"`` matches a single word and ``"#"``
|
||||
matches zero or more words.
|
||||
specified in the exchange/queue binding. The routing key is
|
||||
treated as zero or more words delimited by ``"."`` and
|
||||
supports special wildcard characters. ``"*"`` matches a
|
||||
single word and ``"#"`` matches zero or more words.
|
||||
|
||||
* ``fanout``
|
||||
|
||||
|
@ -233,8 +233,8 @@ class Exchange(MaybeChannelBound):
|
|||
self.type))
|
||||
|
||||
|
||||
class Binding(MaybeChannelBound):
|
||||
"""A Queue declaration and its binding.
|
||||
class Queue(MaybeChannelBound):
|
||||
"""A Queue declaration.
|
||||
|
||||
:keyword name: See :attr:`name`.
|
||||
:keyword exchange: See :attr:`exchange`.
|
||||
|
@ -283,7 +283,7 @@ class Binding(MaybeChannelBound):
|
|||
|
||||
.. attribute:: channel
|
||||
|
||||
The channel the Binding is bound to (if bound).
|
||||
The channel the Queue is bound to (if bound).
|
||||
|
||||
.. attribute:: durable
|
||||
|
||||
|
@ -322,19 +322,20 @@ class Binding(MaybeChannelBound):
|
|||
|
||||
**Usage**
|
||||
|
||||
Example creating a binding for our exchange in the :class:`Exchange`
|
||||
Example creating a queue using our exchange in the :class:`Exchange`
|
||||
example::
|
||||
|
||||
>>> science_news = Binding("science_news",
|
||||
... exchange=news_exchange,
|
||||
... routing_key="news.science")
|
||||
>>> science_news = Queue("science_news",
|
||||
... exchange=news_exchange,
|
||||
... routing_key="news.science")
|
||||
|
||||
For now ``science_news`` is just a declaration, you can't perform
|
||||
actions on it. It just describes the name and options for the binding.
|
||||
actions on it. It just describes the name and options for the queue.
|
||||
|
||||
The binding can be bound or unbound. Bound means the binding is
|
||||
The queue can be bound or unbound. Bound means the queue is
|
||||
associated with a channel and operations can be performed on it.
|
||||
To bind the binding you call the binding with the channel as argument::
|
||||
To bind the queue you call the queue instance with the channel as
|
||||
an argument::
|
||||
|
||||
>>> bound_science_news = science_news(channel)
|
||||
|
||||
|
@ -364,7 +365,7 @@ class Binding(MaybeChannelBound):
|
|||
|
||||
def __init__(self, name="", exchange=None, routing_key="", channel=None,
|
||||
**kwargs):
|
||||
super(Binding, self).__init__(**kwargs)
|
||||
super(Queue, self).__init__(**kwargs)
|
||||
self.name = name or self.name
|
||||
self.exchange = exchange or self.exchange
|
||||
self.routing_key = routing_key or self.routing_key
|
||||
|
@ -492,7 +493,9 @@ class Binding(MaybeChannelBound):
|
|||
arguments=self.binding_arguments)
|
||||
|
||||
def __repr__(self):
|
||||
return super(Binding, self).__repr__(
|
||||
"Binding %s -> %s -> %s" % (self.name,
|
||||
self.exchange,
|
||||
return super(Queue, self).__repr__(
|
||||
"Queue %s -> %s -> %s" % (self.name,
|
||||
self.exchange,
|
||||
self.routing_key))
|
||||
|
||||
Binding = Queue
|
||||
|
|
|
@ -2,7 +2,8 @@ from itertools import count
|
|||
|
||||
from kombu import serialization
|
||||
from kombu.compression import compress
|
||||
from kombu.entity import Exchange, Binding
|
||||
from kombu.entity import Exchange, Queue
|
||||
from kombu.entity import Binding # TODO Remove
|
||||
from kombu.utils import maybe_list
|
||||
|
||||
|
||||
|
@ -132,7 +133,7 @@ class Consumer(object):
|
|||
"""Message consumer.
|
||||
|
||||
:param channel: see :attr:`channel`.
|
||||
:param bindings: see :attr:`bindings`.
|
||||
:param queues see :attr:`queues`.
|
||||
:keyword no_ack: see :attr:`no_ack`.
|
||||
:keyword auto_declare: see :attr:`auto_declare`
|
||||
:keyword callbacks: see :attr:`callbacks`.
|
||||
|
@ -142,9 +143,10 @@ class Consumer(object):
|
|||
|
||||
The connection channel to use.
|
||||
|
||||
.. attribute:: bindings
|
||||
.. attribute:: queues
|
||||
|
||||
A single binding, or a list of bindings to consume from.
|
||||
A single :class:`~kombu.entity.Queue`, or a list of queues to
|
||||
consume from.
|
||||
|
||||
.. attribute:: auto_declare
|
||||
|
||||
|
@ -176,10 +178,10 @@ class Consumer(object):
|
|||
_next_tag = count(1).next # global
|
||||
_consuming = False
|
||||
|
||||
def __init__(self, channel, bindings, no_ack=None, auto_declare=None,
|
||||
def __init__(self, channel, queues, no_ack=None, auto_declare=None,
|
||||
callbacks=None, on_decode_error=None):
|
||||
self.channel = channel
|
||||
self.bindings = bindings
|
||||
self.queues = queues
|
||||
if no_ack is not None:
|
||||
self.no_ack = no_ack
|
||||
if auto_declare is not None:
|
||||
|
@ -193,21 +195,21 @@ class Consumer(object):
|
|||
self.callbacks = []
|
||||
self._active_tags = {}
|
||||
|
||||
self.bindings = [binding(self.channel)
|
||||
for binding in maybe_list(self.bindings)]
|
||||
self.queues = [queue(self.channel)
|
||||
for queue in maybe_list(self.queues)]
|
||||
|
||||
if self.auto_declare:
|
||||
self.declare()
|
||||
|
||||
def declare(self):
|
||||
"""Declare queues, bindings and exchanges.
|
||||
"""Declare queues, exchanges and bindings.
|
||||
|
||||
This is done automatically at instantiation if :attr:`auto_declare`
|
||||
is set.
|
||||
|
||||
"""
|
||||
for binding in self.bindings:
|
||||
binding.declare()
|
||||
for queue in self.queues:
|
||||
queue.declare()
|
||||
|
||||
def consume(self, delivery_tag=None):
|
||||
"""Register consumer on server.
|
||||
|
@ -217,9 +219,9 @@ class Consumer(object):
|
|||
|
||||
"""
|
||||
if not self._consuming:
|
||||
H, T = self.bindings[:-1], self.bindings[-1]
|
||||
for binding in H:
|
||||
binding.consume(self._add_tag(binding, delivery_tag),
|
||||
H, T = self.queues[:-1], self.queues[-1]
|
||||
for queue in H:
|
||||
queue.consume(self._add_tag(queue, delivery_tag),
|
||||
self._receive_callback,
|
||||
self.no_ack,
|
||||
nowait=True)
|
||||
|
@ -265,7 +267,7 @@ class Consumer(object):
|
|||
undo operation available.
|
||||
|
||||
"""
|
||||
return sum(binding.purge() for binding in self.bindings)
|
||||
return sum(queue.purge() for queue in self.queues)
|
||||
|
||||
def cancel(self):
|
||||
"""End all active queue consumers.
|
||||
|
@ -274,8 +276,8 @@ class Consumer(object):
|
|||
mean the server will not send any more messages for this consumer.
|
||||
|
||||
"""
|
||||
for binding, tag in self._active_tags.items():
|
||||
binding.cancel(tag)
|
||||
for queue, tag in self._active_tags.items():
|
||||
queue.cancel(tag)
|
||||
self._active_tags.clear()
|
||||
self._consuming = False
|
||||
|
||||
|
@ -336,9 +338,9 @@ class Consumer(object):
|
|||
"""
|
||||
return self.channel.basic_recover(requeue=requeue)
|
||||
|
||||
def _add_tag(self, binding, delivery_tag=None):
|
||||
def _add_tag(self, queue, delivery_tag=None):
|
||||
tag = delivery_tag or str(self._next_tag())
|
||||
self._active_tags[binding] = tag
|
||||
self._active_tags[queue] = tag
|
||||
return tag
|
||||
|
||||
def _receive_callback(self, raw_message):
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import unittest2 as unittest
|
||||
|
||||
from kombu.entity import Exchange, Binding
|
||||
from kombu.entity import Exchange, Queue
|
||||
from kombu.exceptions import NotBoundError
|
||||
|
||||
from kombu.tests.mocks import Channel
|
||||
|
@ -59,22 +59,22 @@ class test_Exchange(unittest.TestCase):
|
|||
self.assertIn("Exchange", repr(b))
|
||||
|
||||
|
||||
class test_Binding(unittest.TestCase):
|
||||
class test_Queue(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.exchange = Exchange("foo", "direct")
|
||||
|
||||
def test_exclusive_implies_auto_delete(self):
|
||||
self.assertTrue(
|
||||
Binding("foo", self.exchange, exclusive=True).auto_delete)
|
||||
Queue("foo", self.exchange, exclusive=True).auto_delete)
|
||||
|
||||
def test_binds_at_instantiation(self):
|
||||
self.assertTrue(
|
||||
Binding("foo", self.exchange, channel=Channel()).is_bound)
|
||||
Queue("foo", self.exchange, channel=Channel()).is_bound)
|
||||
|
||||
def test_also_binds_exchange(self):
|
||||
chan = Channel()
|
||||
b = Binding("foo", self.exchange)
|
||||
b = Queue("foo", self.exchange)
|
||||
self.assertFalse(b.is_bound)
|
||||
self.assertFalse(b.exchange.is_bound)
|
||||
b = b.bind(chan)
|
||||
|
@ -85,7 +85,7 @@ class test_Binding(unittest.TestCase):
|
|||
|
||||
def test_declare(self):
|
||||
chan = Channel()
|
||||
b = Binding("foo", self.exchange, "foo", channel=chan)
|
||||
b = Queue("foo", self.exchange, "foo", channel=chan)
|
||||
self.assertTrue(b.is_bound)
|
||||
b.declare()
|
||||
self.assertIn("exchange_declare", chan)
|
||||
|
@ -93,31 +93,31 @@ class test_Binding(unittest.TestCase):
|
|||
self.assertIn("queue_bind", chan)
|
||||
|
||||
def test_get(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b = Queue("foo", self.exchange, "foo", channel=Channel())
|
||||
b.get()
|
||||
self.assertIn("basic_get", b.channel)
|
||||
|
||||
def test_purge(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b = Queue("foo", self.exchange, "foo", channel=Channel())
|
||||
b.purge()
|
||||
self.assertIn("queue_purge", b.channel)
|
||||
|
||||
def test_consume(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b = Queue("foo", self.exchange, "foo", channel=Channel())
|
||||
b.consume("fifafo", None)
|
||||
self.assertIn("basic_consume", b.channel)
|
||||
|
||||
def test_cancel(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b = Queue("foo", self.exchange, "foo", channel=Channel())
|
||||
b.cancel("fifafo")
|
||||
self.assertIn("basic_cancel", b.channel)
|
||||
|
||||
def test_delete(self):
|
||||
b = Binding("foo", self.exchange, "foo", channel=Channel())
|
||||
b = Queue("foo", self.exchange, "foo", channel=Channel())
|
||||
b.delete()
|
||||
self.assertIn("queue_delete", b.channel)
|
||||
|
||||
def test__repr__(self):
|
||||
b = Binding("foo", self.exchange, "foo")
|
||||
b = Queue("foo", self.exchange, "foo")
|
||||
self.assertIn("foo", repr(b))
|
||||
self.assertIn("Binding", repr(b))
|
||||
self.assertIn("Queue", repr(b))
|
||||
|
|
|
@ -5,7 +5,7 @@ import unittest2 as unittest
|
|||
from nose import SkipTest
|
||||
|
||||
from kombu import BrokerConnection
|
||||
from kombu import Producer, Consumer, Exchange, Binding
|
||||
from kombu import Producer, Consumer, Exchange, Queue
|
||||
|
||||
|
||||
def consumeN(conn, consumer, n=1):
|
||||
|
@ -41,7 +41,7 @@ class test_pika(unittest.TestCase):
|
|||
else:
|
||||
self.connected = True
|
||||
self.exchange = Exchange("tamqplib", "direct")
|
||||
self.binding = Binding("tamqplib", self.exchange, "tamqplib")
|
||||
self.queue = Queue("tamqplib", self.exchange, "tamqplib")
|
||||
|
||||
def test_produce__consume(self):
|
||||
if not self.connected:
|
||||
|
@ -53,7 +53,7 @@ class test_pika(unittest.TestCase):
|
|||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
consumer = Consumer(chan2, self.binding)
|
||||
consumer = Consumer(chan2, self.queue)
|
||||
message = consumeN(self.connection, consumer)
|
||||
self.assertDictEqual(message[0], {"foo": "bar"})
|
||||
chan2.close()
|
||||
|
@ -64,9 +64,9 @@ class test_pika(unittest.TestCase):
|
|||
raise SkipTest("Broker not running.")
|
||||
chan1 = self.connection.channel()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
b1 = Binding("pyamqplib.b1", self.exchange, "b1")
|
||||
b2 = Binding("pyamqplib.b2", self.exchange, "b2")
|
||||
b3 = Binding("pyamqplib.b3", self.exchange, "b3")
|
||||
b1 = Queue("pyamqplib.b1", self.exchange, "b1")
|
||||
b2 = Queue("pyamqplib.b2", self.exchange, "b2")
|
||||
b3 = Queue("pyamqplib.b3", self.exchange, "b3")
|
||||
|
||||
producer.publish("b1", routing_key="b1")
|
||||
producer.publish("b2", routing_key="b2")
|
||||
|
@ -84,8 +84,8 @@ class test_pika(unittest.TestCase):
|
|||
if not self.connected:
|
||||
raise SkipTest("Broker not running.")
|
||||
chan = self.connection.channel()
|
||||
self.purge([self.binding.name])
|
||||
consumer = Consumer(chan, self.binding)
|
||||
self.purge([self.queue.name])
|
||||
consumer = Consumer(chan, self.queue)
|
||||
self.assertRaises(socket.timeout, self.connection.drain_events,
|
||||
timeout=0.3)
|
||||
consumer.cancel()
|
||||
|
@ -97,11 +97,11 @@ class test_pika(unittest.TestCase):
|
|||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
binding = Binding("amqplib_basic_get", self.exchange, "basic_get")
|
||||
binding = binding(chan2)
|
||||
binding.declare()
|
||||
queue = Queue("amqplib_basic_get", self.exchange, "basic_get")
|
||||
queue = queue(chan2)
|
||||
queue.declare()
|
||||
for i in range(50):
|
||||
m = binding.get()
|
||||
m = queue.get()
|
||||
if m:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
|
|
@ -5,7 +5,7 @@ import unittest2 as unittest
|
|||
from nose import SkipTest
|
||||
|
||||
from kombu import BrokerConnection
|
||||
from kombu import Producer, Consumer, Exchange, Binding
|
||||
from kombu import Producer, Consumer, Exchange, Queue
|
||||
|
||||
|
||||
def consumeN(conn, consumer, n=1):
|
||||
|
@ -41,7 +41,7 @@ class test_amqplib(unittest.TestCase):
|
|||
else:
|
||||
self.connected = True
|
||||
self.exchange = Exchange("tamqplib", "direct")
|
||||
self.binding = Binding("tamqplib", self.exchange, "tamqplib")
|
||||
self.queue = Queue("tamqplib", self.exchange, "tamqplib")
|
||||
|
||||
def test_produce__consume(self):
|
||||
if not self.connected:
|
||||
|
@ -53,7 +53,7 @@ class test_amqplib(unittest.TestCase):
|
|||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
consumer = Consumer(chan2, self.binding)
|
||||
consumer = Consumer(chan2, self.queue)
|
||||
message = consumeN(self.connection, consumer)
|
||||
self.assertDictEqual(message[0], {"foo": "bar"})
|
||||
chan2.close()
|
||||
|
@ -64,9 +64,9 @@ class test_amqplib(unittest.TestCase):
|
|||
raise SkipTest("Broker not running.")
|
||||
chan1 = self.connection.channel()
|
||||
producer = Producer(chan1, self.exchange)
|
||||
b1 = Binding("pyamqplib.b1", self.exchange, "b1")
|
||||
b2 = Binding("pyamqplib.b2", self.exchange, "b2")
|
||||
b3 = Binding("pyamqplib.b3", self.exchange, "b3")
|
||||
b1 = Queue("pyamqplib.b1", self.exchange, "b1")
|
||||
b2 = Queue("pyamqplib.b2", self.exchange, "b2")
|
||||
b3 = Queue("pyamqplib.b3", self.exchange, "b3")
|
||||
|
||||
producer.publish("b1", routing_key="b1")
|
||||
producer.publish("b2", routing_key="b2")
|
||||
|
@ -84,8 +84,8 @@ class test_amqplib(unittest.TestCase):
|
|||
if not self.connected:
|
||||
raise SkipTest("Broker not running.")
|
||||
chan = self.connection.channel()
|
||||
self.purge([self.binding.name])
|
||||
consumer = Consumer(chan, self.binding)
|
||||
self.purge([self.queue.name])
|
||||
consumer = Consumer(chan, self.queue)
|
||||
self.assertRaises(socket.timeout, self.connection.drain_events,
|
||||
timeout=0.3)
|
||||
consumer.cancel()
|
||||
|
@ -97,11 +97,11 @@ class test_amqplib(unittest.TestCase):
|
|||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
binding = Binding("amqplib_basic_get", self.exchange, "basic_get")
|
||||
binding = binding(chan2)
|
||||
binding.declare()
|
||||
queue = Queue("amqplib_basic_get", self.exchange, "basic_get")
|
||||
queue = queue(chan2)
|
||||
queue.declare()
|
||||
for i in range(50):
|
||||
m = binding.get()
|
||||
m = queue.get()
|
||||
if m:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
|
|
@ -5,7 +5,7 @@ import simplejson
|
|||
from kombu.connection import BrokerConnection
|
||||
from kombu.exceptions import MessageStateError
|
||||
from kombu.messaging import Consumer, Producer
|
||||
from kombu.entity import Exchange, Binding
|
||||
from kombu.entity import Exchange, Queue
|
||||
|
||||
from kombu.tests.mocks import Backend
|
||||
|
||||
|
@ -120,28 +120,28 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_set_no_ack(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, binding, auto_declare=True, no_ack=True)
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=True, no_ack=True)
|
||||
self.assertTrue(consumer.no_ack)
|
||||
|
||||
def test_set_callbacks(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
callbacks = [lambda x, y: x,
|
||||
lambda x, y: x]
|
||||
consumer = Consumer(channel, binding, auto_declare=True,
|
||||
consumer = Consumer(channel, queue, auto_declare=True,
|
||||
callbacks=callbacks)
|
||||
self.assertEqual(consumer.callbacks, callbacks)
|
||||
|
||||
def test_auto_declare(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, binding, auto_declare=True)
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=True)
|
||||
consumer.consume()
|
||||
self.assertIsNot(consumer.bindings[0], binding)
|
||||
self.assertTrue(consumer.bindings[0].is_bound)
|
||||
self.assertTrue(consumer.bindings[0].exchange.is_bound)
|
||||
self.assertIsNot(consumer.bindings[0].exchange, self.exchange)
|
||||
self.assertIsNot(consumer.queues[0], queue)
|
||||
self.assertTrue(consumer.queues[0].is_bound)
|
||||
self.assertTrue(consumer.queues[0].exchange.is_bound)
|
||||
self.assertIsNot(consumer.queues[0].exchange, self.exchange)
|
||||
|
||||
for meth in ("exchange_declare",
|
||||
"queue_declare",
|
||||
|
@ -153,12 +153,12 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_manual_declare(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, binding, auto_declare=False)
|
||||
self.assertIsNot(consumer.bindings[0], binding)
|
||||
self.assertTrue(consumer.bindings[0].is_bound)
|
||||
self.assertTrue(consumer.bindings[0].exchange.is_bound)
|
||||
self.assertIsNot(consumer.bindings[0].exchange, self.exchange)
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=False)
|
||||
self.assertIsNot(consumer.queues[0], queue)
|
||||
self.assertTrue(consumer.queues[0].is_bound)
|
||||
self.assertTrue(consumer.queues[0].exchange.is_bound)
|
||||
self.assertIsNot(consumer.queues[0].exchange, self.exchange)
|
||||
|
||||
for meth in ("exchange_declare",
|
||||
"queue_declare",
|
||||
|
@ -177,8 +177,8 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_consume__cancel(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, binding, auto_declare=True)
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=True)
|
||||
consumer.consume()
|
||||
consumer.cancel()
|
||||
self.assertIn("basic_cancel", channel)
|
||||
|
@ -186,8 +186,8 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test___enter____exit__(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, binding, auto_declare=True)
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=True)
|
||||
context = consumer.__enter__()
|
||||
self.assertIs(context, consumer)
|
||||
self.assertTrue(consumer._active_tags)
|
||||
|
@ -197,34 +197,34 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_flow(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, binding, auto_declare=True)
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=True)
|
||||
consumer.flow(False)
|
||||
self.assertIn("flow", channel)
|
||||
|
||||
def test_qos(self):
|
||||
channel = self.connection.channel()
|
||||
binding = Binding("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, binding, auto_declare=True)
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=True)
|
||||
consumer.qos(30, 10, False)
|
||||
self.assertIn("basic_qos", channel)
|
||||
|
||||
def test_purge(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b2 = Binding("qname2", self.exchange, "rkey")
|
||||
b3 = Binding("qname3", self.exchange, "rkey")
|
||||
b4 = Binding("qname4", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
b2 = Queue("qname2", self.exchange, "rkey")
|
||||
b3 = Queue("qname3", self.exchange, "rkey")
|
||||
b4 = Queue("qname4", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1, b2, b3, b4], auto_declare=True)
|
||||
consumer.purge()
|
||||
self.assertEqual(channel.called.count("queue_purge"), 4)
|
||||
|
||||
def test_multiple_bindings(self):
|
||||
def test_multiple_queues(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b2 = Binding("qname2", self.exchange, "rkey")
|
||||
b3 = Binding("qname3", self.exchange, "rkey")
|
||||
b4 = Binding("qname4", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
b2 = Queue("qname2", self.exchange, "rkey")
|
||||
b3 = Queue("qname3", self.exchange, "rkey")
|
||||
b4 = Queue("qname4", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1, b2, b3, b4])
|
||||
consumer.consume()
|
||||
self.assertEqual(channel.called.count("exchange_declare"), 4)
|
||||
|
@ -238,7 +238,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_receive_callback(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
|
||||
received = []
|
||||
|
@ -256,7 +256,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_basic_ack_twice(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
|
||||
def callback(message_data, message):
|
||||
|
@ -269,7 +269,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_basic_reject(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
|
||||
def callback(message_data, message):
|
||||
|
@ -281,7 +281,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_basic_reject_twice(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
|
||||
def callback(message_data, message):
|
||||
|
@ -295,7 +295,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_basic_reject__requeue(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
|
||||
def callback(message_data, message):
|
||||
|
@ -307,7 +307,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_basic_reject__requeue_twice(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
|
||||
def callback(message_data, message):
|
||||
|
@ -321,13 +321,13 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_receive_without_callbacks_raises(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
self.assertRaises(NotImplementedError, consumer.receive, 1, 2)
|
||||
|
||||
def test_decode_error(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
consumer.channel.throw_decode_error = True
|
||||
|
||||
|
@ -336,7 +336,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_on_decode_error_callback(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
consumer.channel.throw_decode_error = True
|
||||
|
||||
|
@ -354,7 +354,7 @@ class test_Consumer(unittest.TestCase):
|
|||
|
||||
def test_recover(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Binding("qname1", self.exchange, "rkey")
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
consumer.recover()
|
||||
self.assertIn("basic_recover", channel)
|
||||
|
|
Loading…
Reference in New Issue