tests for URI_PASSTHROUGH & clone

This commit is contained in:
Ask Solem 2012-06-24 16:00:12 +01:00
parent 95cdc0bb6f
commit 9d4c516b42
1 changed files with 20 additions and 9 deletions

View File

@ -4,25 +4,36 @@ from __future__ import with_statement
from mock import patch
from nose import SkipTest
from kombu.connection import BrokerConnection
from kombu.connection import Connection
from kombu.tests.utils import TestCase
class test_sqlalchemy(TestCase):
def test_url_parser(self):
def setUp(self):
try:
import sqlalchemy # noqa
except ImportError:
raise SkipTest('sqlalchemy not installed')
with patch('kombu.transport.sqlalchemy.Channel._open'):
url = 'sqlalchemy+sqlite://celerydb.sqlite'
BrokerConnection(url).connect()
url = 'sqla+sqlite://celerydb.sqlite'
BrokerConnection(url).connect()
def test_url_parser(self):
with patch('kombu.transport.sqlalchemy.Channel._open'):
url = 'sqlalchemy+sqlite:///celerydb.sqlite'
Connection(url).connect()
url = 'sqla+sqlite:///celerydb.sqlite'
Connection(url).connect()
# Should prevent regression fixed by f187ccd
url = 'sqlb+sqlite://celerydb.sqlite'
url = 'sqlb+sqlite:///celerydb.sqlite'
with self.assertRaises(KeyError):
BrokerConnection(url).connect()
Connection(url).connect()
def test_clone(self):
hostname = 'sqlite:///celerydb.sqlite'
x = Connection('+'.join(['sqla', hostname]))
self.assertEqual(x.uri_prefix, 'sqla')
self.assertEqual(x.hostname, hostname)
clone = x.clone()
self.assertEqual(clone.hostname, hostname)
self.assertEqual(clone.uri_prefix, 'sqla')