diff --git a/funtests/tests/test_mongodb.py b/funtests/tests/test_mongodb.py index 8d6353c2..83932364 100644 --- a/funtests/tests/test_mongodb.py +++ b/funtests/tests/test_mongodb.py @@ -1,3 +1,6 @@ +from kombu import Consumer, Producer, Exchange, Queue +from kombu.utils import nested + from funtests import transport @@ -7,4 +10,60 @@ class test_mongodb(transport.TransportCase): event_loop_max = 100 def after_connect(self, connection): - connection.channel().client + connection.channel().client # evaluate connection. + + self.c = self.connection # shortcut + + def test_fanout(self, name="test_mongodb_fanout"): + c = self.connection + e = Exchange(name, type="fanout") + q = Queue(name, exchange=e, routing_key=name) + q2 = Queue(name + "2", exchange=e, routing_key=name + "2") + + channel = c.default_channel + producer = Producer(channel, e) + consumer1 = Consumer(channel, q) + consumer2 = Consumer(channel, q2) + self.q2(channel).declare() + + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name) + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name + "2") + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + with nested(consumer1, consumer2): + + while 1: + if len(_received1) + len(_received2) == 20: + break + c.drain_events(timeout=60) + self.assertEqual(len(_received1) + len(_received2), 20) + + # queue.delete + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name) + self.assertTrue(self.q(channel).get()) + self.q(channel).delete() + self.q(channel).declare() + self.assertIsNone(self.q(channel).get()) + + # queue.purge + for i in xrange(10): + producer.publish({"foo": i}, routing_key=name + "2") + self.assertTrue(self.q2(channel).get()) + self.q2(channel).purge() + self.assertIsNone(self.q2(channel).get()) diff --git a/kombu/tests/test_transport_mongodb.py b/kombu/tests/test_transport_mongodb.py deleted file mode 100644 index 57fa9f14..00000000 --- a/kombu/tests/test_transport_mongodb.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import absolute_import -from __future__ import with_statement - -from ..connection import BrokerConnection -from ..entity import Exchange, Queue -from ..messaging import Consumer, Producer - -from .utils import TestCase - - -class test_MongoDBTransport(TestCase): - - def setUp(self): - self.c = BrokerConnection(transport="mongodb") - self.e = Exchange("test_transport_mongodb", type="fanout") - self.q = Queue("test_transport_mongodb", - exchange=self.e, - routing_key="test_transport_mongodb") - self.q2 = Queue("test_transport_memory2", - exchange=self.e, - routing_key="test_transport_mongodb2") - - def test_produce_consume(self): - channel = self.c.channel() - producer = Producer(channel, self.e) - consumer1 = Consumer(channel, self.q) - consumer2 = Consumer(channel, self.q2) - self.q2(channel).declare() - - for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_mongodb") - for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_mongodb2") - - _received1 = [] - _received2 = [] - - def callback1(message_data, message): - _received1.append(message) - message.ack() - - def callback2(message_data, message): - _received2.append(message) - message.ack() - - consumer1.register_callback(callback1) - consumer2.register_callback(callback2) - - consumer1.consume() - consumer2.consume() - - while 1: - if len(_received1) + len(_received2) == 20: - break - self.c.drain_events() - - self.assertEqual(len(_received1) + len(_received2), 20) - - # queue.delete - for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_mongodb") - self.assertTrue(self.q(channel).get()) - self.q(channel).delete() - self.q(channel).declare() - self.assertIsNone(self.q(channel).get()) - - # queue.purge - for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_mongodb2") - self.assertTrue(self.q2(channel).get()) - self.q2(channel).purge() - self.assertIsNone(self.q2(channel).get())