diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index bd81a89e..0ca41e29 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -31,6 +31,7 @@ from . import virtual try: import redis + import redis.sentinel except ImportError: # pragma: no cover redis = None # noqa @@ -453,6 +454,8 @@ class Channel(virtual.Channel): 'priority_steps') # <-- do not add comma here! ) + connection_class = redis.Connection + def __init__(self, *args, **kwargs): super_ = super(Channel, self) super_.__init__(*args, **kwargs) @@ -832,7 +835,7 @@ class Channel(virtual.Channel): def _connparams(self, async=False): conninfo = self.connection.client connparams = {'host': conninfo.hostname or '127.0.0.1', - 'port': conninfo.port or DEFAULT_PORT, + 'port': conninfo.port or self.connection.default_port, 'virtual_host': conninfo.virtual_host, 'password': conninfo.password, 'max_connections': self.max_connections, @@ -859,7 +862,7 @@ class Channel(virtual.Channel): channel = self connection_cls = ( connparams.get('connection_class') or - redis.Connection + self.connection_class ) if async: @@ -867,7 +870,9 @@ class Channel(virtual.Channel): def disconnect(self): super(Connection, self).disconnect() channel._on_connection_disconnect(self) - connparams['connection_class'] = Connection + connection_cls = Connection + + connparams['connection_class'] = connection_cls return connparams @@ -919,6 +924,7 @@ class Channel(virtual.Channel): def async_pool(self): if self._async_pool is None: self._async_pool = self._get_pool(async=True) + return self._async_pool @cached_property def client(self): @@ -1023,31 +1029,38 @@ class SentinelChannel(Channel): """ from_transport_options = Channel.from_transport_options + ( - 'service_name', - 'password', + 'master_name', 'min_other_sentinels', - 'sentinel_timeout') + 'sentinel_kwargs') - @cached_property - def _sentinel_managed_pool(self): + connection_class = redis.sentinel.SentinelManagedConnection - connparams = self._connparams() + def _sentinel_managed_pool(self, async=False): + connparams = self._connparams(async) + + additional_params = connparams.copy() + + del additional_params['host'] + del additional_params['port'] sentinel = redis.sentinel.Sentinel( [(connparams['host'], connparams['port'])], - min_other_sentinels=getattr(self, 'min_other_sentinels', None), - password=getattr(self, 'password', connparams['password']), - sentinel_kwargs={"socket_timeout": getattr(self, 'sentinel_timeout', self.socket_timeout)}, + min_other_sentinels=getattr(self, 'min_other_sentinels', 0), + sentinel_kwargs=getattr(self, 'sentinel_kwargs', {}), + **additional_params ) - return sentinel.master_for( - self.service_name, - self.Client, - socket_timeout=self.socket_timeout).connection_pool + master_name = getattr(self, 'master_name', None) - def _get_pool(self): - return self._sentinel_managed_pool + return sentinel.master_for( + master_name, + self.Client, + ).connection_pool + + def _get_pool(self, async=False): + return self._sentinel_managed_pool(False) class SentinelTransport(Transport): + default_port = 26379 Channel = SentinelChannel