mirror of https://github.com/celery/kombu.git
redis-py 2.4.0 refactored some internals that kombu previously relied on. we subclass the Redis client to restore the "connection" attribute, and force execute_command() to use our known connection attribute instead of going through the standard connection_pool.
This commit is contained in:
parent
6bcedcbe53
commit
7c362f6d48
|
@ -204,20 +204,15 @@ class Channel(virtual.Channel):
|
|||
if not queues:
|
||||
return
|
||||
keys = list(queues) + [timeout or 0]
|
||||
name, cmd = self._encode_command("BRPOP", *keys)
|
||||
self.client.connection.send(cmd, self)
|
||||
self.client.connection.send_command("BRPOP", *keys)
|
||||
self._in_poll = True
|
||||
|
||||
def _encode_command(self, *args):
|
||||
encode = self.client.encode
|
||||
cmds = "".join('$%s\r\n%s\r\n' % (len(enc_value), enc_value)
|
||||
for enc_value in imap(encode, args))
|
||||
return args[0], '*%s\r\n%s' % (len(args), cmds)
|
||||
|
||||
def _brpop_read(self, **options):
|
||||
try:
|
||||
try:
|
||||
dest__item = self.client.parse_response("BRPOP", **options)
|
||||
dest__item = self.client.parse_response(self.client.connection,
|
||||
"BRPOP",
|
||||
**options)
|
||||
except self.connection.connection_errors:
|
||||
raise Empty()
|
||||
if dest__item:
|
||||
|
@ -313,8 +308,27 @@ class Channel(virtual.Channel):
|
|||
password=conninfo.password)
|
||||
|
||||
def _get_client(self):
|
||||
from redis import Redis
|
||||
return Redis
|
||||
from redis import Redis, ConnectionError
|
||||
|
||||
# KombuRedis maintains a connection attribute on it's instance and
|
||||
# uses that when executing commands
|
||||
class KombuRedis(Redis):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(KombuRedis, self).__init__(*args, **kwargs)
|
||||
self.connection = self.connection_pool.get_connection('_')
|
||||
|
||||
def execute_command(self, *args, **options):
|
||||
conn = self.connection
|
||||
command_name = args[0]
|
||||
try:
|
||||
conn.send_command(*args)
|
||||
return self.parse_response(conn, command_name, **options)
|
||||
except ConnectionError:
|
||||
conn.disconnect()
|
||||
conn.send_command(*args)
|
||||
return self.parse_response(conn, command_name, **options)
|
||||
|
||||
return KombuRedis
|
||||
|
||||
def _get_response_error(self):
|
||||
from redis import exceptions
|
||||
|
@ -336,7 +350,8 @@ class Channel(virtual.Channel):
|
|||
|
||||
@cached_property
|
||||
def subclient(self):
|
||||
return self._create_client()
|
||||
client = self._create_client()
|
||||
return client.pubsub()
|
||||
|
||||
@subclient.deleter # noqa
|
||||
def subclient(self, client):
|
||||
|
|
Loading…
Reference in New Issue