call callbacks and prepare job when running sync (#1599)

* call signals

* fix lines

* add tests and fix faliure status bug

* bump version
This commit is contained in:
Eric Feldman 2021-12-07 12:20:36 +02:00 committed by GitHub
parent 0147b30f2b
commit 93f34c796f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 1 deletions

View File

@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import uuid
import sys
import warnings
from collections import namedtuple
@ -572,7 +573,23 @@ nd
pipe.execute()
if not self._is_async:
job = self.run_sync(job)
return job
def run_sync(self, job):
with self.connection.pipeline() as pipeline:
job.prepare_for_execution('sync', pipeline)
try:
job = self.run_job(job)
except: # noqa
job.set_status(JobStatus.FAILED)
if job.failure_callback:
job.failure_callback(job, self.connection, *sys.exc_info())
else:
if job.success_callback:
job.success_callback(job, self.connection, job.result)
return job

View File

@ -2,4 +2,4 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
VERSION = '1.10.1'
VERSION = '1.10.2'

View File

@ -47,6 +47,36 @@ class QueueCallbackTestCase(RQTestCase):
self.assertEqual(job.failure_callback, print)
class SyncJobCallback(RQTestCase):
def test_success_callback(self):
"""Test success callback is executed only when job is successful"""
queue = Queue(is_async=False)
job = queue.enqueue(say_hello, on_success=save_result)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
self.assertEqual(
self.testconn.get('success_callback:%s' % job.id).decode(),
job.result
)
job = queue.enqueue(div_by_zero, on_success=save_result)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('success_callback:%s' % job.id))
def test_failure_callback(self):
"""queue.enqueue* methods with on_failure is persisted correctly"""
queue = Queue(is_async=False)
job = queue.enqueue(div_by_zero, on_failure=save_exception)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertIn('div_by_zero',
self.testconn.get('failure_callback:%s' % job.id).decode())
job = queue.enqueue(div_by_zero, on_success=save_result)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertFalse(self.testconn.exists('failure_callback:%s' % job.id))
class WorkerCallbackTestCase(RQTestCase):
def test_success_callback(self):
"""Test success callback is executed only when job is successful"""