diff --git a/kombu/tests/test_functional/test_beanstalk.py b/kombu/tests/test_functional/test_beanstalk.py index a1c86535..d726430a 100644 --- a/kombu/tests/test_functional/test_beanstalk.py +++ b/kombu/tests/test_functional/test_beanstalk.py @@ -10,6 +10,3 @@ class test_beanstalk(transport.TransportCase): def after_connect(self, connection): connection.channel().client - - def test_basic_get(self): - raise SkipTest("beanstalk does not support synchronous access.") diff --git a/kombu/tests/test_functional/test_mongodb.py b/kombu/tests/test_functional/test_mongodb.py index 976a1bad..0acdf49d 100644 --- a/kombu/tests/test_functional/test_mongodb.py +++ b/kombu/tests/test_functional/test_mongodb.py @@ -1,5 +1,3 @@ -from nose import SkipTest - from kombu.tests.test_functional import transport @@ -10,6 +8,3 @@ class test_mongodb(transport.TransportCase): def after_connect(self, connection): connection.channel().client - - #def test_basic_get(self): - # raise SkipTest("beanstalk does not support synchronous access.") diff --git a/kombu/tests/test_functional/test_redis.py b/kombu/tests/test_functional/test_redis.py index 60e4c91d..94d0211d 100644 --- a/kombu/tests/test_functional/test_redis.py +++ b/kombu/tests/test_functional/test_redis.py @@ -6,4 +6,5 @@ class test_redis(transport.TransportCase): prefix = "redis" def after_connect(self, connection): - connection.channel().client + client = connection.channel().client + client.info() diff --git a/kombu/tests/test_functional/transport.py b/kombu/tests/test_functional/transport.py index 272accaa..14b647b4 100644 --- a/kombu/tests/test_functional/transport.py +++ b/kombu/tests/test_functional/transport.py @@ -32,33 +32,47 @@ class TransportCase(unittest.TestCase): transport = None prefix = None event_loop_max = 100 + connection_options = {} - def purge(self, names): - chan = self.connection.channel() - map(chan.queue_purge, names) + connected = False + skip_test_reason = None - def do_connect(self): - self.connection = BrokerConnection(transport=self.transport) - try: - self.connection.connect() - self.after_connect(self.connection) - except self.connection.connection_errors: - self.connected = False - else: - self.connected = True + def before_connect(self): + pass def after_connect(self, connection): pass def setUp(self): if self.transport: - self.do_connect() + try: + self.before_connect() + except SkipTest, exc: + self.skip_test_reason = str(exc) + else: + self.do_connect() self.exchange = Exchange(self.prefix, "direct") self.queue = Queue(self.prefix, self.exchange, self.prefix) + def purge(self, names): + chan = self.connection.channel() + map(chan.queue_purge, names) + + def do_connect(self): + self.connection = BrokerConnection(transport=self.transport, + **self.connection_options) + try: + self.connection.connect() + self.after_connect(self.connection) + except self.connection.connection_errors: + self.skip_test_reason = "%s transport can't connect" % ( + self.transport, ) + else: + self.connected = True + def verify_alive(self): if not self.connected: - raise SkipTest("%s not running." % self.transport) + raise SkipTest(self.skip_test_reason) def test_produce__consume(self): if not self.transport: @@ -118,13 +132,13 @@ class TransportCase(unittest.TestCase): self.verify_alive() chan1 = self.connection.channel() producer = Producer(chan1, self.exchange) - producer.publish({"basic.get": "this"}, routing_key="basic_get") - chan1.close() - chan2 = self.connection.channel() queue = Queue(self.P("basic_get"), self.exchange, "basic_get") queue = queue(chan2) queue.declare() + producer.publish({"basic.get": "this"}, routing_key="basic_get") + chan1.close() + for i in range(self.event_loop_max): m = queue.get() if m: