diff --git a/CHANGES.md b/CHANGES.md index 4036dba6..30399a5f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,8 +1,22 @@ ### 0.3.0 (not released) -- Removes the possible ambiguity of passing in a `timeout` argument to - `.enqueue()`. Instead, now use the `.enqueue_call()` method. +- `.enqueue()` does not consume the `timeout` kwarg anymore. Instead, to pass + RQ a timeout value while enqueueing a function, use the explicit invocation + instead: + + q.enqueue(do_something, args=(1, 2), kwargs={'a': 1}, timeout=30) + +- Add a `@job` decorator, which can be used to do Celery-style delayed + invocations: + + from rq.decorators import job + + @job('high', timeout=10) + def some_work(x, y): + return x + y + + some_work.delay(2, 3) ### 0.2.1 diff --git a/rq/connections.py b/rq/connections.py index 7aa7cf79..05de2f18 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -51,6 +51,20 @@ def get_current_connection(): return _connection_stack.top +def resolve_connection(connection=None): + """Convenience function to resolve the given or the current connection. + Raises an exception if it cannot resolve a connection now. + """ + if connection is not None: + return connection + + connection = get_current_connection() + if connection is None: + raise NoRedisConnectionException( + 'Could not resolve a Redis connection.') + return connection + + _connection_stack = LocalStack() __all__ = ['Connection', diff --git a/rq/decorators.py b/rq/decorators.py new file mode 100644 index 00000000..d5851c3b --- /dev/null +++ b/rq/decorators.py @@ -0,0 +1,34 @@ +from functools import wraps +from .queue import Queue +from .connections import resolve_connection + + +class job(object): + + def __init__(self, queue, connection=None, timeout=None): + """A decorator that adds a ``delay`` method to the decorated function, + which in turn creates a RQ job when called. Accepts a required + ``queue`` argument that can be either a ``Queue`` instance or a string + denoting the queue name. For example: + + @job(queue='default') + def simple_add(x, y): + return x + y + + simple_add.delay(1, 2) # Puts simple_add function into queue + """ + self.queue = queue + self.connection = resolve_connection(connection) + self.timeout = timeout + + def __call__(self, f): + @wraps(f) + def delay(*args, **kwargs): + if isinstance(self.queue, basestring): + queue = Queue(name=self.queue, connection=self.connection) + else: + queue = self.queue + return queue.enqueue_call(f, args=args, kwargs=kwargs, + timeout=self.timeout) + f.delay = delay + return f diff --git a/rq/queue.py b/rq/queue.py index 9253aa10..b02e7990 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,5 +1,5 @@ import times -from .connections import get_current_connection +from .connections import resolve_connection from .job import Job from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .compat import total_ordering @@ -23,8 +23,7 @@ class Queue(object): """Returns an iterable of all Queues. """ prefix = cls.redis_queue_namespace_prefix - if connection is None: - connection = get_current_connection() + connection = resolve_connection(connection) def to_queue(queue_key): return cls.from_queue_key(queue_key, connection=connection) @@ -43,9 +42,7 @@ class Queue(object): return cls(name, connection=connection) def __init__(self, name='default', default_timeout=None, connection=None): - if connection is None: - connection = get_current_connection() - self.connection = connection + self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name self._key = '%s%s' % (prefix, name) @@ -107,7 +104,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, **options): + def enqueue_call(self, func, args=None, kwargs=None, timeout=None): """Creates a job to represent the delayed function call and enqueues it. @@ -115,7 +112,7 @@ class Queue(object): and kwargs as explicit arguments. Any kwargs passed to this function contain options for RQ itself. """ - timeout = options.get('timeout', self._default_timeout) + timeout = timeout or self._default_timeout job = Job.create(func, args, kwargs, connection=self.connection) return self.enqueue_job(job, timeout=timeout) @@ -138,15 +135,17 @@ class Queue(object): 'Functions from the __main__ module cannot be processed ' 'by workers.') - # Warn about the timeout flag that has been removed - if 'timeout' in kwargs: - import warnings - warnings.warn('The use of the timeout kwarg is not supported ' - 'anymore. If you meant to pass this argument to RQ ' - '(rather than to %r), use the `.enqueue_call()` ' - 'method instead.' % f, DeprecationWarning) + # Detect explicit invocations, i.e. of the form: + # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) + timeout = None + if 'args' in kwargs or 'kwargs' in kwargs: + assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa + timeout = kwargs.pop('timeout', None) + args = kwargs.pop('args', None) + kwargs = kwargs.pop('kwargs', None) - return self.enqueue_call(func=f, args=args, kwargs=kwargs) + return self.enqueue_call(func=f, args=args, kwargs=kwargs, + timeout=timeout) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. @@ -185,8 +184,7 @@ class Queue(object): Until Redis receives a specific method for this, we'll have to wrap it this way. """ - if connection is None: - connection = get_current_connection() + connection = resolve_connection(connection) if blocking: queue_key, job_id = connection.blpop(queue_keys) return queue_key, job_id diff --git a/tests/fixtures.py b/tests/fixtures.py index 32003dec..3d45ab60 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -3,6 +3,8 @@ This file contains all jobs that are used in tests. Each of these test fixtures has a slighty different characteristics. """ import time +from rq import Connection +from rq.decorators import job def say_hello(name=None): @@ -46,5 +48,11 @@ class Calculator(object): def __init__(self, denominator): self.denominator = denominator - def calculate(x, y): + def calculate(self, x, y): return x * y / self.denominator + + +with Connection(): + @job(queue='default') + def decorated_job(x, y): + return x + y diff --git a/tests/test_decorator.py b/tests/test_decorator.py new file mode 100644 index 00000000..4f95f9c3 --- /dev/null +++ b/tests/test_decorator.py @@ -0,0 +1,36 @@ +from tests import RQTestCase +from tests.fixtures import decorated_job + +from rq.decorators import job +from rq.job import Job + + +class TestDecorator(RQTestCase): + + def setUp(self): + super(TestDecorator, self).setUp() + + def test_decorator_preserves_functionality(self): + """Ensure that a decorated function's functionality is still preserved. + """ + self.assertEqual(decorated_job(1, 2), 3) + + def test_decorator_adds_delay_attr(self): + """Ensure that decorator adds a delay attribute to function that returns + a Job instance when called. + """ + self.assertTrue(hasattr(decorated_job, 'delay')) + result = decorated_job.delay(1, 2) + self.assertTrue(isinstance(result, Job)) + # Ensure that job returns the right result when performed + self.assertEqual(result.perform(), 3) + + def test_decorator_accepts_queue_name_as_argument(self): + """Ensure that passing in queue name to the decorator puts the job in + the right queue. + """ + @job(queue='queue_name') + def hello(): + return 'Hi' + result = hello.delay() + self.assertEqual(result.origin, 'queue_name') diff --git a/tests/test_worker.py b/tests/test_worker.py index 8b12c6aa..9ea338b8 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -157,8 +157,8 @@ class TestWorker(RQTestCase): w = Worker([q]) # Put it on the queue with a timeout value - res = q.enqueue_call( - func=create_file_after_timeout, + res = q.enqueue( + create_file_after_timeout, args=(sentinel_file, 4), timeout=1)