From 7be119068f31db165f8f3fea48d34a00ab587c02 Mon Sep 17 00:00:00 2001 From: th3hamm0r Date: Fri, 28 Jan 2022 05:08:45 +0100 Subject: [PATCH] Fixed job not getting enqueued due to parallel modification (#1615) The handling of WatchErrors in setup_dependencies() did not reset the local status of the job, so if, due to parallel processing, all dependencies get finished while enqueuing the job (causing WatchError to be raised), the job returned to enqueue_call() remained in DEFERRED state, which resulted in no execution at all. --- rq/queue.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rq/queue.py b/rq/queue.py index 79f0abfb..18ce35c3 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -343,6 +343,7 @@ class Queue: # something else has modified either the set of dependencies or the # status of one of them. In this case, we simply retry. if len(job._dependency_ids) > 0: + orig_status = job.get_status(refresh=False) pipe = pipeline if pipeline is not None else self.connection.pipeline() while True: try: @@ -360,6 +361,8 @@ class Queue: for dependency in dependencies: if dependency.get_status(refresh=False) != JobStatus.FINISHED: + # NOTE: If the following code changes local variables, those values probably have + # to be set back to their original values in the handling of WatchError below! job.set_status(JobStatus.DEFERRED, pipeline=pipe) job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) @@ -370,6 +373,11 @@ class Queue: break except WatchError: if pipeline is None: + # The call to job.set_status(JobStatus.DEFERRED, pipeline=pipe) above has changed the + # internal "_status". We have to reset it to its original value (probably QUEUED), so + # if during the next run no unfinished dependencies exist anymore, the job gets + # enqueued correctly by enqueue_call(). + job._status = orig_status continue else: # if pipeline comes from caller, re-raise to them