mirror of https://github.com/celery/kombu.git
Add separate transport option for retry loop timeout (#1599)
* Add separate transport option for retry loop timeout This only applies when using `Connect.default_channel`. Before this change, the retry loop timeout was set equal to TCP connect timeout (`connect_timeout`), meaning when first connection attempt timeouted, no retry would be attempted. Now if a new transport option `connect_total_timeout` is provided, this overrides `connect_timeout` for the retry loop (but not for TCP connect). * Add tests * Fix isort * Rename to connect_retry_timeout * Reformat * connect_retry_timeout -> connect_retries_timeout * Fix flake8
This commit is contained in:
parent
de560081d3
commit
3a49533bc2
|
@ -426,7 +426,7 @@ class Connection:
|
|||
callback (Callable): Optional callback that is called for every
|
||||
internal iteration (1 s).
|
||||
timeout (int): Maximum amount of time in seconds to spend
|
||||
waiting for connection
|
||||
attempting to connect, total over all retries.
|
||||
"""
|
||||
if self.connected:
|
||||
return self._connection
|
||||
|
@ -867,6 +867,9 @@ class Connection:
|
|||
conn_opts['interval_step'] = transport_opts['interval_step']
|
||||
if 'interval_max' in transport_opts:
|
||||
conn_opts['interval_max'] = transport_opts['interval_max']
|
||||
if 'connect_retries_timeout' in transport_opts:
|
||||
conn_opts['timeout'] = \
|
||||
transport_opts['connect_retries_timeout']
|
||||
return conn_opts
|
||||
|
||||
@property
|
||||
|
|
13
t/mocks.py
13
t/mocks.py
|
@ -1,5 +1,6 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from itertools import count
|
||||
from typing import TYPE_CHECKING
|
||||
from unittest.mock import Mock
|
||||
|
@ -202,3 +203,15 @@ class Transport(base.Transport):
|
|||
|
||||
def close_connection(self, connection):
|
||||
connection.connected = False
|
||||
|
||||
|
||||
class TimeoutingTransport(Transport):
|
||||
recoverable_connection_errors = (TimeoutError,)
|
||||
|
||||
def __init__(self, connect_timeout=1, **kwargs):
|
||||
self.connect_timeout = connect_timeout
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def establish_connection(self):
|
||||
time.sleep(self.connect_timeout)
|
||||
raise TimeoutError('timed out')
|
||||
|
|
|
@ -11,7 +11,7 @@ from kombu import Connection, Consumer, Producer, parse_url
|
|||
from kombu.connection import Resource
|
||||
from kombu.exceptions import OperationalError
|
||||
from kombu.utils.functional import lazy
|
||||
from t.mocks import Transport
|
||||
from t.mocks import TimeoutingTransport, Transport
|
||||
|
||||
|
||||
class test_connection_utils:
|
||||
|
@ -698,6 +698,37 @@ class test_Connection:
|
|||
with pytest.raises(OperationalError):
|
||||
conn.default_channel
|
||||
|
||||
def test_connection_failover_without_total_timeout(self):
|
||||
with Connection(
|
||||
['server1', 'server2'],
|
||||
transport=TimeoutingTransport,
|
||||
connect_timeout=1,
|
||||
transport_options={'interval_start': 0, 'interval_step': 0},
|
||||
) as conn:
|
||||
conn._establish_connection = Mock(
|
||||
side_effect=conn._establish_connection
|
||||
)
|
||||
with pytest.raises(OperationalError):
|
||||
conn.default_channel
|
||||
# Never retried, because `retry_over_time` `timeout` is equal
|
||||
# to `connect_timeout`
|
||||
conn._establish_connection.assert_called_once()
|
||||
|
||||
def test_connection_failover_with_total_timeout(self):
|
||||
with Connection(
|
||||
['server1', 'server2'],
|
||||
transport=TimeoutingTransport,
|
||||
connect_timeout=1,
|
||||
transport_options={'connect_retries_timeout': 2,
|
||||
'interval_start': 0, 'interval_step': 0},
|
||||
) as conn:
|
||||
conn._establish_connection = Mock(
|
||||
side_effect=conn._establish_connection
|
||||
)
|
||||
with pytest.raises(OperationalError):
|
||||
conn.default_channel
|
||||
assert conn._establish_connection.call_count == 2
|
||||
|
||||
|
||||
class test_Connection_with_transport_options:
|
||||
|
||||
|
|
Loading…
Reference in New Issue