diff --git a/rq/dependency.py b/rq/dependency.py new file mode 100644 index 00000000..ca1b9409 --- /dev/null +++ b/rq/dependency.py @@ -0,0 +1,27 @@ +from typing import List + +from redis.client import Pipeline +from redis.exceptions import WatchError + +from .job import Job + + +class Dependency: + @classmethod + def get_jobs_with_met_dependencies(cls, jobs: List['Job'], pipeline: Pipeline): + jobs_with_met_dependencies = [] + jobs_with_unmet_dependencies = [] + for job in jobs: + while True: + try: + pipeline.watch(*[Job.key_for(dependency_id) for dependency_id in job._dependency_ids]) + job.register_dependency(pipeline=pipeline) + if job.dependencies_are_met(pipeline=pipeline): + jobs_with_met_dependencies.append(job) + else: + jobs_with_unmet_dependencies.append(job) + pipeline.execute() + except WatchError: + continue + break + return jobs_with_met_dependencies, jobs_with_unmet_dependencies diff --git a/rq/queue.py b/rq/queue.py index 9b1e1e54..1709bae4 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -20,6 +20,7 @@ if TYPE_CHECKING: from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL +from .dependency import Dependency from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus from .logutils import blue, green @@ -42,6 +43,7 @@ class EnqueueData( "ttl", "failure_ttl", "description", + "depends_on", "job_id", "at_front", "meta", @@ -714,6 +716,7 @@ class Queue: ttl: Optional[int] = None, failure_ttl: Optional[int] = None, description: Optional[str] = None, + depends_on: Optional[List] = None, job_id: Optional[str] = None, at_front: bool = False, meta: Optional[Dict] = None, @@ -733,6 +736,7 @@ class Queue: ttl (Optional[int], optional): Time to live. Defaults to None. failure_ttl (Optional[int], optional): Failure time to live. Defaults to None. description (Optional[str], optional): The job description. Defaults to None. + depends_on (Optional[JobDependencyType], optional): The job dependencies. Defaults to None. job_id (Optional[str], optional): The job ID. Defaults to None. at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False. meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None. @@ -752,6 +756,7 @@ class Queue: ttl, failure_ttl, description, + depends_on, job_id, at_front, meta, @@ -772,33 +777,66 @@ class Queue: List[Job]: A list of enqueued jobs """ pipe = pipeline if pipeline is not None else self.connection.pipeline() - jobs = [ - self._enqueue_job( - self.create_job( - job_data.func, - args=job_data.args, - kwargs=job_data.kwargs, - result_ttl=job_data.result_ttl, - ttl=job_data.ttl, - failure_ttl=job_data.failure_ttl, - description=job_data.description, - depends_on=None, - job_id=job_data.job_id, - meta=job_data.meta, - status=JobStatus.QUEUED, - timeout=job_data.timeout, - retry=job_data.retry, - on_success=job_data.on_success, - on_failure=job_data.on_failure, - ), - pipeline=pipe, - at_front=job_data.at_front, + jobs_without_dependencies = [] + jobs_with_unmet_dependencies = [] + jobs_with_met_dependencies = [] + + def get_job_kwargs(job_data, initial_status): + return { + "func": job_data.func, + "args": job_data.args, + "kwargs": job_data.kwargs, + "result_ttl": job_data.result_ttl, + "ttl": job_data.ttl, + "failure_ttl": job_data.failure_ttl, + "description": job_data.description, + "depends_on": job_data.depends_on, + "job_id": job_data.job_id, + "meta": job_data.meta, + "status": initial_status, + "timeout": job_data.timeout, + "retry": job_data.retry, + "on_success": job_data.on_success, + "on_failure": job_data.on_failure, + } + + # Enqueue jobs without dependencies + job_datas_without_dependencies = [job_data for job_data in job_datas if not job_data.depends_on] + if job_datas_without_dependencies: + jobs_without_dependencies = [ + self._enqueue_job( + self.create_job(**get_job_kwargs(job_data, JobStatus.QUEUED)), + pipeline=pipe, + at_front=job_data.at_front, + ) + for job_data in job_datas_without_dependencies + ] + if pipeline is None: + pipe.execute() + + job_datas_with_dependencies = [job_data for job_data in job_datas if job_data.depends_on] + if job_datas_with_dependencies: + # Save all jobs with dependencies as deferred + jobs_with_dependencies = [ + self.create_job(**get_job_kwargs(job_data, JobStatus.DEFERRED)) + for job_data in job_datas_with_dependencies + ] + for job in jobs_with_dependencies: + job.save(pipeline=pipe) + if pipeline is None: + pipe.execute() + + # Enqueue the jobs whose dependencies have been met + jobs_with_met_dependencies, jobs_with_unmet_dependencies = Dependency.get_jobs_with_met_dependencies( + jobs_with_dependencies, pipeline=pipe ) - for job_data in job_datas - ] - if pipeline is None: - pipe.execute() - return jobs + jobs_with_met_dependencies = [ + self._enqueue_job(job, pipeline=pipe, at_front=job.enqueue_at_front) + for job in jobs_with_met_dependencies + ] + if pipeline is None: + pipe.execute() + return jobs_without_dependencies + jobs_with_unmet_dependencies + jobs_with_met_dependencies def run_job(self, job: 'Job') -> Job: """Run the job diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 13400f73..b4e2842c 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -103,6 +103,26 @@ class TestDependencies(RQTestCase): self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"]) + def test_multiple_jobs_with_dependencies(self): + """Enqueue dependent jobs only when appropriate""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # Multiple jobs are enqueued with correct status + parent_job = q.enqueue(say_hello) + job_no_deps = Queue.prepare_data(say_hello) + job_with_deps = Queue.prepare_data(say_hello, depends_on=parent_job) + jobs = q.enqueue_many([job_no_deps, job_with_deps]) + self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED) + self.assertEqual(jobs[1].get_status(), JobStatus.DEFERRED) + w.work(burst=True, max_jobs=1) + self.assertEqual(jobs[1].get_status(), JobStatus.QUEUED) + + job_with_met_deps = Queue.prepare_data(say_hello, depends_on=parent_job) + jobs = q.enqueue_many([job_with_met_deps]) + self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED) + q.empty() + def test_dependency_list_in_depends_on(self): """Enqueue with Dependency list in depends_on""" q = Queue(connection=self.testconn)