diff --git a/kombu/exceptions.py b/kombu/exceptions.py index 43c7fd85..bac4ac86 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -42,3 +42,7 @@ class ChannelLimitExceeded(LimitExceeded): class StdChannelError(Exception): pass + + +class VersionMismatch(Exception): + pass diff --git a/kombu/transport/pypika.py b/kombu/transport/pypika.py index efb09e7d..4cf84a7e 100644 --- a/kombu/transport/pypika.py +++ b/kombu/transport/pypika.py @@ -10,18 +10,19 @@ Pika transport. """ import socket +from kombu.exceptions import VersionMismatch +from kombu.transport import base + from pika import channel # must be here to raise importerror for below. try: from pika import asyncore_adapter except ImportError: - raise ImportError("Kombu only works with pika version 0.5.2") + raise VersionMismatch("Kombu only works with pika version 0.5.2") from pika import blocking_adapter from pika import connection from pika import exceptions from pika.spec import Basic, BasicProperties -from kombu.transport import base - DEFAULT_PORT = 5672 diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index 692fb280..36b9d073 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -13,6 +13,7 @@ from Queue import Empty from anyjson import serialize, deserialize +from kombu.exceptions import VersionMismatch from kombu.transport import virtual from kombu.utils import eventio from kombu.utils import cached_property @@ -307,11 +308,19 @@ class Channel(virtual.Channel): password=conninfo.password) def _get_client(self): - from redis import Redis, ConnectionError + import redis + + version = getattr(redis, "__version__", (0, 0, 0)) + if version: + version = tuple(version.split(".")) + if version < (2, 4, 4): + raise VersionMismatch( + "Redis transport requires redis-py versions 2.4.4 or later. " + "You have %r" % (".".join(version), )) # KombuRedis maintains a connection attribute on it's instance and # uses that when executing commands - class KombuRedis(Redis): + class KombuRedis(redis.Redis): def __init__(self, *args, **kwargs): super(KombuRedis, self).__init__(*args, **kwargs) self.connection = self.connection_pool.get_connection('_') @@ -322,7 +331,7 @@ class Channel(virtual.Channel): try: conn.send_command(*args) return self.parse_response(conn, command_name, **options) - except ConnectionError: + except redis.ConnectionError: conn.disconnect() conn.send_command(*args) return self.parse_response(conn, command_name, **options)