mirror of https://github.com/celery/kombu.git
[redis] Use common pool and disconnect pool on connection lost
This commit is contained in:
parent
4104282edc
commit
a0d177f573
|
@ -306,6 +306,7 @@ class Channel(virtual.Channel):
|
|||
unacked_restore_limit = None
|
||||
visibility_timeout = 3600 # 1 hour
|
||||
priority_steps = PRIORITY_STEPS
|
||||
_pool = None
|
||||
|
||||
from_transport_options = (virtual.Channel.from_transport_options
|
||||
+ ('unacked_key',
|
||||
|
@ -329,7 +330,12 @@ class Channel(virtual.Channel):
|
|||
self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
|
||||
|
||||
# Evaluate connection.
|
||||
self.client.info()
|
||||
try:
|
||||
self.client.info()
|
||||
except Exception:
|
||||
if self._pool:
|
||||
self._pool.disconnect()
|
||||
raise
|
||||
|
||||
self.connection.cycle.add(self) # add to channel poller.
|
||||
# copy errors, in case channel closed but threads still
|
||||
|
@ -540,6 +546,8 @@ class Channel(virtual.Channel):
|
|||
return sum(sizes[::2])
|
||||
|
||||
def close(self):
|
||||
if self._pool:
|
||||
self._pool.disconnect()
|
||||
if not self.closed:
|
||||
# remove from channel poller.
|
||||
self.connection.cycle.discard(self)
|
||||
|
@ -574,15 +582,17 @@ class Channel(virtual.Channel):
|
|||
return self.Client(host=conninfo.hostname or '127.0.0.1',
|
||||
port=conninfo.port or DEFAULT_PORT,
|
||||
db=database,
|
||||
password=conninfo.password)
|
||||
password=conninfo.password,
|
||||
connection_pool=self.pool)
|
||||
|
||||
def _get_pool(self):
|
||||
return redis.ConnectionPool()
|
||||
|
||||
def _get_client(self):
|
||||
version = getattr(redis, '__version__', (0, 0, 0))
|
||||
version = tuple(map(int, version.split('.')))
|
||||
if version < (2, 4, 4):
|
||||
if redis.VERSION < (2, 4, 4):
|
||||
raise VersionMismatch(
|
||||
'Redis transport requires redis-py versions 2.4.4 or later. '
|
||||
'You have %r' % ('.'.join(map(str_t, version)), ))
|
||||
'You have %r' % (redis.__version__, ))
|
||||
|
||||
# KombuRedis maintains a connection attribute on it's instance and
|
||||
# uses that when executing commands
|
||||
|
@ -618,6 +628,12 @@ class Channel(virtual.Channel):
|
|||
return self._create_client()
|
||||
return self.client
|
||||
|
||||
@property
|
||||
def pool(self):
|
||||
if self._pool is None:
|
||||
self._pool = self._get_pool()
|
||||
return self._pool
|
||||
|
||||
@cached_property
|
||||
def client(self):
|
||||
"""Client used to publish messages, BRPOP etc."""
|
||||
|
|
Loading…
Reference in New Issue