mirror of https://github.com/rq/rq.git
261 lines
8.7 KiB
Python
261 lines
8.7 KiB
Python
from unittest import mock
|
|
|
|
from rq.decorators import job
|
|
from rq.job import Job, Retry
|
|
from rq.queue import Queue
|
|
from rq.worker import DEFAULT_RESULT_TTL
|
|
from tests import RQTestCase
|
|
|
|
|
|
class TestDecorator(RQTestCase):
|
|
def setUp(self):
|
|
super().setUp()
|
|
|
|
@job(queue='default', connection=self.connection)
|
|
def decorated_job(x, y):
|
|
return x + y
|
|
|
|
self.decorated_job = decorated_job
|
|
|
|
def test_decorator_preserves_functionality(self):
|
|
"""Ensure that a decorated function's functionality is still preserved."""
|
|
self.assertEqual(self.decorated_job(1, 2), 3)
|
|
|
|
def test_decorator_adds_delay_attr(self):
|
|
"""Ensure that decorator adds a delay attribute to function that returns
|
|
a Job instance when called.
|
|
"""
|
|
self.assertTrue(hasattr(self.decorated_job, 'delay'))
|
|
job = self.decorated_job.enqueue(1, 2)
|
|
self.assertTrue(isinstance(job, Job))
|
|
|
|
def test_decorator_accepts_queue_name_as_argument(self):
|
|
"""Ensure that passing in queue name to the decorator puts the job in
|
|
the right queue.
|
|
"""
|
|
|
|
@job(queue='queue_name', connection=self.connection)
|
|
def hello():
|
|
return 'Hi'
|
|
|
|
result = hello.enqueue()
|
|
self.assertEqual(result.origin, 'queue_name')
|
|
|
|
def test_decorator_accepts_result_ttl_as_argument(self):
|
|
"""Ensure that passing in result_ttl to the decorator sets the
|
|
result_ttl on the job
|
|
"""
|
|
# Ensure default
|
|
result = self.decorated_job.enqueue(1, 2)
|
|
self.assertEqual(result.result_ttl, DEFAULT_RESULT_TTL)
|
|
|
|
@job('default', result_ttl=10, connection=self.connection)
|
|
def hello():
|
|
return 'Why hello'
|
|
|
|
result = hello.enqueue()
|
|
self.assertEqual(result.result_ttl, 10)
|
|
|
|
def test_decorator_accepts_ttl_as_argument(self):
|
|
"""Ensure that passing in ttl to the decorator sets the ttl on the job"""
|
|
# Ensure default
|
|
result = self.decorated_job.enqueue(1, 2)
|
|
self.assertEqual(result.ttl, None)
|
|
|
|
@job('default', ttl=30, connection=self.connection)
|
|
def hello():
|
|
return 'Hello'
|
|
|
|
result = hello.enqueue()
|
|
self.assertEqual(result.ttl, 30)
|
|
|
|
def test_decorator_accepts_meta_as_argument(self):
|
|
"""Ensure that passing in meta to the decorator sets the meta on the job"""
|
|
# Ensure default
|
|
result = self.decorated_job.enqueue(1, 2)
|
|
self.assertEqual(result.meta, {})
|
|
|
|
test_meta = {
|
|
'metaKey1': 1,
|
|
'metaKey2': 2,
|
|
}
|
|
|
|
@job('default', connection=self.connection, meta=test_meta)
|
|
def hello():
|
|
return 'Hello'
|
|
|
|
result = hello.enqueue()
|
|
self.assertEqual(result.meta, test_meta)
|
|
|
|
def test_decorator_accepts_result_depends_on_as_argument(self):
|
|
"""Ensure that passing in depends_on to the decorator sets the
|
|
correct dependency on the job
|
|
"""
|
|
# Ensure default
|
|
result = self.decorated_job.enqueue(1, 2)
|
|
self.assertEqual(result.dependency, None)
|
|
self.assertEqual(result._dependency_id, None)
|
|
|
|
@job(queue='queue_name', connection=self.connection)
|
|
def foo():
|
|
return 'Firstly'
|
|
|
|
foo_job = foo.enqueue()
|
|
|
|
@job(queue='queue_name', connection=self.connection, depends_on=foo_job)
|
|
def bar():
|
|
return 'Secondly'
|
|
|
|
bar_job = bar.enqueue()
|
|
|
|
self.assertEqual(foo_job._dependency_ids, [])
|
|
self.assertIsNone(foo_job._dependency_id)
|
|
|
|
self.assertEqual(foo_job.dependency, None)
|
|
self.assertEqual(bar_job.dependency, foo_job)
|
|
self.assertEqual(bar_job.dependency.id, foo_job.id)
|
|
|
|
def test_decorator_delay_accepts_depends_on_as_argument(self):
|
|
"""Ensure that passing in depends_on to the delay method of
|
|
a decorated function overrides the depends_on set in the
|
|
constructor.
|
|
"""
|
|
# Ensure default
|
|
result = self.decorated_job.enqueue(1, 2)
|
|
self.assertEqual(result.dependency, None)
|
|
self.assertEqual(result._dependency_id, None)
|
|
|
|
@job(queue='queue_name', connection=self.connection)
|
|
def foo():
|
|
return 'Firstly'
|
|
|
|
@job(queue='queue_name', connection=self.connection)
|
|
def bar():
|
|
return 'Firstly'
|
|
|
|
foo_job = foo.enqueue()
|
|
bar_job = bar.enqueue()
|
|
|
|
@job(queue='queue_name', connection=self.connection, depends_on=foo_job)
|
|
def baz():
|
|
return 'Secondly'
|
|
|
|
baz_job = bar.enqueue(depends_on=bar_job)
|
|
|
|
self.assertIsNone(foo_job._dependency_id)
|
|
self.assertIsNone(bar_job._dependency_id)
|
|
|
|
self.assertEqual(foo_job._dependency_ids, [])
|
|
self.assertEqual(bar_job._dependency_ids, [])
|
|
self.assertEqual(baz_job._dependency_id, bar_job.id)
|
|
self.assertEqual(baz_job.dependency, bar_job)
|
|
self.assertEqual(baz_job.dependency.id, bar_job.id)
|
|
|
|
def test_decorator_accepts_on_failure_function_as_argument(self):
|
|
"""Ensure that passing in on_failure function to the decorator sets the
|
|
correct on_failure function on the job.
|
|
"""
|
|
|
|
# Only functions and builtins are supported as callback
|
|
@job('default', connection=self.connection, on_failure=Job.fetch)
|
|
def foo():
|
|
return 'Foo'
|
|
|
|
with self.assertRaises(ValueError):
|
|
result = foo.enqueue()
|
|
|
|
@job('default', connection=self.connection, on_failure=print)
|
|
def hello():
|
|
return 'Hello'
|
|
|
|
result = hello.enqueue()
|
|
result_job = Job.fetch(id=result.id, connection=self.connection)
|
|
self.assertEqual(result_job.failure_callback, print)
|
|
|
|
def test_decorator_accepts_on_success_function_as_argument(self):
|
|
"""Ensure that passing in on_failure function to the decorator sets the
|
|
correct on_success function on the job.
|
|
"""
|
|
|
|
# Only functions and builtins are supported as callback
|
|
@job('default', connection=self.connection, on_failure=Job.fetch)
|
|
def foo():
|
|
return 'Foo'
|
|
|
|
with self.assertRaises(ValueError):
|
|
result = foo.enqueue()
|
|
|
|
@job('default', connection=self.connection, on_success=print)
|
|
def hello():
|
|
return 'Hello'
|
|
|
|
result = hello.enqueue()
|
|
result_job = Job.fetch(id=result.id, connection=self.connection)
|
|
self.assertEqual(result_job.success_callback, print)
|
|
|
|
def test_decorator_custom_queue_class(self):
|
|
"""Ensure that a custom queue class can be passed to the job decorator"""
|
|
|
|
class CustomQueue(Queue):
|
|
pass
|
|
|
|
CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call')
|
|
|
|
custom_decorator = job(queue='default', connection=self.connection, queue_class=CustomQueue)
|
|
self.assertIs(custom_decorator.queue_class, CustomQueue)
|
|
|
|
@custom_decorator
|
|
def custom_queue_class_job(x, y):
|
|
return x + y
|
|
|
|
custom_queue_class_job.enqueue(1, 2)
|
|
self.assertEqual(CustomQueue.enqueue_call.call_count, 1)
|
|
|
|
def test_decorate_custom_queue(self):
|
|
"""Ensure that a custom queue instance can be passed to the job decorator"""
|
|
|
|
class CustomQueue(Queue):
|
|
pass
|
|
|
|
CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call')
|
|
queue = CustomQueue(connection=self.connection)
|
|
|
|
@job(queue=queue, connection=self.connection)
|
|
def custom_queue_job(x, y):
|
|
return x + y
|
|
|
|
custom_queue_job.enqueue(1, 2)
|
|
self.assertEqual(queue.enqueue_call.call_count, 1)
|
|
|
|
def test_decorator_custom_failure_ttl(self):
|
|
"""Ensure that passing in failure_ttl to the decorator sets the
|
|
failure_ttl on the job
|
|
"""
|
|
# Ensure default
|
|
result = self.decorated_job.enqueue(1, 2)
|
|
self.assertEqual(result.failure_ttl, None)
|
|
|
|
@job('default', connection=self.connection, failure_ttl=10)
|
|
def hello():
|
|
return 'Why hello'
|
|
|
|
result = hello.enqueue()
|
|
self.assertEqual(result.failure_ttl, 10)
|
|
|
|
def test_decorator_custom_retry(self):
|
|
"""Ensure that passing in retry to the decorator sets the
|
|
retry on the job
|
|
"""
|
|
# Ensure default
|
|
result = self.decorated_job.enqueue(1, 2)
|
|
self.assertEqual(result.retries_left, None)
|
|
self.assertEqual(result.retry_intervals, None)
|
|
|
|
@job('default', connection=self.connection, retry=Retry(3, [2]))
|
|
def hello():
|
|
return 'Why hello'
|
|
|
|
result = hello.enqueue()
|
|
self.assertEqual(result.retries_left, 3)
|
|
self.assertEqual(result.retry_intervals, [2])
|