Check dependencies when enqueuing via Queue.enqueue_job() (#1837)

* test: check dependencies when enqueuing via Queue.enqueue_job()

Signed-off-by: Simó Albert i Beltran <sim6@bona.gent>

* fix: check dependencies when enqueuing via Queue.enqueue_job()

Signed-off-by: Simó Albert i Beltran <sim6@bona.gent>
Co-authored-by: Selwin Ong <selwin.ong@gmail.com>

---------

Signed-off-by: Simó Albert i Beltran <sim6@bona.gent>
Co-authored-by: Selwin Ong <selwin.ong@gmail.com>
Simó Albert i Beltran 2023-03-09 00:15:57 +01:00 committed by GitHub
parent 5a1c0a09f4
commit 64e202ea19
5 changed files with 45 additions and 13 deletions
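
For context, a minimal sketch of the behaviour this change introduces (the dotted function path is illustrative, not part of the patch): handing a job with an unfinished dependency to Queue.enqueue_job() now leaves it deferred instead of pushing it straight onto the queue, matching what enqueue()/enqueue_call() already did.

    # Sketch only; mirrors the new test below, with a placeholder function path.
    from redis import Redis
    from rq import Queue
    from rq.job import Job, JobStatus

    q = Queue(connection=Redis())

    parent = Job.create('mymodule.send_report', connection=q.connection)
    parent.save()                     # saved, but never run

    child = Job.create('mymodule.send_report', depends_on=parent, connection=q.connection)
    child = q.enqueue_job(child)      # the dependency check now happens here

    assert child.get_status() == JobStatus.DEFERRED  # previously it was QUEUED right away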

rq/job.py

@@ -1377,7 +1377,7 @@ class Job:
             self.set_status(JobStatus.SCHEDULED)
             queue.schedule_job(self, scheduled_datetime, pipeline=pipeline)
         else:
-            queue.enqueue_job(self, pipeline=pipeline)
+            queue._enqueue_job(self, pipeline=pipeline)

     def register_dependency(self, pipeline: Optional['Pipeline'] = None):
         """Jobs may have dependencies. Jobs are enqueued only if the jobs they

rq/queue.py

@@ -660,12 +660,7 @@ class Queue:
             on_success=on_success,
             on_failure=on_failure,
         )
-
-        job = self.setup_dependencies(job, pipeline=pipeline)
-        # If we do not depend on an unfinished job, enqueue the job.
-        if job.get_status(refresh=False) != JobStatus.DEFERRED:
-            return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
-        return job
+        return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)

     @staticmethod
     def prepare_data(

@@ -736,7 +731,7 @@ class Queue:
         """
         pipe = pipeline if pipeline is not None else self.connection.pipeline()
         jobs = [
-            self.enqueue_job(
+            self._enqueue_job(
                 self.create_job(
                     job_data.func,
                     args=job_data.args,

@@ -977,7 +972,26 @@ class Queue:
         return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs)

     def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
-        """Enqueues a job for delayed execution.
+        """Enqueues a job for delayed execution checking dependencies.
+
+        Args:
+            job (Job): The job to enqueue
+            pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None.
+            at_front (bool, optional): Whether should enqueue at the front of the queue. Defaults to False.
+
+        Returns:
+            Job: The enqueued job
+        """
+        job.origin = self.name
+        job = self.setup_dependencies(job, pipeline=pipeline)
+        # If we do not depend on an unfinished job, enqueue the job.
+        if job.get_status(refresh=False) != JobStatus.DEFERRED:
+            return self._enqueue_job(job, pipeline=pipeline, at_front=at_front)
+        return job
+
+    def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
+        """Enqueues a job for delayed execution without checking dependencies.

         If Queue is instantiated with is_async=False, job is executed immediately.
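
The two docstrings spell out the split: enqueue_job() is the dependency-aware public entry point, while _enqueue_job() is the unconditional internal primitive now used by retry, the scheduler, the registries, and enqueue_dependents(). A hedged illustration of the difference (calling the private method is shown only to make the contract visible; the dotted function path is a placeholder):

    # Not a recommendation: _enqueue_job() skips the dependency check that
    # enqueue_job() now performs, so the job is pushed even though its parent
    # has not finished.
    from redis import Redis
    from rq import Queue
    from rq.job import Job, JobStatus

    q = Queue(connection=Redis())
    parent = Job.create('mymodule.send_report', connection=q.connection)
    parent.save()

    forced = Job.create('mymodule.send_report', depends_on=parent, connection=q.connection)
    forced = q._enqueue_job(forced)      # internal path: no dependency check
    assert forced.get_status() == JobStatus.QUEUED
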
@@ -1103,10 +1117,10 @@ class Queue:
                     registry.remove(dependent, pipeline=pipe)

                     if dependent.origin == self.name:
-                        self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
+                        self._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
                     else:
                         queue = self.__class__(name=dependent.origin, connection=self.connection)
-                        queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)
+                        queue._enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front)

                 # Only delete dependents_key if all dependents have been enqueued
                 if len(jobs_to_enqueue) == len(dependent_job_ids):
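
The dependent.origin branch above is what lets a finished parent release dependents that live on a different queue. A hedged end-to-end sketch, assuming a local Redis; the queue names and math.sqrt are placeholders:

    from math import sqrt
    from redis import Redis
    from rq import Queue, SimpleWorker

    redis = Redis()
    q_parent = Queue('parent-q', connection=redis)
    q_child = Queue('child-q', connection=redis)

    parent = q_parent.enqueue(sqrt, 4)
    child = q_child.enqueue(sqrt, 9, depends_on=parent)   # deferred for now

    SimpleWorker([q_parent], connection=redis).work(burst=True)
    # enqueue_dependents() sees child.origin == 'child-q' and pushes it there.
    SimpleWorker([q_child], connection=redis).work(burst=True)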

rq/registry.py

@@ -194,7 +194,7 @@ class BaseRegistry:
            job.ended_at = None
            job._exc_info = ''
            job.save()
-           job = queue.enqueue_job(job, pipeline=pipeline, at_front=at_front)
+           job = queue._enqueue_job(job, pipeline=pipeline, at_front=at_front)
            pipeline.execute()

        return job
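BaseRegistry.requeue() keeps its previous unconditional behaviour by calling the renamed _enqueue_job(). A small hedged usage sketch, assuming a local Redis:

    from redis import Redis
    from rq import Queue
    from rq.registry import FailedJobRegistry

    q = Queue(connection=Redis())
    registry = FailedJobRegistry(queue=q)

    for job_id in registry.get_job_ids():
        registry.requeue(job_id)   # re-enqueues via queue._enqueue_job(), no dependency re-check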

rq/scheduler.py

@@ -165,7 +165,7 @@ class RQScheduler:
            jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer)
            for job in jobs:
                if job is not None:
-                   queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
+                   queue._enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
                    registry.remove(job, pipeline=pipeline)
                pipeline.execute()
        self._status = self.Status.STARTED
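Likewise, RQScheduler keeps pushing due jobs through the unconditional path, now named _enqueue_job(). For reference, a minimal scheduling sketch (the dotted function path is a placeholder):

    from datetime import datetime, timedelta, timezone
    from redis import Redis
    from rq import Queue

    q = Queue(connection=Redis())
    q.enqueue_at(datetime.now(timezone.utc) + timedelta(minutes=5), 'mymodule.send_report')
    # A worker started with `rq worker --with-scheduler` moves the job out of the
    # ScheduledJobRegistry and onto the queue once the timestamp has passed.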

tests/test_dependencies.py

@@ -130,6 +130,24 @@ class TestDependencies(RQTestCase):
         self.assertEqual(job.get_status(), JobStatus.FINISHED)

+    def test_enqueue_job_dependency(self):
+        """Enqueue via Queue.enqueue_job() with dependency"""
+        q = Queue(connection=self.testconn)
+        w = SimpleWorker([q], connection=q.connection)
+
+        # enqueue dependent job when parent successfully finishes
+        parent_job = Job.create(say_hello)
+        parent_job.save()
+        job = Job.create(say_hello, depends_on=parent_job)
+        q.enqueue_job(job)
+        w.work(burst=True)
+        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+
+        q.enqueue_job(parent_job)
+        w.work(burst=True)
+        self.assertEqual(parent_job.get_status(), JobStatus.FINISHED)
+        self.assertEqual(job.get_status(), JobStatus.FINISHED)
+
     def test_dependencies_are_met_if_parent_is_canceled(self):
         """When parent job is canceled, it should be treated as failed"""
         queue = Queue(connection=self.testconn)