Enqueue dependents when job is abandoned and moved to FailedJobRegistry (#2008)

* Enqueue dependents when job is abandoned

* Add test

* Run CI
This commit is contained in:
Maria Khrustaleva 2024-07-18 05:16:09 +02:00 committed by GitHub
parent c7bc4f61c6
commit 5cfd853546
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 44 additions and 2 deletions

View File

@ -240,6 +240,7 @@ class StartedJobRegistry(BaseRegistry):
if job_ids: if job_ids:
failed_job_registry = FailedJobRegistry(self.name, self.connection, serializer=self.serializer) failed_job_registry = FailedJobRegistry(self.name, self.connection, serializer=self.serializer)
queue = self.get_queue()
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for job_id in job_ids: for job_id in job_ids:
@ -268,7 +269,6 @@ class StartedJobRegistry(BaseRegistry):
retry = job.retries_left and job.retries_left > 0 retry = job.retries_left and job.retries_left > 0
if retry: if retry:
queue = self.get_queue()
job.retry(queue, pipeline) job.retry(queue, pipeline)
else: else:
@ -282,6 +282,7 @@ class StartedJobRegistry(BaseRegistry):
job.save(pipeline=pipeline, include_meta=False) job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) job.cleanup(ttl=-1, pipeline=pipeline)
failed_job_registry.add(job, job.failure_ttl) failed_job_registry.add(job, job.failure_ttl)
queue.enqueue_dependents(job)
pipeline.zremrangebyscore(self.key, 0, score) pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute() pipeline.execute()

View File

@ -4,7 +4,7 @@ from unittest.mock import ANY
from rq.defaults import DEFAULT_FAILURE_TTL from rq.defaults import DEFAULT_FAILURE_TTL
from rq.exceptions import AbandonedJobError, InvalidJobOperation from rq.exceptions import AbandonedJobError, InvalidJobOperation
from rq.job import Job, JobStatus, requeue_job from rq.job import Dependency, Job, JobStatus, requeue_job
from rq.queue import Queue from rq.queue import Queue
from rq.registry import ( from rq.registry import (
CanceledJobRegistry, CanceledJobRegistry,
@ -298,6 +298,47 @@ class TestRegistry(RQTestCase):
registry = StartedJobRegistry('foo', connection=self.connection, serializer=JSONSerializer) registry = StartedJobRegistry('foo', connection=self.connection, serializer=JSONSerializer)
self.assertEqual(registry.get_queue(), Queue('foo', connection=self.connection, serializer=JSONSerializer)) self.assertEqual(registry.get_queue(), Queue('foo', connection=self.connection, serializer=JSONSerializer))
def test_enqueue_dependents_when_parent_job_is_abandoned(self):
"""Enqueuing parent job's dependencies after moving it to FailedJobRegistry due to AbandonedJobError."""
queue = Queue(connection=self.testconn)
worker = Worker([queue])
failed_job_registry = FailedJobRegistry(connection=self.testconn)
finished_job_registry = FinishedJobRegistry(connection=self.testconn)
deferred_job_registry = DeferredJobRegistry(connection=self.connection)
parent_job = queue.enqueue(say_hello)
job_to_be_executed = queue.enqueue_call(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=True))
job_not_to_be_executed = queue.enqueue_call(
say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=False)
)
self.assertIn(job_to_be_executed, deferred_job_registry)
self.assertIn(job_not_to_be_executed, deferred_job_registry)
self.testconn.zadd(self.registry.key, {parent_job.id: 2})
queue.remove(parent_job.id)
with mock.patch.object(Job, 'execute_failure_callback') as mocked:
self.registry.cleanup()
mocked.assert_called_once_with(queue.death_penalty_class, AbandonedJobError, ANY, ANY)
# check that parent job was moved to FailedJobRegistry and has correct status
self.assertIn(parent_job, failed_job_registry)
self.assertNotIn(parent_job, self.registry)
self.assertTrue(parent_job.is_failed)
# check that only job_to_be_executed has been queued and executed
self.assertEqual(len(queue.get_job_ids()), 1)
self.assertTrue(job_to_be_executed.is_queued)
self.assertFalse(job_not_to_be_executed.is_queued)
worker.work(burst=True)
self.assertTrue(job_to_be_executed.is_finished)
self.assertNotIn(job_to_be_executed, deferred_job_registry)
self.assertIn(job_to_be_executed, finished_job_registry)
self.assertFalse(job_not_to_be_executed.is_finished)
self.assertNotIn(job_not_to_be_executed, finished_job_registry)
class TestFinishedJobRegistry(RQTestCase): class TestFinishedJobRegistry(RQTestCase):
def setUp(self): def setUp(self):