From 3a49533bc283eb03dae772ca79a6aa4f1c68f083 Mon Sep 17 00:00:00 2001 From: Marti Raudsepp Date: Wed, 12 Oct 2022 16:41:04 +0300 Subject: [PATCH] Add separate transport option for retry loop timeout (#1599) * Add separate transport option for retry loop timeout This only applies when using `Connect.default_channel`. Before this change, the retry loop timeout was set equal to TCP connect timeout (`connect_timeout`), meaning when first connection attempt timeouted, no retry would be attempted. Now if a new transport option `connect_total_timeout` is provided, this overrides `connect_timeout` for the retry loop (but not for TCP connect). * Add tests * Fix isort * Rename to connect_retry_timeout * Reformat * connect_retry_timeout -> connect_retries_timeout * Fix flake8 --- kombu/connection.py | 5 ++++- t/mocks.py | 13 +++++++++++++ t/unit/test_connection.py | 33 ++++++++++++++++++++++++++++++++- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index 793c4d2c..50dbe10d 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -426,7 +426,7 @@ class Connection: callback (Callable): Optional callback that is called for every internal iteration (1 s). timeout (int): Maximum amount of time in seconds to spend - waiting for connection + attempting to connect, total over all retries. """ if self.connected: return self._connection @@ -867,6 +867,9 @@ class Connection: conn_opts['interval_step'] = transport_opts['interval_step'] if 'interval_max' in transport_opts: conn_opts['interval_max'] = transport_opts['interval_max'] + if 'connect_retries_timeout' in transport_opts: + conn_opts['timeout'] = \ + transport_opts['connect_retries_timeout'] return conn_opts @property diff --git a/t/mocks.py b/t/mocks.py index e4cfa3f7..4c99f010 100644 --- a/t/mocks.py +++ b/t/mocks.py @@ -1,5 +1,6 @@ from __future__ import annotations +import time from itertools import count from typing import TYPE_CHECKING from unittest.mock import Mock @@ -202,3 +203,15 @@ class Transport(base.Transport): def close_connection(self, connection): connection.connected = False + + +class TimeoutingTransport(Transport): + recoverable_connection_errors = (TimeoutError,) + + def __init__(self, connect_timeout=1, **kwargs): + self.connect_timeout = connect_timeout + super().__init__(**kwargs) + + def establish_connection(self): + time.sleep(self.connect_timeout) + raise TimeoutError('timed out') diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index b1413b7e..740bd6dc 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -11,7 +11,7 @@ from kombu import Connection, Consumer, Producer, parse_url from kombu.connection import Resource from kombu.exceptions import OperationalError from kombu.utils.functional import lazy -from t.mocks import Transport +from t.mocks import TimeoutingTransport, Transport class test_connection_utils: @@ -698,6 +698,37 @@ class test_Connection: with pytest.raises(OperationalError): conn.default_channel + def test_connection_failover_without_total_timeout(self): + with Connection( + ['server1', 'server2'], + transport=TimeoutingTransport, + connect_timeout=1, + transport_options={'interval_start': 0, 'interval_step': 0}, + ) as conn: + conn._establish_connection = Mock( + side_effect=conn._establish_connection + ) + with pytest.raises(OperationalError): + conn.default_channel + # Never retried, because `retry_over_time` `timeout` is equal + # to `connect_timeout` + conn._establish_connection.assert_called_once() + + def test_connection_failover_with_total_timeout(self): + with Connection( + ['server1', 'server2'], + transport=TimeoutingTransport, + connect_timeout=1, + transport_options={'connect_retries_timeout': 2, + 'interval_start': 0, 'interval_step': 0}, + ) as conn: + conn._establish_connection = Mock( + side_effect=conn._establish_connection + ) + with pytest.raises(OperationalError): + conn.default_channel + assert conn._establish_connection.call_count == 2 + class test_Connection_with_transport_options: