From 7c362f6d4860785f3147797069de6efbda8e41a0 Mon Sep 17 00:00:00 2001 From: andy Date: Wed, 1 Jun 2011 14:51:33 -0700 Subject: [PATCH] redis-py 2.4.0 refactored some internals that kombu previously relied on. we subclass the Redis client to restore the "connection" attribute, and force execute_command() to use our known connection attribute instead of going through the standard connection_pool. --- kombu/transport/pyredis.py | 39 ++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index 7e0bd42c..d0328535 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -204,20 +204,15 @@ class Channel(virtual.Channel): if not queues: return keys = list(queues) + [timeout or 0] - name, cmd = self._encode_command("BRPOP", *keys) - self.client.connection.send(cmd, self) + self.client.connection.send_command("BRPOP", *keys) self._in_poll = True - def _encode_command(self, *args): - encode = self.client.encode - cmds = "".join('$%s\r\n%s\r\n' % (len(enc_value), enc_value) - for enc_value in imap(encode, args)) - return args[0], '*%s\r\n%s' % (len(args), cmds) - def _brpop_read(self, **options): try: try: - dest__item = self.client.parse_response("BRPOP", **options) + dest__item = self.client.parse_response(self.client.connection, + "BRPOP", + **options) except self.connection.connection_errors: raise Empty() if dest__item: @@ -313,8 +308,27 @@ class Channel(virtual.Channel): password=conninfo.password) def _get_client(self): - from redis import Redis - return Redis + from redis import Redis, ConnectionError + + # KombuRedis maintains a connection attribute on it's instance and + # uses that when executing commands + class KombuRedis(Redis): + def __init__(self, *args, **kwargs): + super(KombuRedis, self).__init__(*args, **kwargs) + self.connection = self.connection_pool.get_connection('_') + + def execute_command(self, *args, **options): + conn = self.connection + command_name = args[0] + try: + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + except ConnectionError: + conn.disconnect() + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + + return KombuRedis def _get_response_error(self): from redis import exceptions @@ -336,7 +350,8 @@ class Channel(virtual.Channel): @cached_property def subclient(self): - return self._create_client() + client = self._create_client() + return client.pubsub() @subclient.deleter # noqa def subclient(self, client):