diff --git a/kombu/messaging.py b/kombu/messaging.py index bac4e320..021abc13 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -68,9 +68,8 @@ class Producer(object): if auto_declare is not None: self.auto_declare = auto_declare - if self.exchange: - self.exchange = self.exchange(self.channel) - self.auto_declare and self.declare() + self.exchange = self.exchange(self.channel) + self.auto_declare and self.declare() if self.on_return: self.channel.events["basic_return"].append(self.on_return) @@ -269,7 +268,12 @@ class Consumer(object): def cancel_by_queue(self, queue): """Cancel consumer by queue name.""" - self.channel.basic_cancel(self._active_tags[queue]) + try: + tag = self._active_tags.pop(queue) + except KeyError: + pass + else: + self.channel.basic_cancel(tag) def purge(self): """Purge messages from all queues. diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index 7cbf857e..d34ca473 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -25,6 +25,7 @@ class Channel(object): self.called = [] self.deliveries = count(1).next self.to_deliver = [] + self.events = {"basic_return": []} def _called(self, name): self.called.append(name) @@ -73,7 +74,6 @@ class Channel(object): pass def queue_purge(self, *args, **kwargs): - print("PURGE!") self._called("queue_purge") def basic_consume(self, *args, **kwargs): diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index a90dc51b..020927ad 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -109,6 +109,23 @@ class test_Producer(unittest.TestCase): p = Producer(chan) self.assertFalse(p.exchange.name) + def test_revive(self): + chan = self.connection.channel() + p = Producer(chan) + chan2 = self.connection.channel() + p.revive(chan2) + self.assertIs(p.channel, chan2) + self.assertIs(p.exchange.channel, chan2) + + def test_on_return(self): + chan = self.connection.channel() + + def on_return(exception, exchange, routing_key, message): + pass + + p = Producer(chan, on_return=on_return) + self.assertTrue(on_return in chan.events["basic_return"]) + class test_Consumer(unittest.TestCase): @@ -138,6 +155,7 @@ class test_Consumer(unittest.TestCase): queue = Queue("qname", self.exchange, "rkey") consumer = Consumer(channel, queue, auto_declare=True) consumer.consume() + consumer.consume() # twice is a noop self.assertIsNot(consumer.queues[0], queue) self.assertTrue(consumer.queues[0].is_bound) self.assertTrue(consumer.queues[0].exchange.is_bound) @@ -151,6 +169,10 @@ class test_Consumer(unittest.TestCase): self.assertEqual(channel.called.count("basic_consume"), 1) self.assertTrue(consumer._active_tags) + consumer.cancel_by_queue(queue.name) + consumer.cancel_by_queue(queue.name) + self.assertFalse(consumer._active_tags) + def test_manual_declare(self): channel = self.connection.channel() queue = Queue("qname", self.exchange, "rkey") @@ -337,14 +359,13 @@ class test_Consumer(unittest.TestCase): def test_on_decode_error_callback(self): channel = self.connection.channel() b1 = Queue("qname1", self.exchange, "rkey") - consumer = Consumer(channel, [b1]) - consumer.channel.throw_decode_error = True thrown = [] def on_decode_error(msg, exc): thrown.append((msg.body, exc)) - consumer.on_decode_error = on_decode_error + consumer = Consumer(channel, [b1], on_decode_error=on_decode_error) + consumer.channel.throw_decode_error = True consumer._receive_callback({"foo": "bar"}) self.assertTrue(thrown) @@ -358,3 +379,18 @@ class test_Consumer(unittest.TestCase): consumer = Consumer(channel, [b1]) consumer.recover() self.assertIn("basic_recover", channel) + + def test_revive(self): + channel = self.connection.channel() + b1 = Queue("qname1", self.exchange, "rkey") + consumer = Consumer(channel, [b1]) + channel2 = self.connection.channel() + consumer.revive(channel2) + self.assertIs(consumer.channel, channel2) + self.assertIs(consumer.queues[0].channel, channel2) + self.assertIs(consumer.queues[0].exchange.channel, channel2) + + def test__repr__(self): + channel = self.connection.channel() + b1 = Queue("qname1", self.exchange, "rkey") + self.assertTrue(repr(Consumer(channel, [b1]))) diff --git a/kombu/tests/test_virtual_scheduling.py b/kombu/tests/test_virtual_scheduling.py index 20abbbde..bdeee651 100644 --- a/kombu/tests/test_virtual_scheduling.py +++ b/kombu/tests/test_virtual_scheduling.py @@ -48,3 +48,6 @@ class test_FairCycle(unittest.TestCase): ("a", "a"), ("b", "b")]) cycle2 = FairCycle(echo, ["c", "c"], MyEmpty) self.assertRaises(MyEmpty, consume, cycle2.get, 3) + + def test__repr__(self): + self.assertTrue(repr(FairCycle(lambda x: x, [1, 2, 3], MyEmpty)))