mirror of https://github.com/celery/kombu.git
Now requires redis-py version 2.4.4 or later
This commit is contained in:
parent
c3c1f5c125
commit
f582991aa1
|
@ -42,3 +42,7 @@ class ChannelLimitExceeded(LimitExceeded):
|
|||
|
||||
class StdChannelError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class VersionMismatch(Exception):
|
||||
pass
|
||||
|
|
|
@ -10,18 +10,19 @@ Pika transport.
|
|||
"""
|
||||
import socket
|
||||
|
||||
from kombu.exceptions import VersionMismatch
|
||||
from kombu.transport import base
|
||||
|
||||
from pika import channel # must be here to raise importerror for below.
|
||||
try:
|
||||
from pika import asyncore_adapter
|
||||
except ImportError:
|
||||
raise ImportError("Kombu only works with pika version 0.5.2")
|
||||
raise VersionMismatch("Kombu only works with pika version 0.5.2")
|
||||
from pika import blocking_adapter
|
||||
from pika import connection
|
||||
from pika import exceptions
|
||||
from pika.spec import Basic, BasicProperties
|
||||
|
||||
from kombu.transport import base
|
||||
|
||||
DEFAULT_PORT = 5672
|
||||
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ from Queue import Empty
|
|||
|
||||
from anyjson import serialize, deserialize
|
||||
|
||||
from kombu.exceptions import VersionMismatch
|
||||
from kombu.transport import virtual
|
||||
from kombu.utils import eventio
|
||||
from kombu.utils import cached_property
|
||||
|
@ -307,11 +308,19 @@ class Channel(virtual.Channel):
|
|||
password=conninfo.password)
|
||||
|
||||
def _get_client(self):
|
||||
from redis import Redis, ConnectionError
|
||||
import redis
|
||||
|
||||
version = getattr(redis, "__version__", (0, 0, 0))
|
||||
if version:
|
||||
version = tuple(version.split("."))
|
||||
if version < (2, 4, 4):
|
||||
raise VersionMismatch(
|
||||
"Redis transport requires redis-py versions 2.4.4 or later. "
|
||||
"You have %r" % (".".join(version), ))
|
||||
|
||||
# KombuRedis maintains a connection attribute on it's instance and
|
||||
# uses that when executing commands
|
||||
class KombuRedis(Redis):
|
||||
class KombuRedis(redis.Redis):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(KombuRedis, self).__init__(*args, **kwargs)
|
||||
self.connection = self.connection_pool.get_connection('_')
|
||||
|
@ -322,7 +331,7 @@ class Channel(virtual.Channel):
|
|||
try:
|
||||
conn.send_command(*args)
|
||||
return self.parse_response(conn, command_name, **options)
|
||||
except ConnectionError:
|
||||
except redis.ConnectionError:
|
||||
conn.disconnect()
|
||||
conn.send_command(*args)
|
||||
return self.parse_response(conn, command_name, **options)
|
||||
|
|
Loading…
Reference in New Issue