From 5a3a8ce68191f5208be7a2efce80bab951de70c3 Mon Sep 17 00:00:00 2001 From: Neil Goossen Date: Mon, 23 Oct 2023 10:33:58 -0700 Subject: [PATCH] Responding to PR comments, switching to setting the expiry of the settings instead of checking to respect the pipelining --- rq/worker.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 36d2470f..784a2c07 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1076,44 +1076,50 @@ class BaseWorker: def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None): """Used to keep the worker stats up to date in Redis. - Increments the failed job count. Only if the worker still exists. - For long running tasks the worker might have been removed already, - and the work horse is still running. + Increments the failed job count. + + Setting to expire in 1 minute more than the worker TTL to ensure that + this gets cleaned up in the event that this is called after the worker has + registered its death. Args: pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None. """ connection = pipeline if pipeline is not None else self.connection - if connection.exists(self.key): - connection.hincrby(self.key, 'failed_job_count', 1) + connection.hincrby(self.key, 'failed_job_count', 1) + connection.expire(self.key, self.worker_ttl + 60) + def increment_successful_job_count(self, pipeline: Optional['Pipeline'] = None): """Used to keep the worker stats up to date in Redis. - Increments the successful job count. Only if the worker still exists. - For long running tasks the worker might have been removed already, - and the work horse is still running. + Increments the successful job count. + + Setting to expire in 1 minute more than the worker TTL to ensure that + this gets cleaned up in the event that this is called after the worker has + registered its death. Args: pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None. """ connection = pipeline if pipeline is not None else self.connection - if connection.exists(self.key): - connection.hincrby(self.key, 'successful_job_count', 1) + connection.hincrby(self.key, 'successful_job_count', 1) + connection.expire(self.key, self.worker_ttl + 60) def increment_total_working_time(self, job_execution_time: timedelta, pipeline: 'Pipeline'): """Used to keep the worker stats up to date in Redis. Increments the time the worker has been workig for (in seconds). - Only if the worker still exists. - For long running tasks the worker might have been removed already, - and the work horse is still running. + Setting to expire in 1 minute more than the worker TTL to ensure that + this gets cleaned up in the event that this is called after the worker has + registered its death. Args: job_execution_time (timedelta): A timedelta object. pipeline (Optional[Pipeline], optional): A Redis Pipeline. Defaults to None. """ - if pipeline.exists(self.key): - pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds()) + pipeline.hincrbyfloat(self.key, 'total_working_time', + job_execution_time.total_seconds()) + pipeline.expire(self.key, self.worker_ttl + 60) def handle_exception(self, job: 'Job', *exc_info): """Walks the exception handler stack to delegate exception handling.