diff --git a/CHANGES.md b/CHANGES.md index 4bf554ad..104d5034 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,9 +1,19 @@ -### 0.3.5 +### 0.3.6 (not yet released) +- ... + + +### 0.3.5 +(February 6th, 2013) + - `ended_at` is now recorded for normally finished jobs, too. (Previously only for failed jobs.) +- Adds support for both `Redis` and `StrictRedis` connection types + +- Makes `StrictRedis` the default connection type if none is explicitly provided + ### 0.3.4 (January 23rd, 2013) @@ -93,11 +103,11 @@ invocations: ```python - from redis import Redis + from redis import StrictRedis from rq.decorators import job # Connect to Redis - redis = Redis() + redis = StrictRedis() @job('high', timeout=10, connection=redis) def some_work(x, y): diff --git a/rq/compat/connections.py b/rq/compat/connections.py new file mode 100644 index 00000000..0374b5bf --- /dev/null +++ b/rq/compat/connections.py @@ -0,0 +1,44 @@ +from redis import Redis, StrictRedis +from functools import partial + + +def fix_return_type(func): + # deliberately no functools.wraps() call here, since the function being + # wrapped is a partial, which has no module + def _inner(*args, **kwargs): + value = func(*args, **kwargs) + if value is None: + value = -1 + return value + return _inner + + +def patch_connection(connection): + if not isinstance(connection, StrictRedis): + raise ValueError('A StrictRedis or Redis connection is required.') + + # Don't patch already patched objects + PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl'] + if all([hasattr(connection, attr) for attr in PATCHED_METHODS]): + return connection + + if isinstance(connection, Redis): + connection._setex = partial(StrictRedis.setex, connection) + connection._lrem = partial(StrictRedis.lrem, connection) + connection._zadd = partial(StrictRedis.zadd, connection) + connection._pipeline = partial(StrictRedis.pipeline, connection) + connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection)) + if 
hasattr(connection, 'pttl'): + connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection)) + elif isinstance(connection, StrictRedis): + connection._setex = connection.setex + connection._lrem = connection.lrem + connection._zadd = connection.zadd + connection._pipeline = connection.pipeline + connection._ttl = connection.ttl + if hasattr(connection, 'pttl'): + connection._pttl = connection.pttl + else: + raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection))) + + return connection diff --git a/rq/connections.py b/rq/connections.py index 05de2f18..f4a72e47 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,6 +1,7 @@ from contextlib import contextmanager -from redis import Redis +from redis import StrictRedis from .local import LocalStack, release_local +from .compat.connections import patch_connection class NoRedisConnectionException(Exception): @@ -10,7 +11,7 @@ class NoRedisConnectionException(Exception): @contextmanager def Connection(connection=None): if connection is None: - connection = Redis() + connection = StrictRedis() push_connection(connection) try: yield @@ -23,7 +24,7 @@ def Connection(connection=None): def push_connection(redis): """Pushes the given connection on the stack.""" - _connection_stack.push(redis) + _connection_stack.push(patch_connection(redis)) def pop_connection(): @@ -40,7 +41,7 @@ def use_connection(redis=None): release_local(_connection_stack) if redis is None: - redis = Redis() + redis = StrictRedis() push_connection(redis) @@ -56,7 +57,7 @@ def resolve_connection(connection=None): Raises an exception if it cannot resolve a connection now. 
""" if connection is not None: - return connection + return patch_connection(connection) connection = get_current_connection() if connection is None: diff --git a/rq/exceptions.py b/rq/exceptions.py index 135ff3d5..982a5801 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -11,8 +11,8 @@ class NoQueueError(Exception): class UnpickleError(Exception): - def __init__(self, message, raw_data): - super(UnpickleError, self).__init__(message) + def __init__(self, message, raw_data, inner_exception=None): + super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data class DequeueTimeout(Exception): diff --git a/rq/job.py b/rq/job.py index e94f5128..4d625bfd 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,7 +1,6 @@ import importlib import inspect import times -from collections import namedtuple from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError from .local import LocalStack @@ -27,8 +26,8 @@ def unpickle(pickled_string): """ try: obj = loads(pickled_string) - except (StandardError, UnpicklingError): - raise UnpickleError('Could not unpickle.', pickled_string) + except (StandardError, UnpicklingError) as e: + raise UnpickleError('Could not unpickle.', pickled_string, e) return obj @@ -289,7 +288,7 @@ class Job(object): key = self.key obj = {} - obj['created_at'] = times.format(self.created_at, 'UTC') + obj['created_at'] = times.format(self.created_at or times.now(), 'UTC') if self.func_name is not None: obj['data'] = dumps(self.job_tuple) diff --git a/rq/queue.py b/rq/queue.py index 0a2791cb..87ff6719 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -232,6 +232,7 @@ class Queue(object): except UnpickleError as e: # Attach queue information on the exception for improved error # reporting + e.job_id = job_id e.queue = self raise e return job @@ -314,13 +315,14 @@ class FailedQueue(Queue): job = Job.fetch(job_id, connection=self.connection) except NoSuchJobError: # Silently ignore/remove this job and return (i.e. 
do nothing) - self.connection.lrem(self.key, job_id) + self.connection._lrem(self.key, 0, job_id) return # Delete it from the failed queue (raise an error if that failed) - if self.connection.lrem(self.key, job.id) == 0: + if self.connection._lrem(self.key, 0, job.id) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') + job.status = Status.QUEUED job.exc_info = None q = Queue(job.origin, connection=self.connection) q.enqueue_job(job, timeout=job.timeout) diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 94441ab9..119f4f6d 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -48,8 +48,8 @@ def setup_default_arguments(args, settings): def setup_redis(args): if args.url is not None: - redis_conn = redis.from_url(args.url, db=args.db) + redis_conn = redis.StrictRedis.from_url(args.url, db=args.db) else: - redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db, + redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db, password=args.password) use_connection(redis_conn) diff --git a/rq/utils.py b/rq/utils.py index 14241a6d..5fa4940c 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -9,7 +9,7 @@ import os import sys import logging -from compat import is_python_version +from .compat import is_python_version def gettermsize(): diff --git a/rq/version.py b/rq/version.py index 8e18e5ba..d4ab04aa 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.3.5-dev' +VERSION = '0.3.6-dev' diff --git a/tests/__init__.py b/tests/__init__.py index cba1035e..688c11f6 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -5,7 +5,7 @@ if is_python_version((2, 7), (3, 2)): else: import unittest2 as unittest # noqa -from redis import Redis +from redis import StrictRedis from rq import push_connection, pop_connection @@ -14,7 +14,7 @@ def find_empty_redis_database(): will use/connect it when no keys are in there. 
""" for dbnum in range(4, 17): - testconn = Redis(db=dbnum) + testconn = StrictRedis(db=dbnum) empty = len(testconn.keys('*')) == 0 if empty: return testconn diff --git a/tests/test_queue.py b/tests/test_queue.py index ae70ac72..b2bffba1 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -259,6 +259,16 @@ class TestFailedQueue(RQTestCase): job = Job.fetch(job.id) self.assertEquals(job.timeout, 200) + def test_requeue_sets_status_to_queued(self): + """Requeueing a job should set its status back to QUEUED.""" + job = Job.create(func=div_by_zero, args=(1, 2, 3)) + job.save() + get_failed_queue().quarantine(job, Exception('Some fake error')) + get_failed_queue().requeue(job.id) + + job = Job.fetch(job.id) + self.assertEqual(job.status, Status.QUEUED) + def test_enqueue_preserves_result_ttl(self): """Enqueueing persists result_ttl.""" q = Queue() diff --git a/tests/test_worker.py b/tests/test_worker.py index 80a6ede3..d9e2fe52 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -194,13 +194,13 @@ class TestWorker(RQTestCase): job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w = Worker([q]) w.work(burst=True) - self.assertNotEqual(self.testconn.ttl(job.key), 0) + self.assertNotEqual(self.testconn._ttl(job.key), 0) # Job with -1 result_ttl don't expire job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) w = Worker([q]) w.work(burst=True) - self.assertEqual(self.testconn.ttl(job.key), None) + self.assertEqual(self.testconn._ttl(job.key), -1) # Job with result_ttl = 0 gets deleted immediately job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)