From a0d177f573bb42e8bfab5ba7af9d4ce6a2fa0969 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 4 Oct 2012 16:20:41 +0100 Subject: [PATCH] [redis] Use common pool and disconnect pool on connection lost --- kombu/transport/redis.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 89ae648f..d5029786 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -306,6 +306,7 @@ class Channel(virtual.Channel): unacked_restore_limit = None visibility_timeout = 3600 # 1 hour priority_steps = PRIORITY_STEPS + _pool = None from_transport_options = (virtual.Channel.from_transport_options + ('unacked_key', @@ -329,7 +330,12 @@ class Channel(virtual.Channel): self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive} # Evaluate connection. - self.client.info() + try: + self.client.info() + except Exception: + if self._pool: + self._pool.disconnect() + raise self.connection.cycle.add(self) # add to channel poller. # copy errors, in case channel closed but threads still @@ -540,6 +546,8 @@ class Channel(virtual.Channel): return sum(sizes[::2]) def close(self): + if self._pool: + self._pool.disconnect() if not self.closed: # remove from channel poller. self.connection.cycle.discard(self) @@ -574,15 +582,17 @@ class Channel(virtual.Channel): return self.Client(host=conninfo.hostname or '127.0.0.1', port=conninfo.port or DEFAULT_PORT, db=database, - password=conninfo.password) + password=conninfo.password, + connection_pool=self.pool) + + def _get_pool(self): + return redis.ConnectionPool() def _get_client(self): - version = getattr(redis, '__version__', (0, 0, 0)) - version = tuple(map(int, version.split('.'))) - if version < (2, 4, 4): + if redis.VERSION < (2, 4, 4): raise VersionMismatch( 'Redis transport requires redis-py versions 2.4.4 or later. ' - 'You have %r' % ('.'.join(map(str_t, version)), )) + 'You have %r' % (redis.__version__, )) # KombuRedis maintains a connection attribute on it's instance and # uses that when executing commands @@ -618,6 +628,12 @@ class Channel(virtual.Channel): return self._create_client() return self.client + @property + def pool(self): + if self._pool is None: + self._pool = self._get_pool() + return self._pool + @cached_property def client(self): """Client used to publish messages, BRPOP etc."""