# rq/tests/test_helpers.py
# (42 lines, 1.4 KiB, Python)

from rq.cli.helpers import get_redis_from_config
from tests import RQTestCase
class TestHelpers(RQTestCase):
    """Tests for the helpers imported from ``rq.cli.helpers``."""

    def test_get_redis_from_config(self):
        """Verify which settings end up in the Redis connection parameters.

        Three configurations are checked in turn: ``REDIS_URL`` alone,
        ``REDIS_URL`` combined with the individual ``REDIS_*`` keys, and the
        individual keys with ``REDIS_URL`` set to ``None``.
        """

        def kwargs_for(config):
            # Build a client from ``config`` and expose the keyword arguments
            # recorded on its connection pool.
            client = get_redis_from_config(config)
            return client.connection_pool.connection_kwargs

        # Case 1: REDIS_URL on its own is expected to supply db and port.
        url_only = kwargs_for({'REDIS_URL': 'redis://localhost:1/1'})
        self.assertEqual(url_only['db'], 1)
        self.assertEqual(url_only['port'], 1)

        # Case 2: with both forms present, the URL values (1/1) are expected
        # rather than the conflicting REDIS_DB / REDIS_PORT values (2/2).
        combined = {
            'REDIS_URL': 'redis://localhost:1/1',
            'REDIS_HOST': 'foo',
            'REDIS_DB': 2,
            'REDIS_PORT': 2,
            'REDIS_PASSWORD': 'bar',
        }
        url_preferred = kwargs_for(combined)
        self.assertEqual(url_preferred['db'], 1)
        self.assertEqual(url_preferred['port'], 1)

        # Case 3: with REDIS_URL set to None, the individual REDIS_* settings
        # are expected to be used instead.
        fallback = kwargs_for(dict(combined, REDIS_URL=None))
        expected_pairs = (
            ('host', 'foo'),
            ('db', 2),
            ('port', 2),
            ('password', 'bar'),
        )
        for key, expected in expected_pairs:
            self.assertEqual(fallback[key], expected)