diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index 7e0bd42c..d0328535 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -204,20 +204,15 @@ class Channel(virtual.Channel): if not queues: return keys = list(queues) + [timeout or 0] - name, cmd = self._encode_command("BRPOP", *keys) - self.client.connection.send(cmd, self) + self.client.connection.send_command("BRPOP", *keys) self._in_poll = True - def _encode_command(self, *args): - encode = self.client.encode - cmds = "".join('$%s\r\n%s\r\n' % (len(enc_value), enc_value) - for enc_value in imap(encode, args)) - return args[0], '*%s\r\n%s' % (len(args), cmds) - def _brpop_read(self, **options): try: try: - dest__item = self.client.parse_response("BRPOP", **options) + dest__item = self.client.parse_response(self.client.connection, + "BRPOP", + **options) except self.connection.connection_errors: raise Empty() if dest__item: @@ -313,8 +308,27 @@ class Channel(virtual.Channel): password=conninfo.password) def _get_client(self): - from redis import Redis - return Redis + from redis import Redis, ConnectionError + + # KombuRedis maintains a connection attribute on it's instance and + # uses that when executing commands + class KombuRedis(Redis): + def __init__(self, *args, **kwargs): + super(KombuRedis, self).__init__(*args, **kwargs) + self.connection = self.connection_pool.get_connection('_') + + def execute_command(self, *args, **options): + conn = self.connection + command_name = args[0] + try: + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + except ConnectionError: + conn.disconnect() + conn.send_command(*args) + return self.parse_response(conn, command_name, **options) + + return KombuRedis def _get_response_error(self): from redis import exceptions @@ -336,7 +350,8 @@ class Channel(virtual.Channel): @cached_property def subclient(self): - return self._create_client() + client = self._create_client() + return client.pubsub() @subclient.deleter # noqa def subclient(self, client):