mirror of https://github.com/rq/rq.git
Patch the connection instances.
This patches the connection object (which is either a StrictRedis instance or a Redis instance), to have alternative class methods that behave exactly like their StrictRedis counterparts, no matter whether which type the object is. Only the ambiguous methods are patched. The exhaustive list: - _zadd (fixes argument order) - _lrem (fixes argument order) - _setex (fixes argument order) - _pipeline (always returns a StrictPipeline) - _ttl (fixes return value) - _pttl (fixes return value) This makes it possible to call the methods reliably without polluting the RQ code any further.
This commit is contained in:
parent
67880343f1
commit
54254f2271
|
@ -4,6 +4,10 @@
|
|||
- `ended_at` is now recorded for normally finished jobs, too. (Previously only
|
||||
for failed jobs.)
|
||||
|
||||
- Adds support for both `Redis` and `StrictRedis` connection types
|
||||
|
||||
- Makes `StrictRedis` the default connection type if none is explicitly provided
|
||||
|
||||
|
||||
### 0.3.4
|
||||
(January 23rd, 2013)
|
||||
|
@ -93,11 +97,11 @@
|
|||
invocations:
|
||||
|
||||
```python
|
||||
from redis import Redis
|
||||
from redis import StrictRedis
|
||||
from rq.decorators import job
|
||||
|
||||
# Connect to Redis
|
||||
redis = Redis()
|
||||
redis = StrictRedis()
|
||||
|
||||
@job('high', timeout=10, connection=redis)
|
||||
def some_work(x, y):
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
from redis import Redis, StrictRedis
|
||||
from functools import partial
|
||||
|
||||
|
||||
def fix_return_type(func):
|
||||
# deliberately no functools.wraps() call here, since the function being
|
||||
# wrapped is a partial, which has no module
|
||||
def _inner(*args, **kwargs):
|
||||
value = func(*args, **kwargs)
|
||||
if value is None:
|
||||
value = -1
|
||||
return value
|
||||
return _inner
|
||||
|
||||
|
||||
def patch_connection(connection):
|
||||
if not isinstance(connection, StrictRedis):
|
||||
raise ValueError('A StrictRedis or Redis connection is required.')
|
||||
|
||||
# Don't patch already patches objects
|
||||
PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
|
||||
if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
|
||||
return connection
|
||||
|
||||
if isinstance(connection, Redis):
|
||||
connection._setex = partial(StrictRedis.setex, connection)
|
||||
connection._lrem = partial(StrictRedis.lrem, connection)
|
||||
connection._zadd = partial(StrictRedis.zadd, connection)
|
||||
connection._pipeline = partial(StrictRedis.pipeline, connection)
|
||||
connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
|
||||
if hasattr(connection, 'pttl'):
|
||||
connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
|
||||
elif isinstance(connection, StrictRedis):
|
||||
connection._setex = connection.setex
|
||||
connection._lrem = connection.lrem
|
||||
connection._zadd = connection.zadd
|
||||
connection._pipeline = connection.pipeline
|
||||
connection._ttl = connection.ttl
|
||||
if hasattr(connection, 'pttl'):
|
||||
connection._pttl = connection.pttl
|
||||
else:
|
||||
raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection)))
|
||||
|
||||
return connection
|
|
@ -1,6 +1,7 @@
|
|||
from contextlib import contextmanager
|
||||
from redis import Redis
|
||||
from redis import StrictRedis
|
||||
from .local import LocalStack, release_local
|
||||
from .compat.connections import patch_connection
|
||||
|
||||
|
||||
class NoRedisConnectionException(Exception):
|
||||
|
@ -10,7 +11,7 @@ class NoRedisConnectionException(Exception):
|
|||
@contextmanager
|
||||
def Connection(connection=None):
|
||||
if connection is None:
|
||||
connection = Redis()
|
||||
connection = StrictRedis()
|
||||
push_connection(connection)
|
||||
try:
|
||||
yield
|
||||
|
@ -23,7 +24,7 @@ def Connection(connection=None):
|
|||
|
||||
def push_connection(redis):
|
||||
"""Pushes the given connection on the stack."""
|
||||
_connection_stack.push(redis)
|
||||
_connection_stack.push(patch_connection(redis))
|
||||
|
||||
|
||||
def pop_connection():
|
||||
|
@ -40,7 +41,7 @@ def use_connection(redis=None):
|
|||
release_local(_connection_stack)
|
||||
|
||||
if redis is None:
|
||||
redis = Redis()
|
||||
redis = StrictRedis()
|
||||
push_connection(redis)
|
||||
|
||||
|
||||
|
@ -56,7 +57,7 @@ def resolve_connection(connection=None):
|
|||
Raises an exception if it cannot resolve a connection now.
|
||||
"""
|
||||
if connection is not None:
|
||||
return connection
|
||||
return patch_connection(connection)
|
||||
|
||||
connection = get_current_connection()
|
||||
if connection is None:
|
||||
|
|
|
@ -302,11 +302,11 @@ class FailedQueue(Queue):
|
|||
job = Job.fetch(job_id, connection=self.connection)
|
||||
except NoSuchJobError:
|
||||
# Silently ignore/remove this job and return (i.e. do nothing)
|
||||
self.connection.lrem(self.key, job_id)
|
||||
self.connection._lrem(self.key, 0, job_id)
|
||||
return
|
||||
|
||||
# Delete it from the failed queue (raise an error if that failed)
|
||||
if self.connection.lrem(self.key, job.id) == 0:
|
||||
if self.connection._lrem(self.key, 0, job.id) == 0:
|
||||
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
|
||||
|
||||
job.exc_info = None
|
||||
|
|
|
@ -48,8 +48,8 @@ def setup_default_arguments(args, settings):
|
|||
|
||||
def setup_redis(args):
|
||||
if args.url is not None:
|
||||
redis_conn = redis.from_url(args.url, db=args.db)
|
||||
redis_conn = redis.StrictRedis.from_url(args.url, db=args.db)
|
||||
else:
|
||||
redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db,
|
||||
redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db,
|
||||
password=args.password)
|
||||
use_connection(redis_conn)
|
||||
|
|
|
@ -5,7 +5,7 @@ if is_python_version((2, 7), (3, 2)):
|
|||
else:
|
||||
import unittest2 as unittest # noqa
|
||||
|
||||
from redis import Redis
|
||||
from redis import StrictRedis
|
||||
from rq import push_connection, pop_connection
|
||||
|
||||
|
||||
|
@ -14,7 +14,7 @@ def find_empty_redis_database():
|
|||
will use/connect it when no keys are in there.
|
||||
"""
|
||||
for dbnum in range(4, 17):
|
||||
testconn = Redis(db=dbnum)
|
||||
testconn = StrictRedis(db=dbnum)
|
||||
empty = len(testconn.keys('*')) == 0
|
||||
if empty:
|
||||
return testconn
|
||||
|
|
|
@ -185,13 +185,13 @@ class TestWorker(RQTestCase):
|
|||
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
|
||||
w = Worker([q])
|
||||
w.work(burst=True)
|
||||
self.assertNotEqual(self.testconn.ttl(job.key), 0)
|
||||
self.assertNotEqual(self.testconn._ttl(job.key), 0)
|
||||
|
||||
# Job with -1 result_ttl don't expire
|
||||
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
|
||||
w = Worker([q])
|
||||
w.work(burst=True)
|
||||
self.assertEqual(self.testconn.ttl(job.key), None)
|
||||
self.assertEqual(self.testconn._ttl(job.key), -1)
|
||||
|
||||
# Job with result_ttl = 0 gets deleted immediately
|
||||
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
|
||||
|
|
Loading…
Reference in New Issue