diff --git a/kombu/common.py b/kombu/common.py index 2bbdb93f..4c0f9bfd 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -82,22 +82,28 @@ def _imaybe_declare(entity, channel, **retry_policy): **retry_policy)(entity, channel) -def itermessages(conn, channel, queue, limit=1, timeout=None, - Consumer=_Consumer, **kwargs): +def drain_consumer(consumer, limit=1, timeout=None, callbacks=None): acc = deque() def on_message(body, message): acc.append((body, message)) - with Consumer(channel, [queue], callbacks=[on_message], **kwargs): - for _ in eventloop(conn, limit=limit, timeout=timeout, - ignore_timeouts=True): + consumer.callbacks = [on_message] + (callbacks or []) + + with consumer: + for _ in eventloop(consumer.channel.connection.client, + limit=limit, timeout=timeout, ignore_timeouts=True): try: yield acc.popleft() except IndexError: pass +def itermessages(conn, channel, queue, limit=1, timeout=None, + Consumer=_Consumer, callbacks=None, **kwargs): + return drain_consumer(Consumer(channel, queues=[queue], **kwargs), + limit=limit, timeout=timeout, callbacks=callbacks) + def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False): """Best practice generator wrapper around ``Connection.drain_events``. diff --git a/kombu/entity.py b/kombu/entity.py index f1d23e48..0fee91aa 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -188,7 +188,7 @@ class Exchange(MaybeChannelBound): properties = {} if properties is None else properties dm = delivery_mode or self.delivery_mode properties["delivery_mode"] = \ - DELIVERY_MODES.get(dm) if (dm != 2 or dm != 1) else dm + DELIVERY_MODES[dm] if (dm != 2 and dm != 1) else dm return self.channel.prepare_message(body, properties=properties, priority=priority, diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 0ba44154..452c26a8 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -45,18 +45,6 @@ class test_maybe_declare(TestCase): maybe_declare(entity, channel) self.assertEqual(entity.declare.call_count, 1) - def test_uncacheable(self): - channel = Mock() - entity = Mock() - entity.can_cache_declaration = False - entity.is_bound = True - - maybe_declare(entity, channel) - self.assertEqual(entity.declare.call_count, 1) - - maybe_declare(entity, channel) - self.assertEqual(entity.declare.call_count, 2) - def test_binds_entities(self): channel = Mock() channel.connection.client.declared_entities = set() @@ -243,7 +231,7 @@ class test_insured(TestCase): class MockConsumer(object): consumers = set() - def __init__(self, channel, queues, callbacks, **kwargs): + def __init__(self, channel, queues=None, callbacks=None, **kwargs): self.channel = channel self.queues = queues self.callbacks = callbacks @@ -271,6 +259,7 @@ class test_itermessages(TestCase): def test_default(self): conn = self.MockConnection() channel = Mock() + channel.connection.client = conn it = common.itermessages(conn, channel, "q", limit=1, Consumer=MockConsumer) @@ -284,6 +273,7 @@ class test_itermessages(TestCase): conn = self.MockConnection() conn.should_raise_timeout = True channel = Mock() + channel.connection.client = conn it = common.itermessages(conn, channel, "q", limit=1, Consumer=MockConsumer) diff --git a/kombu/tests/test_transport_memory.py b/kombu/tests/test_transport_memory.py index c658327c..989b0e92 100644 --- a/kombu/tests/test_transport_memory.py +++ b/kombu/tests/test_transport_memory.py @@ -3,8 +3,10 @@ from __future__ import with_statement import socket +from kombu.common import eventloop, itermessages from kombu.connection import BrokerConnection from kombu.entity import Exchange, Queue +from kombu.exceptions import StdChannelError from kombu.messaging import Consumer, Producer from .utils import TestCase @@ -45,6 +47,43 @@ class test_MemoryTransport(TestCase): self.assertEqual(len(_received), 10) + def test_auto_delete(self, name="test_memory_autodelete"): + _received = [] + channel = self.c.channel() + queue = Queue(name, + exchange=Exchange(name), + routing_key=name, + auto_delete=True, + no_ack=True) + queue(channel).declare() + self.assertIn(name, channel.auto_delete_queues) + + def callback(body, message): + _received.append(body) + + producer = Producer(channel, queue.exchange) + for i in range(10): + producer.publish({"foo": i}, routing_key=name) + + with Consumer(channel, queue, callbacks=[callback]) as consumer: + self.assertEqual(channel.auto_delete_queues[name], 1) + + for _ in eventloop(self.c, limit=10): + pass + self.assertEqual(channel.auto_delete_queues[name], 0) + self.assertTrue(len(_received), 10) + + # all messages consumed, should be deleted now. + with self.assertRaises(StdChannelError): + channel.queue_declare(name, passive=True) + + queue(channel).declare() + producer.publish({"foo": i+1}, routing_key=name) + list(itermessages(self.c, channel, queue)) + with self.assertRaises(StdChannelError): + channel.queue_declare(name, passive=True) + + def test_produce_consume(self): channel = self.c.channel() producer = Producer(channel, self.e)