Responding to PR comments, switching to setting the expiry of the settings instead of checking to respect the pipelining

This commit is contained in:
Neil Goossen 2023-10-23 10:33:58 -07:00
parent cdcd6784de
commit 5a3a8ce681
1 changed files with 21 additions and 15 deletions

View File

@ -1076,44 +1076,50 @@ class BaseWorker:
def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None):
"""Used to keep the worker stats up to date in Redis.
Increments the failed job count. Only if the worker still exists.
For long running tasks the worker might have been removed already,
and the work horse is still running.
Increments the failed job count.
Setting to expire in 1 minute more than the worker TTL to ensure that
this gets cleaned up in the event that this is called after the worker has
registered its death.
Args:
pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
"""
connection = pipeline if pipeline is not None else self.connection
if connection.exists(self.key):
connection.hincrby(self.key, 'failed_job_count', 1)
connection.hincrby(self.key, 'failed_job_count', 1)
connection.expire(self.key, self.worker_ttl + 60)
def increment_successful_job_count(self, pipeline: Optional['Pipeline'] = None):
"""Used to keep the worker stats up to date in Redis.
Increments the successful job count. Only if the worker still exists.
For long running tasks the worker might have been removed already,
and the work horse is still running.
Increments the successful job count.
Setting to expire in 1 minute more than the worker TTL to ensure that
this gets cleaned up in the event that this is called after the worker has
registered its death.
Args:
pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
"""
connection = pipeline if pipeline is not None else self.connection
if connection.exists(self.key):
connection.hincrby(self.key, 'successful_job_count', 1)
connection.hincrby(self.key, 'successful_job_count', 1)
connection.expire(self.key, self.worker_ttl + 60)
def increment_total_working_time(self, job_execution_time: timedelta, pipeline: 'Pipeline'):
"""Used to keep the worker stats up to date in Redis.
Increments the time the worker has been workig for (in seconds).
Only if the worker still exists.
For long running tasks the worker might have been removed already,
and the work horse is still running.
Setting to expire in 1 minute more than the worker TTL to ensure that
this gets cleaned up in the event that this is called after the worker has
registered its death.
Args:
job_execution_time (timedelta): A timedelta object.
pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None.
"""
if pipeline.exists(self.key):
pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds())
pipeline.hincrbyfloat(self.key, 'total_working_time',
job_execution_time.total_seconds())
pipeline.expire(self.key, self.worker_ttl + 60)
def handle_exception(self, job: 'Job', *exc_info):
"""Walks the exception handler stack to delegate exception handling.