diff --git a/kombu/tests/test_functional/__init__.py b/kombu/tests/test_functional/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kombu/tests/test_functional/test_amqplib.py b/kombu/tests/test_functional/test_amqplib.py index 94b9bfc9..f933d382 100644 --- a/kombu/tests/test_functional/test_amqplib.py +++ b/kombu/tests/test_functional/test_amqplib.py @@ -16,12 +16,14 @@ def consumeN(conn, consumer, n=1): message.ack() prev, consumer.callbacks = consumer.callbacks, [callback] + consumer.consume() while True: conn.drain_events(timeout=1) if len(messages) >= n: break + consumer.cancel() consumer.callback = prev return messages @@ -47,26 +49,24 @@ class test_amqplib(unittest.TestCase): if not self.connected: raise SkipTest("Broker not running.") chan1 = self.connection.channel() + consumer = Consumer(chan1, self.queue) + consumer.queues[0].purge() producer = Producer(chan1, self.exchange) - producer.publish({"foo": "bar"}, routing_key="tamqplib") - chan1.close() - - chan2 = self.connection.channel() - consumer = Consumer(chan2, self.queue) message = consumeN(self.connection, consumer) self.assertDictEqual(message[0], {"foo": "bar"}) - chan2.close() - self.purge(["tamqplib"]) + chan1.close() + self.purge([self.queue.name]) def test_produce__consume_multiple(self): if not self.connected: raise SkipTest("Broker not running.") chan1 = self.connection.channel() producer = Producer(chan1, self.exchange) - b1 = Queue("pyamqplib.b1", self.exchange, "b1") - b2 = Queue("pyamqplib.b2", self.exchange, "b2") - b3 = Queue("pyamqplib.b3", self.exchange, "b3") + b1 = Queue("pyamqplib.b1", self.exchange, "b1")(chan1) + b2 = Queue("pyamqplib.b2", self.exchange, "b2")(chan1) + b3 = Queue("pyamqplib.b3", self.exchange, "b3")(chan1) + [q.purge() for q in (b1, b2, b3)] producer.publish("b1", routing_key="b1") producer.publish("b2", routing_key="b2")