mirror of https://github.com/celery/kombu.git
[sentinel] Simple unittests for RedisSentinel transport support
This commit is contained in:
parent
e15a41382f
commit
57322c22ab
|
@ -14,7 +14,7 @@ from kombu.utils import eventio # patch poll
|
||||||
from kombu.utils.json import dumps
|
from kombu.utils.json import dumps
|
||||||
|
|
||||||
from kombu.tests.case import (
|
from kombu.tests.case import (
|
||||||
Case, ContextMock, Mock, call, module_exists, skip_if_not_module, patch,
|
Case, ContextMock, Mock, call, module_exists, skip_if_not_module, patch, ANY,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -1280,3 +1280,41 @@ class test_Mutex(Case):
|
||||||
with redis.Mutex(client, 'foo1', 100):
|
with redis.Mutex(client, 'foo1', 100):
|
||||||
held = True
|
held = True
|
||||||
self.assertTrue(held)
|
self.assertTrue(held)
|
||||||
|
|
||||||
|
|
||||||
|
class test_RedisSentinel(Case):
|
||||||
|
def test_method_called(self):
|
||||||
|
from kombu.transport.redis import SentinelChannel
|
||||||
|
|
||||||
|
with patch.object(SentinelChannel, '_sentinel_managed_pool') as patched:
|
||||||
|
connection = Connection('sentinel://localhost:65534/', transport_options={
|
||||||
|
'master_name': 'not_important'
|
||||||
|
})
|
||||||
|
|
||||||
|
connection.channel()
|
||||||
|
self.assertTrue(patched.called)
|
||||||
|
|
||||||
|
def test_getting_master_from_sentinel(self):
|
||||||
|
from redis.sentinel import Sentinel
|
||||||
|
|
||||||
|
with patch.object(Sentinel, '__new__') as patched:
|
||||||
|
connection = Connection('sentinel://localhost:65534/', transport_options={
|
||||||
|
'master_name': 'not_important'
|
||||||
|
})
|
||||||
|
|
||||||
|
connection.channel()
|
||||||
|
self.assertTrue(patched)
|
||||||
|
|
||||||
|
sentinel_obj = patched.return_value
|
||||||
|
self.assertTrue(sentinel_obj.master_for.called, 'master_for was not called')
|
||||||
|
sentinel_obj.master_for.assert_called_with('not_important', ANY)
|
||||||
|
self.assertTrue(sentinel_obj.master_for().connection_pool.get_connection.called, 'get_connection on redis connection pool was not called')
|
||||||
|
|
||||||
|
def test_can_create_connection(self):
|
||||||
|
from redis.exceptions import ConnectionError
|
||||||
|
|
||||||
|
with self.assertRaises(ConnectionError):
|
||||||
|
connection = Connection('sentinel://localhost:65534/', transport_options={
|
||||||
|
'master_name': 'not_important'
|
||||||
|
})
|
||||||
|
connection.channel()
|
||||||
|
|
Loading…
Reference in New Issue