From d4159ee804790f53099d7a9b969bce313377306b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fred=20S=C3=B6derberg?= Date: Sun, 18 Jun 2023 05:25:12 +0200 Subject: [PATCH] Do not run dependent jobs when parent or job is canceled (#1947) --- rq/job.py | 2 +- rq/queue.py | 1 + tests/test_worker.py | 86 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/rq/job.py b/rq/job.py index 0acfa401..f2707b56 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1586,7 +1586,7 @@ class Job: # If parent job is not finished, we should only continue # if this job allows parent job to fail dependencies_ids.discard(parent_job.id) - if parent_job._status == JobStatus.CANCELED: + if parent_job.get_status() == JobStatus.CANCELED: return False elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures: return False diff --git a/rq/queue.py b/rq/queue.py index fc3bd47d..ab7d7d4c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1206,6 +1206,7 @@ class Queue: pipeline=pipe, exclude_job_id=exclude_job_id, ) + and dependent_job.get_status(refresh=False) != JobStatus.CANCELED ] pipe.multi() diff --git a/tests/test_worker.py b/tests/test_worker.py index 8f0c064c..5784e343 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,6 +4,7 @@ import shutil import signal import subprocess import sys +import threading import time import zlib from datetime import datetime, timedelta @@ -594,6 +595,91 @@ class TestWorker(RQTestCase): # Should not have created evidence of execution self.assertEqual(os.path.exists(SENTINEL_FILE), False) + def test_cancel_running_parent_job(self): + """Cancel a running parent job and verify that + dependent jobs are not started.""" + + def cancel_parent_job(job): + while job.is_queued: + time.sleep(1) + + job.cancel() + return + + q = Queue( + "low", + ) + parent_job = q.enqueue(long_running_job, 5) + + job = q.enqueue(say_hello, depends_on=parent_job) + job2 = q.enqueue(say_hello, depends_on=job) + status_thread = threading.Thread(target=cancel_parent_job, args=(parent_job,)) + status_thread.start() + + w = Worker([q]) + w.work( + burst=True, + ) + status_thread.join() + + self.assertNotEqual(parent_job.result, None) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + self.assertEqual(job.result, None) + self.assertEqual(job2.get_status(), JobStatus.DEFERRED) + self.assertEqual(job2.result, None) + self.assertEqual(q.count, 0) + + def test_cancel_dependent_job(self): + """Cancel job and verify that when the parent job is finished, + the dependent job is not started.""" + + q = Queue( + "low", + ) + parent_job = q.enqueue(long_running_job, 5, job_id="parent_job") + job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1") + job2 = q.enqueue(say_hello, depends_on=job, job_id="job2") + job.cancel() + + w = Worker([q]) + w.work( + burst=True, + ) + self.assertTrue(job.is_canceled) + self.assertNotEqual(parent_job.result, None) + self.assertEqual(job.get_status(), JobStatus.CANCELED) + self.assertEqual(job.result, None) + self.assertEqual(job2.result, None) + self.assertEqual(job2.get_status(), JobStatus.DEFERRED) + self.assertEqual(q.count, 0) + + def test_cancel_job_enqueue_dependent(self): + """Cancel a job in a chain and enqueue the dependent jobs.""" + + q = Queue( + "low", + ) + parent_job = q.enqueue(long_running_job, 5, job_id="parent_job") + job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1") + job2 = q.enqueue(say_hello, depends_on=job, job_id="job2") + job3 = q.enqueue(say_hello, depends_on=job2, job_id="job3") + + job.cancel(enqueue_dependents=True) + + w = Worker([q]) + w.work( + burst=True, + ) + self.assertTrue(job.is_canceled) + self.assertNotEqual(parent_job.result, None) + self.assertEqual(job.get_status(), JobStatus.CANCELED) + self.assertEqual(job.result, None) + self.assertNotEqual(job2.result, None) + self.assertEqual(job2.get_status(), JobStatus.FINISHED) + self.assertEqual(job3.get_status(), JobStatus.FINISHED) + + self.assertEqual(q.count, 0) + @slow def test_max_idle_time(self): q = Queue()