mirror of https://github.com/celery/kombu.git
[sentinel] Support for default port and async connetions
This commit is contained in:
parent
96df86c707
commit
a46dba8f2e
|
@ -31,6 +31,7 @@ from . import virtual
|
|||
|
||||
try:
|
||||
import redis
|
||||
import redis.sentinel
|
||||
except ImportError: # pragma: no cover
|
||||
redis = None # noqa
|
||||
|
||||
|
@ -453,6 +454,8 @@ class Channel(virtual.Channel):
|
|||
'priority_steps') # <-- do not add comma here!
|
||||
)
|
||||
|
||||
connection_class = redis.Connection
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super_ = super(Channel, self)
|
||||
super_.__init__(*args, **kwargs)
|
||||
|
@ -832,7 +835,7 @@ class Channel(virtual.Channel):
|
|||
def _connparams(self, async=False):
|
||||
conninfo = self.connection.client
|
||||
connparams = {'host': conninfo.hostname or '127.0.0.1',
|
||||
'port': conninfo.port or DEFAULT_PORT,
|
||||
'port': conninfo.port or self.connection.default_port,
|
||||
'virtual_host': conninfo.virtual_host,
|
||||
'password': conninfo.password,
|
||||
'max_connections': self.max_connections,
|
||||
|
@ -859,7 +862,7 @@ class Channel(virtual.Channel):
|
|||
channel = self
|
||||
connection_cls = (
|
||||
connparams.get('connection_class') or
|
||||
redis.Connection
|
||||
self.connection_class
|
||||
)
|
||||
|
||||
if async:
|
||||
|
@ -867,7 +870,9 @@ class Channel(virtual.Channel):
|
|||
def disconnect(self):
|
||||
super(Connection, self).disconnect()
|
||||
channel._on_connection_disconnect(self)
|
||||
connparams['connection_class'] = Connection
|
||||
connection_cls = Connection
|
||||
|
||||
connparams['connection_class'] = connection_cls
|
||||
|
||||
return connparams
|
||||
|
||||
|
@ -919,6 +924,7 @@ class Channel(virtual.Channel):
|
|||
def async_pool(self):
|
||||
if self._async_pool is None:
|
||||
self._async_pool = self._get_pool(async=True)
|
||||
return self._async_pool
|
||||
|
||||
@cached_property
|
||||
def client(self):
|
||||
|
@ -1023,31 +1029,38 @@ class SentinelChannel(Channel):
|
|||
"""
|
||||
|
||||
from_transport_options = Channel.from_transport_options + (
|
||||
'service_name',
|
||||
'password',
|
||||
'master_name',
|
||||
'min_other_sentinels',
|
||||
'sentinel_timeout')
|
||||
'sentinel_kwargs')
|
||||
|
||||
@cached_property
|
||||
def _sentinel_managed_pool(self):
|
||||
connection_class = redis.sentinel.SentinelManagedConnection
|
||||
|
||||
connparams = self._connparams()
|
||||
def _sentinel_managed_pool(self, async=False):
|
||||
connparams = self._connparams(async)
|
||||
|
||||
additional_params = connparams.copy()
|
||||
|
||||
del additional_params['host']
|
||||
del additional_params['port']
|
||||
|
||||
sentinel = redis.sentinel.Sentinel(
|
||||
[(connparams['host'], connparams['port'])],
|
||||
min_other_sentinels=getattr(self, 'min_other_sentinels', None),
|
||||
password=getattr(self, 'password', connparams['password']),
|
||||
sentinel_kwargs={"socket_timeout": getattr(self, 'sentinel_timeout', self.socket_timeout)},
|
||||
min_other_sentinels=getattr(self, 'min_other_sentinels', 0),
|
||||
sentinel_kwargs=getattr(self, 'sentinel_kwargs', {}),
|
||||
**additional_params
|
||||
)
|
||||
|
||||
return sentinel.master_for(
|
||||
self.service_name,
|
||||
self.Client,
|
||||
socket_timeout=self.socket_timeout).connection_pool
|
||||
master_name = getattr(self, 'master_name', None)
|
||||
|
||||
def _get_pool(self):
|
||||
return self._sentinel_managed_pool
|
||||
return sentinel.master_for(
|
||||
master_name,
|
||||
self.Client,
|
||||
).connection_pool
|
||||
|
||||
def _get_pool(self, async=False):
|
||||
return self._sentinel_managed_pool(False)
|
||||
|
||||
|
||||
class SentinelTransport(Transport):
|
||||
default_port = 26379
|
||||
Channel = SentinelChannel
|
||||
|
|
Loading…
Reference in New Issue