From 54254f227112f6cd32cc47ee1a2494ef9923a10e Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 6 Feb 2013 22:36:23 +0100 Subject: [PATCH] Patch the connection instances. This patches the connection object (which is either a StrictRedis instance or a Redis instance), to have alternative class methods that behave exactly like their StrictRedis counterparts, no matter whether which type the object is. Only the ambiguous methods are patched. The exhaustive list: - _zadd (fixes argument order) - _lrem (fixes argument order) - _setex (fixes argument order) - _pipeline (always returns a StrictPipeline) - _ttl (fixes return value) - _pttl (fixes return value) This makes it possible to call the methods reliably without polluting the RQ code any further. --- CHANGES.md | 8 ++++++-- rq/compat/connections.py | 44 ++++++++++++++++++++++++++++++++++++++++ rq/connections.py | 11 +++++----- rq/queue.py | 4 ++-- rq/scripts/__init__.py | 4 ++-- tests/__init__.py | 4 ++-- tests/test_worker.py | 4 ++-- 7 files changed, 64 insertions(+), 15 deletions(-) create mode 100644 rq/compat/connections.py diff --git a/CHANGES.md b/CHANGES.md index 4bf554ad..7cee984e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,10 @@ - `ended_at` is now recorded for normally finished jobs, too. (Previously only for failed jobs.) +- Adds support for both `Redis` and `StrictRedis` connection types + +- Makes `StrictRedis` the default connection type if none is explicitly provided + ### 0.3.4 (January 23rd, 2013) @@ -93,11 +97,11 @@ invocations: ```python - from redis import Redis + from redis import StrictRedis from rq.decorators import job # Connect to Redis - redis = Redis() + redis = StrictRedis() @job('high', timeout=10, connection=redis) def some_work(x, y): diff --git a/rq/compat/connections.py b/rq/compat/connections.py new file mode 100644 index 00000000..0374b5bf --- /dev/null +++ b/rq/compat/connections.py @@ -0,0 +1,44 @@ +from redis import Redis, StrictRedis +from functools import partial + + +def fix_return_type(func): + # deliberately no functools.wraps() call here, since the function being + # wrapped is a partial, which has no module + def _inner(*args, **kwargs): + value = func(*args, **kwargs) + if value is None: + value = -1 + return value + return _inner + + +def patch_connection(connection): + if not isinstance(connection, StrictRedis): + raise ValueError('A StrictRedis or Redis connection is required.') + + # Don't patch already patches objects + PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl'] + if all([hasattr(connection, attr) for attr in PATCHED_METHODS]): + return connection + + if isinstance(connection, Redis): + connection._setex = partial(StrictRedis.setex, connection) + connection._lrem = partial(StrictRedis.lrem, connection) + connection._zadd = partial(StrictRedis.zadd, connection) + connection._pipeline = partial(StrictRedis.pipeline, connection) + connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection)) + if hasattr(connection, 'pttl'): + connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection)) + elif isinstance(connection, StrictRedis): + connection._setex = connection.setex + connection._lrem = connection.lrem + connection._zadd = connection.zadd + connection._pipeline = connection.pipeline + connection._ttl = connection.ttl + if hasattr(connection, 'pttl'): + connection._pttl = connection.pttl + else: + raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection))) + + return connection diff --git a/rq/connections.py b/rq/connections.py index 05de2f18..f4a72e47 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,6 +1,7 @@ from contextlib import contextmanager -from redis import Redis +from redis import StrictRedis from .local import LocalStack, release_local +from .compat.connections import patch_connection class NoRedisConnectionException(Exception): @@ -10,7 +11,7 @@ class NoRedisConnectionException(Exception): @contextmanager def Connection(connection=None): if connection is None: - connection = Redis() + connection = StrictRedis() push_connection(connection) try: yield @@ -23,7 +24,7 @@ def Connection(connection=None): def push_connection(redis): """Pushes the given connection on the stack.""" - _connection_stack.push(redis) + _connection_stack.push(patch_connection(redis)) def pop_connection(): @@ -40,7 +41,7 @@ def use_connection(redis=None): release_local(_connection_stack) if redis is None: - redis = Redis() + redis = StrictRedis() push_connection(redis) @@ -56,7 +57,7 @@ def resolve_connection(connection=None): Raises an exception if it cannot resolve a connection now. """ if connection is not None: - return connection + return patch_connection(connection) connection = get_current_connection() if connection is None: diff --git a/rq/queue.py b/rq/queue.py index b962b097..0e4c11a2 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -302,11 +302,11 @@ class FailedQueue(Queue): job = Job.fetch(job_id, connection=self.connection) except NoSuchJobError: # Silently ignore/remove this job and return (i.e. do nothing) - self.connection.lrem(self.key, job_id) + self.connection._lrem(self.key, 0, job_id) return # Delete it from the failed queue (raise an error if that failed) - if self.connection.lrem(self.key, job.id) == 0: + if self.connection._lrem(self.key, 0, job.id) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') job.exc_info = None diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 94441ab9..119f4f6d 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -48,8 +48,8 @@ def setup_default_arguments(args, settings): def setup_redis(args): if args.url is not None: - redis_conn = redis.from_url(args.url, db=args.db) + redis_conn = redis.StrictRedis.from_url(args.url, db=args.db) else: - redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db, + redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db, password=args.password) use_connection(redis_conn) diff --git a/tests/__init__.py b/tests/__init__.py index cba1035e..688c11f6 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -5,7 +5,7 @@ if is_python_version((2, 7), (3, 2)): else: import unittest2 as unittest # noqa -from redis import Redis +from redis import StrictRedis from rq import push_connection, pop_connection @@ -14,7 +14,7 @@ def find_empty_redis_database(): will use/connect it when no keys are in there. """ for dbnum in range(4, 17): - testconn = Redis(db=dbnum) + testconn = StrictRedis(db=dbnum) empty = len(testconn.keys('*')) == 0 if empty: return testconn diff --git a/tests/test_worker.py b/tests/test_worker.py index e3e706e4..b662b09b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -185,13 +185,13 @@ class TestWorker(RQTestCase): job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w = Worker([q]) w.work(burst=True) - self.assertNotEqual(self.testconn.ttl(job.key), 0) + self.assertNotEqual(self.testconn._ttl(job.key), 0) # Job with -1 result_ttl don't expire job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) w = Worker([q]) w.work(burst=True) - self.assertEqual(self.testconn.ttl(job.key), None) + self.assertEqual(self.testconn._ttl(job.key), -1) # Job with result_ttl = 0 gets deleted immediately job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)