mirror of https://github.com/celery/kombu.git
100% coverage for kombu.messaging, 97% total
This commit is contained in:
parent
cbc1234b11
commit
2d1d52431e
|
@ -68,9 +68,8 @@ class Producer(object):
|
|||
if auto_declare is not None:
|
||||
self.auto_declare = auto_declare
|
||||
|
||||
if self.exchange:
|
||||
self.exchange = self.exchange(self.channel)
|
||||
self.auto_declare and self.declare()
|
||||
self.exchange = self.exchange(self.channel)
|
||||
self.auto_declare and self.declare()
|
||||
|
||||
if self.on_return:
|
||||
self.channel.events["basic_return"].append(self.on_return)
|
||||
|
@ -269,7 +268,12 @@ class Consumer(object):
|
|||
|
||||
def cancel_by_queue(self, queue):
|
||||
"""Cancel consumer by queue name."""
|
||||
self.channel.basic_cancel(self._active_tags[queue])
|
||||
try:
|
||||
tag = self._active_tags.pop(queue)
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
self.channel.basic_cancel(tag)
|
||||
|
||||
def purge(self):
|
||||
"""Purge messages from all queues.
|
||||
|
|
|
@ -25,6 +25,7 @@ class Channel(object):
|
|||
self.called = []
|
||||
self.deliveries = count(1).next
|
||||
self.to_deliver = []
|
||||
self.events = {"basic_return": []}
|
||||
|
||||
def _called(self, name):
|
||||
self.called.append(name)
|
||||
|
@ -73,7 +74,6 @@ class Channel(object):
|
|||
pass
|
||||
|
||||
def queue_purge(self, *args, **kwargs):
|
||||
print("PURGE!")
|
||||
self._called("queue_purge")
|
||||
|
||||
def basic_consume(self, *args, **kwargs):
|
||||
|
|
|
@ -109,6 +109,23 @@ class test_Producer(unittest.TestCase):
|
|||
p = Producer(chan)
|
||||
self.assertFalse(p.exchange.name)
|
||||
|
||||
def test_revive(self):
|
||||
chan = self.connection.channel()
|
||||
p = Producer(chan)
|
||||
chan2 = self.connection.channel()
|
||||
p.revive(chan2)
|
||||
self.assertIs(p.channel, chan2)
|
||||
self.assertIs(p.exchange.channel, chan2)
|
||||
|
||||
def test_on_return(self):
|
||||
chan = self.connection.channel()
|
||||
|
||||
def on_return(exception, exchange, routing_key, message):
|
||||
pass
|
||||
|
||||
p = Producer(chan, on_return=on_return)
|
||||
self.assertTrue(on_return in chan.events["basic_return"])
|
||||
|
||||
|
||||
class test_Consumer(unittest.TestCase):
|
||||
|
||||
|
@ -138,6 +155,7 @@ class test_Consumer(unittest.TestCase):
|
|||
queue = Queue("qname", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, queue, auto_declare=True)
|
||||
consumer.consume()
|
||||
consumer.consume() # twice is a noop
|
||||
self.assertIsNot(consumer.queues[0], queue)
|
||||
self.assertTrue(consumer.queues[0].is_bound)
|
||||
self.assertTrue(consumer.queues[0].exchange.is_bound)
|
||||
|
@ -151,6 +169,10 @@ class test_Consumer(unittest.TestCase):
|
|||
self.assertEqual(channel.called.count("basic_consume"), 1)
|
||||
self.assertTrue(consumer._active_tags)
|
||||
|
||||
consumer.cancel_by_queue(queue.name)
|
||||
consumer.cancel_by_queue(queue.name)
|
||||
self.assertFalse(consumer._active_tags)
|
||||
|
||||
def test_manual_declare(self):
|
||||
channel = self.connection.channel()
|
||||
queue = Queue("qname", self.exchange, "rkey")
|
||||
|
@ -337,14 +359,13 @@ class test_Consumer(unittest.TestCase):
|
|||
def test_on_decode_error_callback(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
consumer.channel.throw_decode_error = True
|
||||
thrown = []
|
||||
|
||||
def on_decode_error(msg, exc):
|
||||
thrown.append((msg.body, exc))
|
||||
|
||||
consumer.on_decode_error = on_decode_error
|
||||
consumer = Consumer(channel, [b1], on_decode_error=on_decode_error)
|
||||
consumer.channel.throw_decode_error = True
|
||||
consumer._receive_callback({"foo": "bar"})
|
||||
|
||||
self.assertTrue(thrown)
|
||||
|
@ -358,3 +379,18 @@ class test_Consumer(unittest.TestCase):
|
|||
consumer = Consumer(channel, [b1])
|
||||
consumer.recover()
|
||||
self.assertIn("basic_recover", channel)
|
||||
|
||||
def test_revive(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
consumer = Consumer(channel, [b1])
|
||||
channel2 = self.connection.channel()
|
||||
consumer.revive(channel2)
|
||||
self.assertIs(consumer.channel, channel2)
|
||||
self.assertIs(consumer.queues[0].channel, channel2)
|
||||
self.assertIs(consumer.queues[0].exchange.channel, channel2)
|
||||
|
||||
def test__repr__(self):
|
||||
channel = self.connection.channel()
|
||||
b1 = Queue("qname1", self.exchange, "rkey")
|
||||
self.assertTrue(repr(Consumer(channel, [b1])))
|
||||
|
|
|
@ -48,3 +48,6 @@ class test_FairCycle(unittest.TestCase):
|
|||
("a", "a"), ("b", "b")])
|
||||
cycle2 = FairCycle(echo, ["c", "c"], MyEmpty)
|
||||
self.assertRaises(MyEmpty, consume, cycle2.get, 3)
|
||||
|
||||
def test__repr__(self):
|
||||
self.assertTrue(repr(FairCycle(lambda x: x, [1, 2, 3], MyEmpty)))
|
||||
|
|
Loading…
Reference in New Issue