diff --git a/rq/job.py b/rq/job.py index 477f92c5..657eb522 100644 --- a/rq/job.py +++ b/rq/job.py @@ -734,7 +734,7 @@ class Job: # Only WATCH if no pipeline passed, otherwise caller is responsible if pipeline is None: pipe.watch(self.dependents_key) - q.enqueue_dependents(self, pipeline=pipeline) + q.enqueue_dependents(self, pipeline=pipeline, exclude_job_id=self.id) self._remove_from_registries( pipeline=pipe, remove_from_queue=True @@ -982,7 +982,7 @@ class Job: return [Job.key_for(_id.decode()) for _id in dependencies] - def dependencies_are_met(self, parent_job=None, pipeline=None): + def dependencies_are_met(self, parent_job=None, pipeline=None, exclude_job_id=None): """Returns a boolean indicating if all of this job's dependencies are _FINISHED_ If a pipeline is passed, all dependencies are WATCHed. @@ -1001,13 +1001,18 @@ class Job: dependencies_ids = {_id.decode() for _id in connection.smembers(self.dependencies_key)} + if exclude_job_id: + dependencies_ids.discard(exclude_job_id) + if parent_job.id == exclude_job_id: + parent_job = None + if parent_job: - # If parent job is canceled, no need to check for status + # If parent job is canceled, treat dependency as failed # If parent job is not finished, we should only continue # if this job allows parent job to fail dependencies_ids.discard(parent_job.id) if parent_job._status == JobStatus.CANCELED: - pass + return False elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures: return False diff --git a/rq/queue.py b/rq/queue.py index f7d22144..9b7dcbd9 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -602,7 +602,7 @@ class Queue: return job - def enqueue_dependents(self, job, pipeline=None): + def enqueue_dependents(self, job, pipeline=None, exclude_job_id=None): """Enqueues all jobs in the given job's dependents set and clears it. When called without a pipeline, this method uses WATCH/MULTI/EXEC. @@ -638,6 +638,7 @@ class Queue: ) if dependent_job and dependent_job.dependencies_are_met( parent_job=job, pipeline=pipe, + exclude_job_id=exclude_job_id, ) ] diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 6d2f776c..12b956d4 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -1,7 +1,7 @@ from tests import RQTestCase -from tests.fixtures import div_by_zero, say_hello +from tests.fixtures import check_dependencies_are_met, div_by_zero, say_hello -from rq import Queue, SimpleWorker +from rq import Queue, SimpleWorker, Worker from rq.job import Job, JobStatus, Dependency @@ -97,3 +97,48 @@ class TestDependencies(RQTestCase): w.work(burst=True) job = Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.get_status(), JobStatus.FINISHED) + + def test_dependencies_are_met_if_parent_is_canceled(self): + """When parent job is canceled, it should be treated as failed""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + job.set_status(JobStatus.CANCELED) + dependent_job = queue.enqueue(say_hello, depends_on=job) + # dependencies_are_met() should return False, whether or not + # parent_job is provided + self.assertFalse(dependent_job.dependencies_are_met(job)) + self.assertFalse(dependent_job.dependencies_are_met()) + + def test_can_enqueue_job_if_dependency_is_deleted(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(say_hello, result_ttl=0) + + w = Worker([queue]) + w.work(burst=True) + + assert queue.enqueue(say_hello, depends_on=dependency_job) + + def test_dependencies_are_met_if_dependency_is_deleted(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(say_hello, result_ttl=0) + dependent_job = queue.enqueue(say_hello, depends_on=dependency_job) + + w = Worker([queue]) + w.work(burst=True, max_jobs=1) + + assert dependent_job.dependencies_are_met() + assert dependent_job.get_status() == JobStatus.QUEUED + + def test_dependencies_are_met_at_execution_time(self): + queue = Queue(connection=self.testconn) + queue.empty() + queue.enqueue(say_hello, job_id="A") + queue.enqueue(say_hello, job_id="B") + job_c = queue.enqueue(check_dependencies_are_met, job_id="C", depends_on=["A", "B"]) + + job_c.dependencies_are_met() + w = Worker([queue]) + w.work(burst=True) + assert job_c.result \ No newline at end of file diff --git a/tests/test_job.py b/tests/test_job.py index 6106afb9..317e2f9a 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1119,40 +1119,6 @@ class TestJob(RQTestCase): pipeline.touch(Job.key_for(dependent_job.id)) pipeline.execute() - def test_can_enqueue_job_if_dependency_is_deleted(self): - queue = Queue(connection=self.testconn) - - dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0) - - w = Worker([queue]) - w.work(burst=True) - - assert queue.enqueue(fixtures.say_hello, depends_on=dependency_job) - - def test_dependents_are_met_if_dependency_is_deleted(self): - queue = Queue(connection=self.testconn) - - dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0) - dependent_job = queue.enqueue(fixtures.say_hello, depends_on=dependency_job) - - w = Worker([queue]) - w.work(burst=True, max_jobs=1) - - assert dependent_job.dependencies_are_met() - assert dependent_job.get_status() == JobStatus.QUEUED - - def test_dependencies_are_met_at_execution_time(self): - queue = Queue(connection=self.testconn) - queue.empty() - queue.enqueue(fixtures.say_hello, job_id="A") - queue.enqueue(fixtures.say_hello, job_id="B") - job_C = queue.enqueue(fixtures.check_dependencies_are_met, job_id="C", depends_on=["A", "B"]) - - job_C.dependencies_are_met() - w = Worker([queue]) - w.work(burst=True) - assert job_C.result - def test_execution_order_with_sole_dependency(self): queue = Queue(connection=self.testconn) key = 'test_job:job_order'