From c2e6d953383bfe678c3ad13e4359e27f5feaba41 Mon Sep 17 00:00:00 2001 From: lowercase00 <21188280+lowercase00@users.noreply.github.com> Date: Thu, 2 Feb 2023 00:53:19 -0300 Subject: [PATCH] Enhanced Redis Connection Reliability (#1753) * Enhanced Redis Connection Reliability The Redis connection may fail for several reasons. As the connection can be (1) explicitly passed to the worker or (2) implicity set, this will improve the Connection configuration by setting a timeout to the socket, and adding an ExponentialBackoff Retry logic. * Simpler Connection logic * Add simple retry logic to Redis Connection Error * Make retry exponential, add keepalive & socket_connect_timeout * Handles configuration on Redis' connection pool * Simplifies timeout exception logic * Fix burst bug, add test * Add docs related to `socket_timeout`, improve compatibility with older RedisPy versions * Fixes * New timeout private method * Fix timeout --- docs/docs/connections.md | 20 ++++++++++++++++++++ rq/worker.py | 36 +++++++++++++++++++++++++++++++----- tests/test_worker.py | 7 +++++++ 3 files changed, 58 insertions(+), 5 deletions(-) diff --git a/docs/docs/connections.md b/docs/docs/connections.md index 7e87b3a3..8e563d9e 100644 --- a/docs/docs/connections.md +++ b/docs/docs/connections.md @@ -142,3 +142,23 @@ SENTINEL: {'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379) 'DB': 2, 'MASTER_NAME': 'master'} ``` + + +### Timeout + +To avoid potential issues with hanging Redis commands, specifically the blocking `BLPOP` command, +RQ automatically sets a `socket_timeout` value that is 10 seconds higher than the `default_worker_ttl`. + +If you prefer to manually set the `socket_timeout` value, +make sure that the value being set is higher than the `default_worker_ttl` (which is 420 by default). + +```python +from redis import Redis +from rq import Queue + +conn = Redis('localhost', 6379, socket_timeout=500) +q = Queue(connection=conn) +``` + +Setting a `socket_timeout` with a lower value than the `default_worker_ttl` will cause a `TimeoutError` +since it will interrupt the worker while it gets new jobs from the queue. diff --git a/rq/worker.py b/rq/worker.py index 49c53fde..35355c79 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -197,10 +197,9 @@ class Worker: job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, disable_default_exception_handler: bool = False, prepare_for_work: bool = True, serializer=None): # noqa - if connection is None: - connection = get_current_connection() - self.connection = connection + connection = self._set_connection(connection, default_worker_ttl) + self.connection = connection self.redis_server_version = None self.job_class = backend_class(self, 'job_class', override=job_class) @@ -281,6 +280,24 @@ class Worker: elif exception_handlers is not None: self.push_exc_handler(exception_handlers) + def _set_connection(self, connection: Optional['Redis'], default_worker_ttl: int) -> 'Redis': + """Configures the Redis connection to have a socket timeout. + This should timouet the connection in case any specific command hangs at any given time (eg. BLPOP). + If the connection provided already has a `socket_timeout` defined, skips. + + Args: + connection (Optional[Redis]): The Redis Connection. + default_worker_ttl (int): The Default Worker TTL + """ + if connection is None: + connection = get_current_connection() + current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout") + if current_socket_timeout is None: + timeout = self._get_timeout(default_worker_ttl) + 10 + timeout_config = {"socket_timeout": timeout} + connection.connection_pool.connection_kwargs.update(timeout_config) + return connection + def get_redis_server_version(self): """Return Redis server version of connection""" if not self.redis_server_version: @@ -328,6 +345,12 @@ class Worker: """Returns whether or not this is the worker or the work horse.""" return self._is_horse + def _get_timeout(self, worker_ttl: Optional[int] = None) -> int: + timeout = DEFAULT_WORKER_TTL + if worker_ttl: + timeout = worker_ttl + return max(1, timeout - 15) + def procline(self, message): """Changes the current procname for the process. @@ -626,7 +649,6 @@ class Worker: self.scheduler.start() self._install_signal_handlers() - try: while True: try: @@ -639,7 +661,7 @@ class Worker: self.log.info('Worker %s: stopping on request', self.key) break - timeout = None if burst else max(1, self.default_worker_ttl - 15) + timeout = None if burst else self._get_timeout() result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: @@ -660,6 +682,10 @@ class Worker: ) break + except redis.exceptions.TimeoutError: + self.log.error(f"Worker {self.key}: Redis connection timeout, quitting...") + break + except StopRequested: break diff --git a/tests/test_worker.py b/tests/test_worker.py index 1977c93e..e001a95b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1049,6 +1049,13 @@ class TestWorker(RQTestCase): w.dequeue_job_and_maintain_ttl(10) self.assertNotIn("Frank", mock_logger_info.call_args[0][2]) + def test_worker_configures_socket_timeout(self): + """Ensures that the worker correctly updates Redis client connection to have a socket_timeout""" + q = Queue() + _ = Worker([q]) + connection_kwargs = q.connection.connection_pool.connection_kwargs + self.assertEqual(connection_kwargs["socket_timeout"], 415) + def test_worker_version(self): q = Queue() w = Worker([q])