Do not run dependent jobs when parent or job is canceled (#1947)

This commit is contained in:
Fred Söderberg 2023-06-18 05:25:12 +02:00 committed by GitHub
parent a26f6244e3
commit d4159ee804
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 1 deletions

View File

@ -1586,7 +1586,7 @@ class Job:
# If parent job is not finished, we should only continue
# if this job allows parent job to fail
dependencies_ids.discard(parent_job.id)
if parent_job._status == JobStatus.CANCELED:
if parent_job.get_status() == JobStatus.CANCELED:
return False
elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures:
return False

View File

@ -1206,6 +1206,7 @@ class Queue:
pipeline=pipe,
exclude_job_id=exclude_job_id,
)
and dependent_job.get_status(refresh=False) != JobStatus.CANCELED
]
pipe.multi()

View File

@ -4,6 +4,7 @@ import shutil
import signal
import subprocess
import sys
import threading
import time
import zlib
from datetime import datetime, timedelta
@ -594,6 +595,91 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution
self.assertEqual(os.path.exists(SENTINEL_FILE), False)
def test_cancel_running_parent_job(self):
"""Cancel a running parent job and verify that
dependent jobs are not started."""
def cancel_parent_job(job):
while job.is_queued:
time.sleep(1)
job.cancel()
return
q = Queue(
"low",
)
parent_job = q.enqueue(long_running_job, 5)
job = q.enqueue(say_hello, depends_on=parent_job)
job2 = q.enqueue(say_hello, depends_on=job)
status_thread = threading.Thread(target=cancel_parent_job, args=(parent_job,))
status_thread.start()
w = Worker([q])
w.work(
burst=True,
)
status_thread.join()
self.assertNotEqual(parent_job.result, None)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
self.assertEqual(job.result, None)
self.assertEqual(job2.get_status(), JobStatus.DEFERRED)
self.assertEqual(job2.result, None)
self.assertEqual(q.count, 0)
def test_cancel_dependent_job(self):
"""Cancel job and verify that when the parent job is finished,
the dependent job is not started."""
q = Queue(
"low",
)
parent_job = q.enqueue(long_running_job, 5, job_id="parent_job")
job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1")
job2 = q.enqueue(say_hello, depends_on=job, job_id="job2")
job.cancel()
w = Worker([q])
w.work(
burst=True,
)
self.assertTrue(job.is_canceled)
self.assertNotEqual(parent_job.result, None)
self.assertEqual(job.get_status(), JobStatus.CANCELED)
self.assertEqual(job.result, None)
self.assertEqual(job2.result, None)
self.assertEqual(job2.get_status(), JobStatus.DEFERRED)
self.assertEqual(q.count, 0)
def test_cancel_job_enqueue_dependent(self):
"""Cancel a job in a chain and enqueue the dependent jobs."""
q = Queue(
"low",
)
parent_job = q.enqueue(long_running_job, 5, job_id="parent_job")
job = q.enqueue(say_hello, depends_on=parent_job, job_id="job1")
job2 = q.enqueue(say_hello, depends_on=job, job_id="job2")
job3 = q.enqueue(say_hello, depends_on=job2, job_id="job3")
job.cancel(enqueue_dependents=True)
w = Worker([q])
w.work(
burst=True,
)
self.assertTrue(job.is_canceled)
self.assertNotEqual(parent_job.result, None)
self.assertEqual(job.get_status(), JobStatus.CANCELED)
self.assertEqual(job.result, None)
self.assertNotEqual(job2.result, None)
self.assertEqual(job2.get_status(), JobStatus.FINISHED)
self.assertEqual(job3.get_status(), JobStatus.FINISHED)
self.assertEqual(q.count, 0)
@slow
def test_max_idle_time(self):
q = Queue()