mirror of https://github.com/celery/kombu.git
Tests auto_delete for memory transport
This commit is contained in:
parent
1d8040be88
commit
570b544fde
|
@ -82,22 +82,28 @@ def _imaybe_declare(entity, channel, **retry_policy):
|
|||
**retry_policy)(entity, channel)
|
||||
|
||||
|
||||
def itermessages(conn, channel, queue, limit=1, timeout=None,
|
||||
Consumer=_Consumer, **kwargs):
|
||||
def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
|
||||
acc = deque()
|
||||
|
||||
def on_message(body, message):
|
||||
acc.append((body, message))
|
||||
|
||||
with Consumer(channel, [queue], callbacks=[on_message], **kwargs):
|
||||
for _ in eventloop(conn, limit=limit, timeout=timeout,
|
||||
ignore_timeouts=True):
|
||||
consumer.callbacks = [on_message] + (callbacks or [])
|
||||
|
||||
with consumer:
|
||||
for _ in eventloop(consumer.channel.connection.client,
|
||||
limit=limit, timeout=timeout, ignore_timeouts=True):
|
||||
try:
|
||||
yield acc.popleft()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
|
||||
def itermessages(conn, channel, queue, limit=1, timeout=None,
|
||||
Consumer=_Consumer, callbacks=None, **kwargs):
|
||||
return drain_consumer(Consumer(channel, queues=[queue], **kwargs),
|
||||
limit=limit, timeout=timeout, callbacks=callbacks)
|
||||
|
||||
def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
|
||||
"""Best practice generator wrapper around ``Connection.drain_events``.
|
||||
|
||||
|
|
|
@ -188,7 +188,7 @@ class Exchange(MaybeChannelBound):
|
|||
properties = {} if properties is None else properties
|
||||
dm = delivery_mode or self.delivery_mode
|
||||
properties["delivery_mode"] = \
|
||||
DELIVERY_MODES.get(dm) if (dm != 2 or dm != 1) else dm
|
||||
DELIVERY_MODES[dm] if (dm != 2 and dm != 1) else dm
|
||||
return self.channel.prepare_message(body,
|
||||
properties=properties,
|
||||
priority=priority,
|
||||
|
|
|
@ -45,18 +45,6 @@ class test_maybe_declare(TestCase):
|
|||
maybe_declare(entity, channel)
|
||||
self.assertEqual(entity.declare.call_count, 1)
|
||||
|
||||
def test_uncacheable(self):
|
||||
channel = Mock()
|
||||
entity = Mock()
|
||||
entity.can_cache_declaration = False
|
||||
entity.is_bound = True
|
||||
|
||||
maybe_declare(entity, channel)
|
||||
self.assertEqual(entity.declare.call_count, 1)
|
||||
|
||||
maybe_declare(entity, channel)
|
||||
self.assertEqual(entity.declare.call_count, 2)
|
||||
|
||||
def test_binds_entities(self):
|
||||
channel = Mock()
|
||||
channel.connection.client.declared_entities = set()
|
||||
|
@ -243,7 +231,7 @@ class test_insured(TestCase):
|
|||
class MockConsumer(object):
|
||||
consumers = set()
|
||||
|
||||
def __init__(self, channel, queues, callbacks, **kwargs):
|
||||
def __init__(self, channel, queues=None, callbacks=None, **kwargs):
|
||||
self.channel = channel
|
||||
self.queues = queues
|
||||
self.callbacks = callbacks
|
||||
|
@ -271,6 +259,7 @@ class test_itermessages(TestCase):
|
|||
def test_default(self):
|
||||
conn = self.MockConnection()
|
||||
channel = Mock()
|
||||
channel.connection.client = conn
|
||||
it = common.itermessages(conn, channel, "q", limit=1,
|
||||
Consumer=MockConsumer)
|
||||
|
||||
|
@ -284,6 +273,7 @@ class test_itermessages(TestCase):
|
|||
conn = self.MockConnection()
|
||||
conn.should_raise_timeout = True
|
||||
channel = Mock()
|
||||
channel.connection.client = conn
|
||||
it = common.itermessages(conn, channel, "q", limit=1,
|
||||
Consumer=MockConsumer)
|
||||
|
||||
|
|
|
@ -3,8 +3,10 @@ from __future__ import with_statement
|
|||
|
||||
import socket
|
||||
|
||||
from kombu.common import eventloop, itermessages
|
||||
from kombu.connection import BrokerConnection
|
||||
from kombu.entity import Exchange, Queue
|
||||
from kombu.exceptions import StdChannelError
|
||||
from kombu.messaging import Consumer, Producer
|
||||
|
||||
from .utils import TestCase
|
||||
|
@ -45,6 +47,43 @@ class test_MemoryTransport(TestCase):
|
|||
|
||||
self.assertEqual(len(_received), 10)
|
||||
|
||||
def test_auto_delete(self, name="test_memory_autodelete"):
|
||||
_received = []
|
||||
channel = self.c.channel()
|
||||
queue = Queue(name,
|
||||
exchange=Exchange(name),
|
||||
routing_key=name,
|
||||
auto_delete=True,
|
||||
no_ack=True)
|
||||
queue(channel).declare()
|
||||
self.assertIn(name, channel.auto_delete_queues)
|
||||
|
||||
def callback(body, message):
|
||||
_received.append(body)
|
||||
|
||||
producer = Producer(channel, queue.exchange)
|
||||
for i in range(10):
|
||||
producer.publish({"foo": i}, routing_key=name)
|
||||
|
||||
with Consumer(channel, queue, callbacks=[callback]) as consumer:
|
||||
self.assertEqual(channel.auto_delete_queues[name], 1)
|
||||
|
||||
for _ in eventloop(self.c, limit=10):
|
||||
pass
|
||||
self.assertEqual(channel.auto_delete_queues[name], 0)
|
||||
self.assertTrue(len(_received), 10)
|
||||
|
||||
# all messages consumed, should be deleted now.
|
||||
with self.assertRaises(StdChannelError):
|
||||
channel.queue_declare(name, passive=True)
|
||||
|
||||
queue(channel).declare()
|
||||
producer.publish({"foo": i+1}, routing_key=name)
|
||||
list(itermessages(self.c, channel, queue))
|
||||
with self.assertRaises(StdChannelError):
|
||||
channel.queue_declare(name, passive=True)
|
||||
|
||||
|
||||
def test_produce_consume(self):
|
||||
channel = self.c.channel()
|
||||
producer = Producer(channel, self.e)
|
||||
|
|
Loading…
Reference in New Issue