diff --git a/rq/registry.py b/rq/registry.py index 66ef90ad..101b4f24 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -240,6 +240,7 @@ class StartedJobRegistry(BaseRegistry): if job_ids: failed_job_registry = FailedJobRegistry(self.name, self.connection, serializer=self.serializer) + queue = self.get_queue() with self.connection.pipeline() as pipeline: for job_id in job_ids: @@ -268,7 +269,6 @@ class StartedJobRegistry(BaseRegistry): retry = job.retries_left and job.retries_left > 0 if retry: - queue = self.get_queue() job.retry(queue, pipeline) else: @@ -282,6 +282,7 @@ class StartedJobRegistry(BaseRegistry): job.save(pipeline=pipeline, include_meta=False) job.cleanup(ttl=-1, pipeline=pipeline) failed_job_registry.add(job, job.failure_ttl) + queue.enqueue_dependents(job) pipeline.zremrangebyscore(self.key, 0, score) pipeline.execute() diff --git a/tests/test_registry.py b/tests/test_registry.py index 5e05c6b8..7f47b70a 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -4,7 +4,7 @@ from unittest.mock import ANY from rq.defaults import DEFAULT_FAILURE_TTL from rq.exceptions import AbandonedJobError, InvalidJobOperation -from rq.job import Job, JobStatus, requeue_job +from rq.job import Dependency, Job, JobStatus, requeue_job from rq.queue import Queue from rq.registry import ( CanceledJobRegistry, @@ -298,6 +298,47 @@ class TestRegistry(RQTestCase): registry = StartedJobRegistry('foo', connection=self.connection, serializer=JSONSerializer) self.assertEqual(registry.get_queue(), Queue('foo', connection=self.connection, serializer=JSONSerializer)) + def test_enqueue_dependents_when_parent_job_is_abandoned(self): + """Enqueuing parent job's dependencies after moving it to FailedJobRegistry due to AbandonedJobError.""" + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + failed_job_registry = FailedJobRegistry(connection=self.testconn) + finished_job_registry = FinishedJobRegistry(connection=self.testconn) + deferred_job_registry = DeferredJobRegistry(connection=self.connection) + + parent_job = queue.enqueue(say_hello) + job_to_be_executed = queue.enqueue_call(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=True)) + job_not_to_be_executed = queue.enqueue_call( + say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=False) + ) + self.assertIn(job_to_be_executed, deferred_job_registry) + self.assertIn(job_not_to_be_executed, deferred_job_registry) + + self.testconn.zadd(self.registry.key, {parent_job.id: 2}) + queue.remove(parent_job.id) + + with mock.patch.object(Job, 'execute_failure_callback') as mocked: + self.registry.cleanup() + mocked.assert_called_once_with(queue.death_penalty_class, AbandonedJobError, ANY, ANY) + + # check that parent job was moved to FailedJobRegistry and has correct status + self.assertIn(parent_job, failed_job_registry) + self.assertNotIn(parent_job, self.registry) + self.assertTrue(parent_job.is_failed) + + # check that only job_to_be_executed has been queued and executed + self.assertEqual(len(queue.get_job_ids()), 1) + self.assertTrue(job_to_be_executed.is_queued) + self.assertFalse(job_not_to_be_executed.is_queued) + + worker.work(burst=True) + self.assertTrue(job_to_be_executed.is_finished) + self.assertNotIn(job_to_be_executed, deferred_job_registry) + self.assertIn(job_to_be_executed, finished_job_registry) + + self.assertFalse(job_not_to_be_executed.is_finished) + self.assertNotIn(job_not_to_be_executed, finished_job_registry) + class TestFinishedJobRegistry(RQTestCase): def setUp(self):