From 6cfb5a60125ba906604ffc0526a1026b5dc85443 Mon Sep 17 00:00:00 2001
From: Selwin Ong
Date: Tue, 13 Jun 2023 08:34:25 +0700
Subject: [PATCH] Don't call handle_job_failure if job is no longer present
 (#1946)

---
 rq/maintenance.py |   3 +-
 rq/worker.py      | 112 +++++++++++++++++++++++-----------------------
 2 files changed, 58 insertions(+), 57 deletions(-)

diff --git a/rq/maintenance.py b/rq/maintenance.py
index b078a237..5c9d7cd2 100644
--- a/rq/maintenance.py
+++ b/rq/maintenance.py
@@ -21,5 +21,6 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None:
     for job_id in job_ids:
         if job_id not in queue.started_job_registry:
             job = queue.fetch_job(job_id)
-            worker.handle_job_failure(job, queue, exc_string='Job was stuck in the intermediate queue.')
+            if job:
+                worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.')
             queue.connection.lrem(queue.intermediate_queue_key, 1, job_id)
diff --git a/rq/worker.py b/rq/worker.py
index 7b16cc00..d5b921ad 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -456,6 +456,62 @@ class BaseWorker:
         self.teardown()
         return bool(completed_jobs)
 
+    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
+        """
+        Handles the failure of an executing job by:
+        1. Setting the job status to failed
+        2. Removing the job from StartedJobRegistry
+        3. Setting the worker's current job to None
+        4. Adding the job to FailedJobRegistry
+        `save_exc_to_job` should only be used for testing purposes
+        """
+        self.log.debug('Handling failed execution of job %s', job.id)
+        with self.connection.pipeline() as pipeline:
+            if started_job_registry is None:
+                started_job_registry = StartedJobRegistry(
+                    job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
+                )
+
+            # check whether a job was stopped intentionally and set the job
+            # status appropriately if it was this job.
+            job_is_stopped = self._stopped_job_id == job.id
+            retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
+
+            if job_is_stopped:
+                job.set_status(JobStatus.STOPPED, pipeline=pipeline)
+                self._stopped_job_id = None
+            else:
+                # Requeue/reschedule if retry is configured, otherwise mark the job as failed
+                if not retry:
+                    job.set_status(JobStatus.FAILED, pipeline=pipeline)
+
+            started_job_registry.remove(job, pipeline=pipeline)
+
+            if not self.disable_default_exception_handler and not retry:
+                job._handle_failure(exc_string, pipeline=pipeline)
+                with suppress(redis.exceptions.ConnectionError):
+                    pipeline.execute()
+
+            self.set_current_job_id(None, pipeline=pipeline)
+            self.increment_failed_job_count(pipeline)
+            if job.started_at and job.ended_at:
+                self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
+
+            if retry:
+                job.retry(queue, pipeline)
+                enqueue_dependents = False
+            else:
+                enqueue_dependents = True
+
+            try:
+                pipeline.execute()
+                if enqueue_dependents:
+                    queue.enqueue_dependents(job)
+            except Exception:
+                # Ensure that custom exception handlers are called
+                # even if Redis is down
+                pass
+
     def _start_scheduler(
         self,
         burst: bool = False,
@@ -1294,62 +1350,6 @@ class Worker(BaseWorker):
         msg = 'Processing {0} from {1} since {2}'
         self.procline(msg.format(job.func_name, job.origin, time.time()))
 
-    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
-        """
-        Handles the failure or an executing job by:
-        1. Setting the job status to failed
-        2. Removing the job from StartedJobRegistry
-        3. Setting the workers current job to None
-        4. Add the job to FailedJobRegistry
-        `save_exc_to_job` should only be used for testing purposes
-        """
-        self.log.debug('Handling failed execution of job %s', job.id)
-        with self.connection.pipeline() as pipeline:
-            if started_job_registry is None:
-                started_job_registry = StartedJobRegistry(
-                    job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
-                )
-
-            # check whether a job was stopped intentionally and set the job
-            # status appropriately if it was this job.
-            job_is_stopped = self._stopped_job_id == job.id
-            retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
-
-            if job_is_stopped:
-                job.set_status(JobStatus.STOPPED, pipeline=pipeline)
-                self._stopped_job_id = None
-            else:
-                # Requeue/reschedule if retry is configured, otherwise
-                if not retry:
-                    job.set_status(JobStatus.FAILED, pipeline=pipeline)
-
-            started_job_registry.remove(job, pipeline=pipeline)
-
-            if not self.disable_default_exception_handler and not retry:
-                job._handle_failure(exc_string, pipeline=pipeline)
-                with suppress(redis.exceptions.ConnectionError):
-                    pipeline.execute()
-
-            self.set_current_job_id(None, pipeline=pipeline)
-            self.increment_failed_job_count(pipeline)
-            if job.started_at and job.ended_at:
-                self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
-
-            if retry:
-                job.retry(queue, pipeline)
-                enqueue_dependents = False
-            else:
-                enqueue_dependents = True
-
-            try:
-                pipeline.execute()
-                if enqueue_dependents:
-                    queue.enqueue_dependents(job)
-            except Exception:
-                # Ensure that custom exception handlers are called
-                # even if Redis is down
-                pass
-
     def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry):
         """Handles the successful execution of certain job.
         It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`,
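
Note for reviewers: below is a minimal regression sketch of the case this patch
guards against. It is a sketch only, assuming a local Redis instance and just
the API visible in the diff above (clean_intermediate_queue, Queue.fetch_job,
Queue.intermediate_queue_key); the test name and the enqueued callable are
illustrative and not part of this change.

    # Sketch only: a job id can linger in the intermediate queue after the
    # job hash itself has expired or been deleted. Queue.fetch_job() then
    # returns None, and before this patch handle_job_failure() was invoked
    # with job=None and raised.
    from redis import Redis
    from rq import Queue, Worker
    from rq.maintenance import clean_intermediate_queue

    def test_clean_intermediate_queue_skips_deleted_job():
        connection = Redis()  # assumes a Redis server on localhost:6379
        queue = Queue('default', connection=connection)
        worker = Worker([queue], connection=connection)

        job = queue.enqueue(print, 'hello')
        # Simulate the stuck state: the id sits in the intermediate queue...
        connection.lpush(queue.intermediate_queue_key, job.id)
        # ...but the job data itself is gone (e.g. deleted by TTL or by hand).
        job.delete()

        # With this patch the stale id is removed without calling
        # handle_job_failure(); the intermediate queue ends up empty.
        clean_intermediate_queue(worker, queue)
        assert connection.lrange(queue.intermediate_queue_key, 0, -1) == []

Note that the guard only skips the failure handling: the stale job id is still
LREM'd from the intermediate queue, so the list cannot grow without bound.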