mirror of https://github.com/celery/kombu.git
Fixes funtests
This commit is contained in:
parent
efa792b95c
commit
27f8345cb3
|
@ -15,6 +15,10 @@ class test_SQS(transport.TransportCase):
|
|||
# even in simple cases.
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import boto # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('boto not installed')
|
||||
if 'AWS_ACCESS_KEY_ID' not in os.environ:
|
||||
raise SkipTest('Missing envvar AWS_ACCESS_KEY_ID')
|
||||
if 'AWS_SECRET_ACCESS_KEY' not in os.environ:
|
||||
|
|
|
@ -1,6 +1,14 @@
|
|||
from nose import SkipTest
|
||||
|
||||
from funtests import transport
|
||||
|
||||
|
||||
class test_amqplib(transport.TransportCase):
|
||||
transport = 'amqplib'
|
||||
prefix = 'amqplib'
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import amqplib # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('amqplib not installed')
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
from funtests import transport
|
||||
|
||||
from nose import SkipTest
|
||||
|
||||
|
||||
class test_beanstalk(transport.TransportCase):
|
||||
transport = 'beanstalk'
|
||||
|
@ -7,5 +9,11 @@ class test_beanstalk(transport.TransportCase):
|
|||
event_loop_max = 10
|
||||
message_size_limit = 47662
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import beanstalkc # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('beanstalkc not installed')
|
||||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from nose import SkipTest
|
||||
|
||||
from funtests import transport
|
||||
|
||||
|
||||
|
@ -6,5 +8,11 @@ class test_couchdb(transport.TransportCase):
|
|||
prefix = 'couchdb'
|
||||
event_loop_max = 100
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import couchdb # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('couchdb not installed')
|
||||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client
|
||||
|
|
|
@ -15,17 +15,22 @@ class test_django(transport.TransportCase):
|
|||
@redirect_stdouts
|
||||
def setup_django(stdout, stderr):
|
||||
try:
|
||||
import djkombu # noqa
|
||||
import django # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('django-kombu not installed')
|
||||
raise SkipTest('django not installed')
|
||||
from django.conf import settings
|
||||
if not settings.configured:
|
||||
settings.configure(DATABASE_ENGINE='sqlite3',
|
||||
DATABASE_NAME=':memory:',
|
||||
DATABASES={'default': {
|
||||
'ENGINE': 'django.db.backends.sqlite3',
|
||||
'NAME': ':memory:'}},
|
||||
INSTALLED_APPS=('djkombu', ))
|
||||
settings.configure(
|
||||
DATABASE_ENGINE='sqlite3',
|
||||
DATABASE_NAME=':memory:',
|
||||
DATABASES={
|
||||
'default': {
|
||||
'ENGINE': 'django.db.backends.sqlite3',
|
||||
'NAME': ':memory:',
|
||||
},
|
||||
},
|
||||
INSTALLED_APPS=('kombu.transports.django', ),
|
||||
)
|
||||
from django.core.management import call_command
|
||||
call_command('syncdb')
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from nose import SkipTest
|
||||
|
||||
from kombu import Consumer, Producer, Exchange, Queue
|
||||
from kombu.utils import nested
|
||||
|
||||
|
@ -9,12 +11,20 @@ class test_mongodb(transport.TransportCase):
|
|||
prefix = 'mongodb'
|
||||
event_loop_max = 100
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import pymongo # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('pymongo not installed')
|
||||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client # evaluate connection.
|
||||
|
||||
self.c = self.connection # shortcut
|
||||
|
||||
def test_fanout(self, name='test_mongodb_fanout'):
|
||||
if not self.verify_alive():
|
||||
return
|
||||
c = self.connection
|
||||
self.e = Exchange(name, type='fanout')
|
||||
self.q = Queue(name, exchange=self.e, routing_key=name)
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from nose import SkipTest
|
||||
|
||||
from funtests import transport
|
||||
|
||||
|
||||
|
@ -5,6 +7,12 @@ class test_redis(transport.TransportCase):
|
|||
transport = 'redis'
|
||||
prefix = 'redis'
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import redis # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('redis not installed')
|
||||
|
||||
def after_connect(self, connection):
|
||||
client = connection.channel().client
|
||||
client.info()
|
||||
|
|
|
@ -7,10 +7,10 @@ class test_sqla(transport.TransportCase):
|
|||
transport = 'sqlalchemy'
|
||||
prefix = 'sqlalchemy'
|
||||
event_loop_max = 10
|
||||
connection_options = {'hostname': 'sqlite://'}
|
||||
connection_options = {'hostname': 'sqla+sqlite://'}
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import sqlakombu # noqa
|
||||
import sqlalchemy # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('kombu-sqlalchemy not installed')
|
||||
raise SkipTest('sqlalchemy not installed')
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from nose import SkipTest
|
||||
|
||||
from funtests import transport
|
||||
|
||||
|
||||
|
@ -6,5 +8,11 @@ class test_zookeeper(transport.TransportCase):
|
|||
prefix = 'zookeeper'
|
||||
event_loop_max = 100
|
||||
|
||||
def before_connect(self):
|
||||
try:
|
||||
import kazoo # noqa
|
||||
except ImportError:
|
||||
raise SkipTest('kazoo not installed')
|
||||
|
||||
def after_connect(self, connection):
|
||||
connection.channel().client
|
||||
|
|
|
@ -23,6 +23,10 @@ def say(msg):
|
|||
sys.stderr.write(unicode(msg) + '\n')
|
||||
|
||||
|
||||
def _nobuf(x):
|
||||
return [str(i) if isinstance(i, buffer) else i for i in x]
|
||||
|
||||
|
||||
def consumeN(conn, consumer, n=1, timeout=30):
|
||||
messages = []
|
||||
|
||||
|
@ -212,15 +216,15 @@ class TransportCase(unittest.TestCase):
|
|||
[q.declare() for q in (b1, b2, b3)]
|
||||
self.purge([b1.name, b2.name, b3.name])
|
||||
|
||||
producer.publish('b1', routing_key='b1')
|
||||
producer.publish('b2', routing_key='b2')
|
||||
producer.publish('b3', routing_key='b3')
|
||||
producer.publish(u'b1', routing_key='b1')
|
||||
producer.publish(u'b2', routing_key='b2')
|
||||
producer.publish(u'b3', routing_key='b3')
|
||||
chan1.close()
|
||||
|
||||
chan2 = self.connection.channel()
|
||||
consumer = chan2.Consumer([b1, b2, b3])
|
||||
messages = consumeN(self.connection, consumer, 3)
|
||||
self.assertItemsEqual(messages, ['b1', 'b2', 'b3'])
|
||||
self.assertItemsEqual(_nobuf(messages), ['b1', 'b2', 'b3'])
|
||||
chan2.close()
|
||||
self.purge([self.P('b1'), self.P('b2'), self.P('b3')])
|
||||
|
||||
|
|
Loading…
Reference in New Issue