Don't call handle_job_failure if job is no longer present (#1946)

Selwin Ong authored on 2023-06-13 08:34:25 +07:00 (committed via GitHub)
parent 10230ff040, commit 6cfb5a6012
2 changed files with 58 additions and 57 deletions


@@ -21,5 +21,6 @@ def clean_intermediate_queue(worker: 'BaseWorker', queue: Queue) -> None:
     for job_id in job_ids:
         if job_id not in queue.started_job_registry:
             job = queue.fetch_job(job_id)
-            worker.handle_job_failure(job, queue, exc_string='Job was stuck in the intermediate queue.')
+            if job:
+                worker.handle_job_failure(job, queue, exc_string='Job was stuck in intermediate queue.')
             queue.connection.lrem(queue.intermediate_queue_key, 1, job_id)
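For context on the guard: `Queue.fetch_job` returns `None` when the job's hash has already been removed from Redis (deleted, expired, or otherwise cleaned up), even though the stale id can still sit in the intermediate queue, so the unguarded call would fail as soon as `handle_job_failure` touches `job.id`. A minimal sketch of that failure mode against a local Redis, with a throwaway queue name chosen purely for illustration:

    from redis import Redis
    from rq import Queue

    q = Queue('example', connection=Redis())  # 'example' is a placeholder queue name
    job = q.enqueue(print, 'hello')

    job.delete()  # the job hash is gone, though its id may still linger in other structures
    assert q.fetch_job(job.id) is None  # hence the new `if job:` guard above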


@@ -456,6 +456,62 @@ class BaseWorker:
             self.teardown()
         return bool(completed_jobs)
 
+    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
+        """
+        Handles the failure of an executing job by:
+            1. Setting the job status to failed
+            2. Removing the job from StartedJobRegistry
+            3. Setting the worker's current job to None
+            4. Adding the job to FailedJobRegistry
+        `save_exc_to_job` should only be used for testing purposes
+        """
+        self.log.debug('Handling failed execution of job %s', job.id)
+        with self.connection.pipeline() as pipeline:
+            if started_job_registry is None:
+                started_job_registry = StartedJobRegistry(
+                    job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
+                )
+
+            # check whether a job was stopped intentionally and set the job
+            # status appropriately if it was this job.
+            job_is_stopped = self._stopped_job_id == job.id
+            retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
+
+            if job_is_stopped:
+                job.set_status(JobStatus.STOPPED, pipeline=pipeline)
+                self._stopped_job_id = None
+            else:
+                # Requeue/reschedule if retry is configured, otherwise mark the job as failed
+                if not retry:
+                    job.set_status(JobStatus.FAILED, pipeline=pipeline)
+
+            started_job_registry.remove(job, pipeline=pipeline)
+
+            if not self.disable_default_exception_handler and not retry:
+                job._handle_failure(exc_string, pipeline=pipeline)
+                with suppress(redis.exceptions.ConnectionError):
+                    pipeline.execute()
+
+            self.set_current_job_id(None, pipeline=pipeline)
+            self.increment_failed_job_count(pipeline)
+            if job.started_at and job.ended_at:
+                self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
+
+            if retry:
+                job.retry(queue, pipeline)
+                enqueue_dependents = False
+            else:
+                enqueue_dependents = True
+
+            try:
+                pipeline.execute()
+                if enqueue_dependents:
+                    queue.enqueue_dependents(job)
+            except Exception:
+                # Ensure that custom exception handlers are called
+                # even if Redis is down
+                pass
+
     def _start_scheduler(
         self,
         burst: bool = False,
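For reference, the `retry` branch above is fed by retry metadata attached at enqueue time: `job.retries_left` comes from `Retry(max=...)`, and while it is positive a failing job is rescheduled via `job.retry(queue, pipeline)` instead of landing in `FailedJobRegistry` (note that `enqueue_dependents` is also left `False` on a retry). A hedged sketch using RQ's public `Retry` API; `my_task` is a placeholder and would need to live in a module importable by the worker:

    from redis import Redis
    from rq import Queue, Retry

    def my_task():
        raise RuntimeError('boom')  # force the failure path for illustration

    q = Queue('example', connection=Redis())  # 'example' is a placeholder queue name

    # On each failure, handle_job_failure() sees retries_left > 0 and requeues
    # the job (honoring the configured intervals) rather than marking it FAILED.
    job = q.enqueue(my_task, retry=Retry(max=2, interval=[10, 30]))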
@@ -1294,62 +1350,6 @@ class Worker(BaseWorker):
         msg = 'Processing {0} from {1} since {2}'
         self.procline(msg.format(job.func_name, job.origin, time.time()))
 
-    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
-        """
-        Handles the failure of an executing job by:
-            1. Setting the job status to failed
-            2. Removing the job from StartedJobRegistry
-            3. Setting the worker's current job to None
-            4. Adding the job to FailedJobRegistry
-        `save_exc_to_job` should only be used for testing purposes
-        """
-        self.log.debug('Handling failed execution of job %s', job.id)
-        with self.connection.pipeline() as pipeline:
-            if started_job_registry is None:
-                started_job_registry = StartedJobRegistry(
-                    job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
-                )
-
-            # check whether a job was stopped intentionally and set the job
-            # status appropriately if it was this job.
-            job_is_stopped = self._stopped_job_id == job.id
-            retry = job.retries_left and job.retries_left > 0 and not job_is_stopped
-
-            if job_is_stopped:
-                job.set_status(JobStatus.STOPPED, pipeline=pipeline)
-                self._stopped_job_id = None
-            else:
-                # Requeue/reschedule if retry is configured, otherwise mark the job as failed
-                if not retry:
-                    job.set_status(JobStatus.FAILED, pipeline=pipeline)
-
-            started_job_registry.remove(job, pipeline=pipeline)
-
-            if not self.disable_default_exception_handler and not retry:
-                job._handle_failure(exc_string, pipeline=pipeline)
-                with suppress(redis.exceptions.ConnectionError):
-                    pipeline.execute()
-
-            self.set_current_job_id(None, pipeline=pipeline)
-            self.increment_failed_job_count(pipeline)
-            if job.started_at and job.ended_at:
-                self.increment_total_working_time(job.ended_at - job.started_at, pipeline)
-
-            if retry:
-                job.retry(queue, pipeline)
-                enqueue_dependents = False
-            else:
-                enqueue_dependents = True
-
-            try:
-                pipeline.execute()
-                if enqueue_dependents:
-                    queue.enqueue_dependents(job)
-            except Exception:
-                # Ensure that custom exception handlers are called
-                # even if Redis is down
-                pass
-
     def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry: StartedJobRegistry):
         """Handles the successful execution of a certain job.
         It will remove the job from the `StartedJobRegistry`, adding it to the `SuccessfulJobRegistry`,
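One follow-on behavior worth noting from the `try` block in the moved method: `queue.enqueue_dependents(job)` runs only on terminal failure, never on a retry. If I read RQ's dependency handling correctly, a dependent still executes after a failed parent only when it opted in via `allow_failure`; a sketch with hypothetical task names:

    from redis import Redis
    from rq import Queue
    from rq.job import Dependency

    q = Queue('example', connection=Redis())
    parent = q.enqueue(flaky_task)  # flaky_task is a placeholder that may raise

    # With allow_failure=True the dependent is enqueued even if parent ends up
    # FAILED; without it, enqueue_dependents would not release this job.
    child = q.enqueue(report_task, depends_on=Dependency(jobs=[parent], allow_failure=True))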