From 5590aab4581f9f2f4d7c34a3d09c8eb708aee3c5 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 22 Jun 2021 10:30:46 +0700 Subject: [PATCH] Success and failure callbacks (#1480) * Support enqueueing with on_success_callback * Got success callback to execute properly * Added on_failure callback * Document success and failure callbacks * More Flake8 fixes * Mention that options can also be passed in via environment variables * Increase coverage on test_callbacks.py --- docs/docs/index.md | 87 +++++++++++++++++++----- docs/docs/workers.md | 3 +- rq/defaults.py | 1 + rq/job.py | 52 +++++++++++++-- rq/queue.py | 31 +++++---- rq/worker.py | 37 +++++++++-- tests/__init__.py | 12 ++-- tests/fixtures.py | 26 +++++++- tests/test_callbacks.py | 142 ++++++++++++++++++++++++++++++++++++++++ tests/test_job.py | 6 +- 10 files changed, 347 insertions(+), 50 deletions(-) create mode 100644 tests/test_callbacks.py diff --git a/docs/docs/index.md b/docs/docs/index.md index 86b1d7fa..7cfc0a71 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -69,12 +69,14 @@ results are kept. Expired jobs will be automatically deleted. Defaults to 500 se * `ttl` specifies the maximum queued time (in seconds) of the job before it's discarded. This argument defaults to `None` (infinite TTL). * `failure_ttl` specifies how long failed jobs are kept (defaults to 1 year) -* `depends_on` specifies another job (or job id) that must complete before this +* `depends_on` specifies another job (or list of jobs) that must complete before this job will be queued. * `job_id` allows you to manually specify this job's `job_id` * `at_front` will place the job at the *front* of the queue, instead of the back * `description` to add additional description to enqueued jobs. +* `on_success` allows you to run a function after a job completes successfully +* `on_failure` allows you to run a function after a job fails * `args` and `kwargs`: use these to explicitly pass arguments and keyword to the underlying job function. This is useful if your function happens to have conflicting argument names with RQ, for example `description` or `ttl`. @@ -132,6 +134,73 @@ with q.connection.pipeline() as pipe: `Queue.prepare_data` accepts all arguments that `Queue.parse_args` does **EXCEPT** for `depends_on`, which is not supported at this time, so dependencies will be up to you to setup. +## Job dependencies + +RQ allows you to chain the execution of multiple jobs. +To execute a job that depends on another job, use the `depends_on` argument: + +```python +q = Queue('low', connection=my_redis_conn) +report_job = q.enqueue(generate_report) +q.enqueue(send_report, depends_on=report_job) +``` + +Specifying multiple dependencies are also supported: + +```python +queue = Queue('low', connection=redis) +foo_job = queue.enqueue(foo) +bar_job = queue.enqueue(bar) +baz_job = queue.enqueue(baz, depends_on=[foo_job, bar_job]) +``` + +The ability to handle job dependencies allows you to split a big job into +several smaller ones. A job that is dependent on another is enqueued only when +its dependency finishes *successfully*. + + +## Job Callbacks +_New in version 1.9.0._ + +If you want to execute a function whenever a job completes or fails, RQ provides +`on_success` and `on_failure` callbacks. + +```python +queue.enqueue(say_hello, on_success=report_success, on_failure=report_failure) +``` + +### Success Callback + +Success callbacks must be a function that accepts `job`, `connection` and `result` arguments. +Your function should also accept `*args` and `**kwargs` so your application doesn't break +when additional parameters are added. + +```python +def report_success(job, connection, result, *args, **kwargs): + pass +``` + +Success callbacks are executed after job execution is complete, before dependents are enqueued. +If an exception happens when your callback is executed, job status will be set to `FAILED` +and dependents won't be enqueued. + +Callbacks are limited to 60 seconds of execution time. If you want to execute a long running job, +consider using RQ's job dependency feature instead. + + +### Failure Callbacks + +Failure callbacks are functions that accept `job`, `connection`, `type`, `value` and `traceback` +arguments. `type`, `value` and `traceback` values returned by [sys.exc_info()](https://docs.python.org/3/library/sys.html#sys.exc_info), which is the exception raised when executing your job. + +```python +def report_failure(job, connection, type, value, traceback): + pass +``` + +Failure callbacks are limited to 60 seconds of execution time. + + ## Working with Queues Besides enqueuing jobs, Queues have a few useful methods: @@ -224,22 +293,6 @@ as `ALWAYS_EAGER`. Note, however, that you still need a working connection to a redis instance for storing states related to job execution and completion. -## Job dependencies - -New in RQ 0.4.0 is the ability to chain the execution of multiple jobs. -To execute a job that depends on another job, use the `depends_on` argument: - -```python -q = Queue('low', connection=my_redis_conn) -report_job = q.enqueue(generate_report) -q.enqueue(send_report, depends_on=report_job) -``` - -The ability to handle job dependencies allows you to split a big job into -several smaller ones. A job that is dependent on another is enqueued only when -its dependency finishes *successfully*. - - ## The worker To learn about workers, see the [workers][w] documentation. diff --git a/docs/docs/workers.md b/docs/docs/workers.md index 600ffee6..cec58617 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -290,14 +290,13 @@ SENTRY_DSN = 'sync+http://public:secret@example.com/1' The example above shows all the options that are currently supported. -_Note: The_ `QUEUES` _and_ `REDIS_PASSWORD` _settings are new since 0.3.3._ - To specify which module to read settings from, use the `-c` option: ```console $ rq worker -c settings ``` +Alternatively, you can also pass in these options via environment variables. ## Custom Worker Classes diff --git a/rq/defaults.py b/rq/defaults.py index 227a1b6a..93f603e8 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -9,3 +9,4 @@ DEFAULT_RESULT_TTL = 500 DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' +CALLBACK_TIMEOUT = 60 diff --git a/rq/job.py b/rq/job.py index fc74578c..ac9fe97d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -83,7 +83,7 @@ class Job: def create(cls, func, args=None, kwargs=None, connection=None, result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None, origin=None, meta=None, - failure_ttl=None, serializer=None): + failure_ttl=None, serializer=None, *, on_success=None, on_failure=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -121,6 +121,16 @@ class Job: job._args = args job._kwargs = kwargs + if on_success: + if not inspect.isfunction(on_success) and not inspect.isbuiltin(on_success): + raise ValueError('on_success callback must be a function') + job._success_callback_name = '{0}.{1}'.format(on_success.__module__, on_success.__qualname__) + + if on_failure: + if not inspect.isfunction(on_failure) and not inspect.isbuiltin(on_failure): + raise ValueError('on_failure callback must be a function') + job._failure_callback_name = '{0}.{1}'.format(on_failure.__module__, on_failure.__qualname__) + # Extra meta data job.description = description or job.get_call_string() job.result_ttl = parse_timeout(result_ttl) @@ -220,6 +230,26 @@ class Job: return import_attribute(self.func_name) + @property + def success_callback(self): + if self._success_callback is UNEVALUATED: + if self._success_callback_name: + self._success_callback = import_attribute(self._success_callback_name) + else: + self._success_callback = None + + return self._success_callback + + @property + def failure_callback(self): + if self._failure_callback is UNEVALUATED: + if self._failure_callback_name: + self._failure_callback = import_attribute(self._failure_callback_name) + else: + self._failure_callback = None + + return self._failure_callback + def _deserialize_data(self): try: self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data) @@ -346,6 +376,10 @@ class Job: self._instance = UNEVALUATED self._args = UNEVALUATED self._kwargs = UNEVALUATED + self._success_callback_name = None + self._success_callback = UNEVALUATED + self._failure_callback_name = None + self._failure_callback = UNEVALUATED self.description = None self.origin = None self.enqueued_at = None @@ -400,8 +434,8 @@ class Job: raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value - def heartbeat(self, heartbeat, ttl, pipeline=None): - self.last_heartbeat = heartbeat + def heartbeat(self, timestamp, ttl, pipeline=None): + self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) self.started_job_registry.add(self, ttl, pipeline=pipeline) @@ -508,10 +542,16 @@ class Job: except Exception: self._result = "Unserializable return value" self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None - self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa - self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None + self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None self._status = obj.get('status').decode() if obj.get('status') else None + if obj.get('success_callback_name'): + self._success_callback_name = obj.get('success_callback_name').decode() + + if obj.get('failure_callback_name'): + self._failure_callback_name = obj.get('failure_callback_name').decode() + dep_ids = obj.get('dependency_ids') dep_id = obj.get('dependency_id') # for backwards compatibility self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids @@ -554,6 +594,8 @@ class Job: obj = { 'created_at': utcformat(self.created_at or utcnow()), 'data': zlib.compress(self.data), + 'success_callback_name': self._success_callback_name if self._success_callback_name else '', + 'failure_callback_name': self._failure_callback_name if self._failure_callback_name else '', 'started_at': utcformat(self.started_at) if self.started_at else '', 'ended_at': utcformat(self.ended_at) if self.ended_at else '', 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '', diff --git a/rq/queue.py b/rq/queue.py index e5072cbe..9e0cf702 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -292,7 +292,8 @@ class Queue: def create_job(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, ttl=None, failure_ttl=None, description=None, depends_on=None, job_id=None, - meta=None, status=JobStatus.QUEUED, retry=None): + meta=None, status=JobStatus.QUEUED, retry=None, *, + on_success=None, on_failure=None): """Creates a job based on parameters given.""" timeout = parse_timeout(timeout) @@ -313,7 +314,8 @@ class Queue: result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, status=status, description=description, depends_on=depends_on, timeout=timeout, id=job_id, - origin=self.name, meta=meta, serializer=self.serializer + origin=self.name, meta=meta, serializer=self.serializer, on_success=on_success, + on_failure=on_failure ) if retry: @@ -371,9 +373,9 @@ class Queue: return job def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, ttl=None, failure_ttl=None, - description=None, depends_on=None, job_id=None, - at_front=False, meta=None, retry=None, pipeline=None): + result_ttl=None, ttl=None, failure_ttl=None, description=None, + depends_on=None, job_id=None, at_front=False, meta=None, + retry=None, on_success=None, on_failure=None, pipeline=None): """Creates a job to represent the delayed function call and enqueues it. nd @@ -386,7 +388,7 @@ nd func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, description=description, depends_on=depends_on, job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, - retry=retry + retry=retry, on_success=on_success, on_failure=on_failure ) job = self.setup_dependencies( @@ -477,6 +479,8 @@ nd at_front = kwargs.pop('at_front', False) meta = kwargs.pop('meta', None) retry = kwargs.pop('retry', None) + on_success = kwargs.pop('on_success', None) + on_failure = kwargs.pop('on_failure', None) pipeline = kwargs.pop('pipeline', None) if 'args' in kwargs or 'kwargs' in kwargs: @@ -485,30 +489,35 @@ nd kwargs = kwargs.pop('kwargs', None) return (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) + depends_on, job_id, at_front, meta, retry, on_success, on_failure, + pipeline, args, kwargs) def enqueue(self, f, *args, **kwargs): """Creates a job to represent the delayed function call and enqueues it.""" (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) + depends_on, job_id, at_front, meta, retry, on_success, + on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) return self.enqueue_call( func=f, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, description=description, depends_on=depends_on, job_id=job_id, - at_front=at_front, meta=meta, retry=retry, pipeline=pipeline + at_front=at_front, meta=meta, retry=retry, on_success=on_success, on_failure=on_failure, + pipeline=pipeline ) def enqueue_at(self, datetime, f, *args, **kwargs): """Schedules a job to be enqueued at specified time""" (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) + depends_on, job_id, at_front, meta, retry, on_success, on_failure, + pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, timeout=timeout, result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, description=description, - depends_on=depends_on, job_id=job_id, meta=meta, retry=retry) + depends_on=depends_on, job_id=job_id, meta=meta, retry=retry, + on_success=on_success, on_failure=on_failure) return self.schedule_job(job, datetime, pipeline=pipeline) diff --git a/rq/worker.py b/rq/worker.py index 1ed913b1..ff63ab15 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -31,7 +31,7 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .compat import as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection -from .defaults import (DEFAULT_RESULT_TTL, +from .defaults import (CALLBACK_TIMEOUT, DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException @@ -435,7 +435,7 @@ class Worker: stat = None try: pid, stat = os.waitpid(self.horse_pid, 0) - except ChildProcessError as e: + except ChildProcessError: # ChildProcessError: [Errno 10] No child processes pass return pid, stat @@ -873,7 +873,7 @@ class Worker: self.log = logger try: self.perform_job(job, queue) - except: + except: # noqa os._exit(1) # os._exit() is the way to exit from childs after a fork(), in @@ -1002,6 +1002,18 @@ class Worker: except redis.exceptions.WatchError: continue + def execute_success_callback(self, job, result): + """Executes success_callback with timeout""" + job.heartbeat(utcnow(), CALLBACK_TIMEOUT) + with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): + job.success_callback(job, self.connection, result) + + def execute_failure_callback(self, job): + """Executes failure_callback with timeout""" + job.heartbeat(utcnow(), CALLBACK_TIMEOUT) + with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): + job.failure_callback(job, self.connection, *sys.exc_info()) + def perform_job(self, job, queue): """Performs the actual work of a job. Will/should only be called inside the work horse's process. @@ -1023,6 +1035,10 @@ class Worker: # Pickle the result in the same try-except block since we need # to use the same exc handling when pickling fails job._result = rv + + if job.success_callback: + self.execute_success_callback(job, rv) + self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) @@ -1030,6 +1046,18 @@ class Worker: job.ended_at = utcnow() exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) + + if job.failure_callback: + try: + self.execute_failure_callback(job) + except: # noqa + self.log.error( + 'Worker %s: error while executing failure callback', + self.key, exc_info=True + ) + exc_info = sys.exc_info() + exc_string = ''.join(traceback.format_exception(*exc_info)) + self.handle_job_failure(job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry) self.handle_exception(job, *exc_info) @@ -1140,7 +1168,8 @@ class SimpleWorker(Worker): return self.perform_job(job, queue) def get_heartbeat_ttl(self, job): - # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL. + # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. + # # We should just stick to DEFAULT_WORKER_TTL. if job.timeout == -1: return DEFAULT_WORKER_TTL else: diff --git a/tests/__init__.py b/tests/__init__.py index 76a0f4a4..2615a615 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -7,12 +7,8 @@ import os from redis import Redis from rq import pop_connection, push_connection -from rq.job import cancel_job -try: - import unittest -except ImportError: - import unittest2 as unittest # noqa +import unittest def find_empty_redis_database(ssl=False): @@ -20,11 +16,11 @@ def find_empty_redis_database(ssl=False): will use/connect it when no keys are in there. """ for dbnum in range(4, 17): - connection_kwargs = { 'db': dbnum } + connection_kwargs = {'db': dbnum} if ssl: connection_kwargs['port'] = 9736 connection_kwargs['ssl'] = True - connection_kwargs['ssl_cert_reqs'] = None # disable certificate validation + connection_kwargs['ssl_cert_reqs'] = None # disable certificate validation testconn = Redis(**connection_kwargs) empty = testconn.dbsize() == 0 if empty: @@ -35,9 +31,11 @@ def find_empty_redis_database(ssl=False): def slow(f): return unittest.skipUnless(os.environ.get('RUN_SLOW_TESTS_TOO'), "Slow tests disabled")(f) + def ssl_test(f): return unittest.skipUnless(os.environ.get('RUN_SSL_TESTS'), "SSL tests disabled")(f) + class RQTestCase(unittest.TestCase): """Base class to inherit test cases from for RQ. diff --git a/tests/fixtures.py b/tests/fixtures.py index 0103daf7..842e293f 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -46,12 +46,15 @@ def do_nothing(): """The best job in the world.""" pass + def raise_exc(): raise Exception('raise_exc error') + def raise_exc_mock(): return raise_exc + def div_by_zero(x): """Prepare for a division-by-zero exception.""" return x / 0 @@ -63,6 +66,7 @@ def some_calculation(x, y, z=1): """ return x * y / z + def rpush(key, value, append_worker_name=False, sleep=0): """Push a value into a list in Redis. Useful for detecting the order in which jobs were executed.""" @@ -73,9 +77,11 @@ def rpush(key, value, append_worker_name=False, sleep=0): redis = get_current_connection() redis.rpush(key, value) + def check_dependencies_are_met(): return get_current_job().dependencies_are_met() + def create_file(path): """Creates a file at the given path. Actually, leaves evidence that the job ran.""" @@ -87,18 +93,19 @@ def create_file_after_timeout(path, timeout): time.sleep(timeout) create_file(path) + def create_file_after_timeout_and_setsid(path, timeout): os.setsid() create_file_after_timeout(path, timeout) -def launch_process_within_worker_and_store_pid(path, timeout): +def launch_process_within_worker_and_store_pid(path, timeout): p = subprocess.Popen(['sleep', str(timeout)]) with open(path, 'w') as f: f.write('{}'.format(p.pid)) - p.wait() + def access_self(): assert get_current_connection() is not None assert get_current_job() is not None @@ -264,3 +271,18 @@ def burst_two_workers(queue, timeout=2, tries=5, pause=0.1): # Now can start the second worker. w2.work(burst=True) w1.join(timeout) + + +def save_result(job, connection, result): + """Store job result in a key""" + connection.set('success_callback:%s' % job.id, result, ex=60) + + +def save_exception(job, connection, type, value, traceback): + """Store job exception in a key""" + connection.set('failure_callback:%s' % job.id, str(value), ex=60) + + +def erroneous_callback(job): + """A callback that's not written properly""" + pass diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py new file mode 100644 index 00000000..57362ae5 --- /dev/null +++ b/tests/test_callbacks.py @@ -0,0 +1,142 @@ +from datetime import timedelta + +from tests import RQTestCase +from tests.fixtures import div_by_zero, erroneous_callback, save_exception, save_result, say_hello + +from rq import Queue, Worker +from rq.job import Job, JobStatus, UNEVALUATED +from rq.worker import SimpleWorker + + +class QueueCallbackTestCase(RQTestCase): + + def test_enqueue_with_success_callback(self): + """Test enqueue* methods with on_success""" + queue = Queue(connection=self.testconn) + + # Only functions and builtins are supported as callback + with self.assertRaises(ValueError): + queue.enqueue(say_hello, on_success=Job.fetch) + + job = queue.enqueue(say_hello, on_success=print) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.success_callback, print) + + job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_success=print) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.success_callback, print) + + def test_enqueue_with_failure_callback(self): + """queue.enqueue* methods with on_failure is persisted correctly""" + queue = Queue(connection=self.testconn) + + # Only functions and builtins are supported as callback + with self.assertRaises(ValueError): + queue.enqueue(say_hello, on_failure=Job.fetch) + + job = queue.enqueue(say_hello, on_failure=print) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.failure_callback, print) + + job = queue.enqueue_in(timedelta(seconds=10), say_hello, on_failure=print) + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.failure_callback, print) + + +class WorkerCallbackTestCase(RQTestCase): + def test_success_callback(self): + """Test success callback is executed only when job is successful""" + queue = Queue(connection=self.testconn) + worker = SimpleWorker([queue]) + + job = queue.enqueue(say_hello, on_success=save_result) + + # Callback is executed when job is successfully executed + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + self.assertEqual( + self.testconn.get('success_callback:%s' % job.id).decode(), + job.result + ) + + job = queue.enqueue(div_by_zero, on_success=save_result) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('success_callback:%s' % job.id)) + + def test_erroneous_success_callback(self): + """Test exception handling when executing success callback""" + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + # If success_callback raises an error, job will is considered as failed + job = queue.enqueue(say_hello, on_success=erroneous_callback) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + + def test_failure_callback(self): + """Test failure callback is executed only when job a fails""" + queue = Queue(connection=self.testconn) + worker = SimpleWorker([queue]) + + job = queue.enqueue(div_by_zero, on_failure=save_exception) + + # Callback is executed when job is successfully executed + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + job.refresh() + print(job.exc_info) + self.assertIn('div_by_zero', + self.testconn.get('failure_callback:%s' % job.id).decode()) + + job = queue.enqueue(div_by_zero, on_success=save_result) + worker.work(burst=True) + self.assertEqual(job.get_status(), JobStatus.FAILED) + self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id)) + + # TODO: add test case for error while executing failure callback + + +class JobCallbackTestCase(RQTestCase): + + def test_job_creation_with_success_callback(self): + """Ensure callbacks are created and persisted properly""" + job = Job.create(say_hello) + self.assertIsNone(job._success_callback_name) + # _success_callback starts with UNEVALUATED + self.assertEqual(job._success_callback, UNEVALUATED) + self.assertEqual(job.success_callback, None) + # _success_callback becomes `None` after `job.success_callback` is called if there's no success callback + self.assertEqual(job._success_callback, None) + + # job.success_callback is assigned properly + job = Job.create(say_hello, on_success=print) + self.assertIsNotNone(job._success_callback_name) + self.assertEqual(job.success_callback, print) + job.save() + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.success_callback, print) + + def test_job_creation_with_failure_callback(self): + """Ensure failure callbacks are persisted properly""" + job = Job.create(say_hello) + self.assertIsNone(job._failure_callback_name) + # _failure_callback starts with UNEVALUATED + self.assertEqual(job._failure_callback, UNEVALUATED) + self.assertEqual(job.failure_callback, None) + # _failure_callback becomes `None` after `job.failure_callback` is called if there's no failure callback + self.assertEqual(job._failure_callback, None) + + # job.failure_callback is assigned properly + job = Job.create(say_hello, on_failure=print) + self.assertIsNotNone(job._failure_callback_name) + self.assertEqual(job.failure_callback, print) + job.save() + + job = Job.fetch(id=job.id, connection=self.testconn) + self.assertEqual(job.failure_callback, print) diff --git a/tests/test_job.py b/tests/test_job.py index 482434df..7319180f 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -210,8 +210,10 @@ class TestJob(RQTestCase): # ... and no other keys are stored self.assertEqual( - sorted(self.testconn.hkeys(job.key)), - [b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', b'worker_name']) + set(self.testconn.hkeys(job.key)), + {b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at', + b'worker_name', b'success_callback_name', b'failure_callback_name'} + ) self.assertEqual(job.last_heartbeat, None) self.assertEqual(job.last_heartbeat, None)