From 11c8631921cd9738b94c17937315ec9dba0041b7 Mon Sep 17 00:00:00 2001
From: Adda Satya Ram <34022496+Asrst@users.noreply.github.com>
Date: Thu, 14 Jan 2021 13:01:41 +0530
Subject: [PATCH] Add exception to catch redis connection failure to retry
 after wait time (#1387)

* add exception catch for redis connection failure

* Add test for connection recovery

* add exponential backoff

* limit worker max connection wait time to 60 seconds

* fix undefined class variable

* fix string formatting issue while printing error log

* cap max connection wait time: better code style

Co-authored-by: corynezin
---
 rq/worker.py         | 28 ++++++++++++++++++++--------
 tests/test_worker.py | 14 ++++++++++++++
 2 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index 8aa07bdd..c5762fd9 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -22,7 +22,7 @@ try:
 except ImportError:
     from signal import SIGTERM as SIGKILL
 
-from redis import WatchError
+import redis.exceptions
 
 from . import worker_registration
 from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
@@ -106,6 +106,10 @@ class Worker(object):
     log_result_lifespan = True
     # `log_job_description` is used to toggle logging an entire jobs description.
     log_job_description = True
+    # Factor to increase connection_wait_time in case of continuous connection failures.
+    exponential_backoff_factor = 2.0
+    # Max wait time (in seconds) after which exponential_backoff_factor no longer applies.
+    max_connection_wait_time = 60.0
 
     @classmethod
     def all(cls, connection=None, job_class=None, queue_class=None, queue=None, serializer=None):
@@ -469,7 +473,6 @@ class Worker(object):
 
     def check_for_suspension(self, burst):
         """Check to see if workers have been suspended by `rq suspend`"""
-
         before_state = None
         notified = False
 
@@ -628,14 +631,15 @@ class Worker(object):
         self.set_state(WorkerStatus.IDLE)
         self.procline('Listening on ' + qnames)
         self.log.debug('*** Listening on %s...', green(qnames))
-
+        connection_wait_time = 1.0
         while True:
-            self.heartbeat()
-
-            if self.should_run_maintenance_tasks:
-                self.run_maintenance_tasks()
 
             try:
+                self.heartbeat()
+
+                if self.should_run_maintenance_tasks:
+                    self.run_maintenance_tasks()
+
                 result = self.queue_class.dequeue_any(self.queues, timeout,
                                                       connection=self.connection,
                                                       job_class=self.job_class,
@@ -654,6 +658,14 @@ class Worker(object):
                 break
             except DequeueTimeout:
                 pass
+            except redis.exceptions.ConnectionError as conn_err:
+                self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
+                               conn_err, connection_wait_time)
+                time.sleep(connection_wait_time)
+                connection_wait_time *= self.exponential_backoff_factor
+                connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
+            else:
+                connection_wait_time = 1.0
 
         self.heartbeat()
         return result
@@ -955,7 +967,7 @@ class Worker(object):
 
                 pipeline.execute()
                 break
-            except WatchError:
+            except redis.exceptions.WatchError:
                 continue
 
     def perform_job(self, job, queue, heartbeat_ttl=None):
diff --git a/tests/test_worker.py b/tests/test_worker.py
index f30cc1af..1cf2be17 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -18,6 +18,7 @@ from time import sleep
 
 from unittest import skipIf
 
+import redis.exceptions
 import pytest
 import mock
 from mock import Mock
@@ -283,6 +284,19 @@ class TestWorker(RQTestCase):
             self.testconn.hdel(w.key, 'birth')
             w.refresh()
 
+    @slow
+    def test_heartbeat_survives_lost_connection(self):
+        with mock.patch.object(Worker, 'heartbeat') as mocked:
+            # None -> Heartbeat is first called before the job loop
+            mocked.side_effect = [None, redis.exceptions.ConnectionError()]
+            q = Queue()
+            w = Worker([q])
+            w.work(burst=True)
+            # First call is prior to job loop, second raises the error,
+            # third is successful, after "recovery"
+            assert mocked.call_count == 3
+
+    @slow
     def test_heartbeat_busy(self):
         """Periodic heartbeats while horse is busy with long jobs"""
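
The retry logic this patch adds reduces to a small pattern: poll Redis, and when a ConnectionError is raised, sleep for a wait time that starts at 1 second, is multiplied by exponential_backoff_factor after each consecutive failure, is capped at max_connection_wait_time, and resets to 1 second once a call succeeds again. Below is a minimal sketch of that pattern in isolation; poll_with_backoff and dequeue_once are illustrative names rather than rq API, while the 2.0 factor and 60-second cap mirror the new class attributes.

import time

import redis.exceptions


def poll_with_backoff(dequeue_once, backoff_factor=2.0, max_wait=60.0):
    """Call dequeue_once() until it yields a job, backing off during Redis outages.

    Sketch of the loop added to Worker.dequeue_job_and_maintain_ttl: the wait
    starts at 1 second, doubles on every consecutive ConnectionError, never
    exceeds max_wait, and resets to 1 second after a successful call.
    """
    wait = 1.0
    while True:
        try:
            job = dequeue_once()
        except redis.exceptions.ConnectionError as exc:
            print('Could not connect to Redis instance: %s Retrying in %d seconds...' % (exc, wait))
            time.sleep(wait)
            wait = min(wait * backoff_factor, max_wait)  # 1, 2, 4, ... capped at 60
        else:
            wait = 1.0  # a successful round trip resets the backoff
            if job is not None:
                return job

With the default factor and cap, consecutive failures produce waits of 1, 2, 4, 8, 16, 32, 60, 60, ... seconds, so a worker left running through a Redis restart reconnects within at most a minute of the server coming back.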