diff --git a/docs/docs/index.md b/docs/docs/index.md index 07488464..fb2a68be 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -167,6 +167,7 @@ The `Dependency(jobs=...)` parameter accepts: - a string representing a single job id - a Job object - an iteratable of job id strings and/or Job objects +- `enqueue_at_front` boolean parameter to put dependents at the front when they are enqueued Example: @@ -177,9 +178,17 @@ from rq import Queue queue = Queue(connection=Redis()) job_1 = queue.enqueue(div_by_zero) -dependency = Dependency(jobs=[job_1], allow_failure=True) # allow_failure defaults to False +dependency = Dependency( + jobs=[job_1], + allow_failure=True, # allow_failure defaults to False + enqueue_at_front=True # enqueue_at_front defaults to False +) job_2 = queue.enqueue(say_hello, depends_on=dependency) -# job_2 will execute even though its dependency (job_1) fails + +""" + job_2 will execute even though its dependency (job_1) fails, + and it will be enqueued at the front of the queue. +""" ``` @@ -269,10 +278,10 @@ There are two options: #### Arguments: -| | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) | -|-|-|-|-| -| keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` | -| no keyword | `[value]` | `:[value]` | `%[value]` | +| | plain text | json | [literal-eval](https://docs.python.org/3/library/ast.html#ast.literal_eval) | +| ---------- | --------------- | ---------------- | --------------------------------------------------------------------------- | +| keyword | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` | +| no keyword | `[value]` | `:[value]` | `%[value]` | Where `[key]` is the keyword and `[value]` is the value which is parsed with the corresponding parsing method. diff --git a/rq/job.py b/rq/job.py index 657eb522..0436cf4f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -39,7 +39,7 @@ class JobStatus(str, Enum): class Dependency: - def __init__(self, jobs, allow_failure: bool = False): + def __init__(self, jobs, allow_failure: bool = False, enqueue_at_front: bool = False): jobs = ensure_list(jobs) if not all( isinstance(job, Job) or isinstance(job, str) @@ -52,6 +52,7 @@ class Dependency: self.dependencies = jobs self.allow_failure = allow_failure + self.enqueue_at_front = enqueue_at_front # Sentinel value to mark that some of our lazily evaluated properties have not @@ -151,6 +152,7 @@ class Job: # dependency could be job instance or id, or iterable thereof if depends_on is not None: if isinstance(depends_on, Dependency): + job.enqueue_at_front = depends_on.enqueue_at_front job.allow_dependency_failures = depends_on.allow_failure depends_on_list = depends_on.dependencies else: @@ -429,6 +431,7 @@ class Job: self.redis_server_version = None self.last_heartbeat = None self.allow_dependency_failures = None + self.enqueue_at_front = None def __repr__(self): # noqa # pragma: no cover return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, @@ -586,6 +589,7 @@ class Job: self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else []) self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None + self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {} @@ -669,6 +673,9 @@ class Job: # convert boolean to integer to avoid redis.exception.DataError obj["allow_dependency_failures"] = int(self.allow_dependency_failures) + if self.enqueue_at_front is not None: + obj["enqueue_at_front"] = int(self.enqueue_at_front) + return obj def save(self, pipeline=None, include_meta=True): diff --git a/rq/queue.py b/rq/queue.py index 9b7dcbd9..97b74fb9 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -325,9 +325,6 @@ class Queue: job.retries_left = retry.max job.retry_intervals = retry.intervals - if isinstance(depends_on, Dependency): - job.allow_dependency_failures = depends_on.allow_failure - return job def setup_dependencies( @@ -648,16 +645,19 @@ class Queue: break for dependent in jobs_to_enqueue: + enqueue_at_front = dependent.enqueue_at_front or False + registry = DeferredJobRegistry(dependent.origin, self.connection, job_class=self.job_class, serializer=self.serializer) registry.remove(dependent, pipeline=pipe) + if dependent.origin == self.name: - self.enqueue_job(dependent, pipeline=pipe) + self.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) else: queue = self.__class__(name=dependent.origin, connection=self.connection) - queue.enqueue_job(dependent, pipeline=pipe) + queue.enqueue_job(dependent, pipeline=pipe, at_front=enqueue_at_front) # Only delete dependents_key if all dependents have been enqueued if len(jobs_to_enqueue) == len(dependent_job_ids): diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index 12b956d4..d379ed9f 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -98,6 +98,25 @@ class TestDependencies(RQTestCase): job = Job.fetch(job.id, connection=self.testconn) self.assertEqual(job.get_status(), JobStatus.FINISHED) + # Test dependant is enqueued at front + q.empty() + parent_job = q.enqueue(say_hello) + q.enqueue( + say_hello, + job_id='fake_job_id_1', + depends_on=Dependency(jobs=[parent_job]) + ) + q.enqueue( + say_hello, + job_id='fake_job_id_2', + depends_on=Dependency(jobs=[parent_job],enqueue_at_front=True) + ) + #q.enqueue(say_hello) # This is a filler job that will act as a separator for jobs, one will be enqueued at front while the other one at the end of the queue + w.work(burst=True, max_jobs=1) + + self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"]) + + def test_dependencies_are_met_if_parent_is_canceled(self): """When parent job is canceled, it should be treated as failed""" queue = Queue(connection=self.testconn)