Functional tests for pyamqplib are passing again

This commit is contained in:
Ask Solem 2010-09-16 16:15:10 +02:00
parent 5ea899d035
commit 5d3475736e
2 changed files with 10 additions and 10 deletions

View File

View File

@ -16,12 +16,14 @@ def consumeN(conn, consumer, n=1):
message.ack()
prev, consumer.callbacks = consumer.callbacks, [callback]
consumer.consume()
while True:
conn.drain_events(timeout=1)
if len(messages) >= n:
break
consumer.cancel()
consumer.callback = prev
return messages
@ -47,26 +49,24 @@ class test_amqplib(unittest.TestCase):
if not self.connected:
raise SkipTest("Broker not running.")
chan1 = self.connection.channel()
consumer = Consumer(chan1, self.queue)
consumer.queues[0].purge()
producer = Producer(chan1, self.exchange)
producer.publish({"foo": "bar"}, routing_key="tamqplib")
chan1.close()
chan2 = self.connection.channel()
consumer = Consumer(chan2, self.queue)
message = consumeN(self.connection, consumer)
self.assertDictEqual(message[0], {"foo": "bar"})
chan2.close()
self.purge(["tamqplib"])
chan1.close()
self.purge([self.queue.name])
def test_produce__consume_multiple(self):
if not self.connected:
raise SkipTest("Broker not running.")
chan1 = self.connection.channel()
producer = Producer(chan1, self.exchange)
b1 = Queue("pyamqplib.b1", self.exchange, "b1")
b2 = Queue("pyamqplib.b2", self.exchange, "b2")
b3 = Queue("pyamqplib.b3", self.exchange, "b3")
b1 = Queue("pyamqplib.b1", self.exchange, "b1")(chan1)
b2 = Queue("pyamqplib.b2", self.exchange, "b2")(chan1)
b3 = Queue("pyamqplib.b3", self.exchange, "b3")(chan1)
[q.purge() for q in (b1, b2, b3)]
producer.publish("b1", routing_key="b1")
producer.publish("b2", routing_key="b2")