From 75027490c71b83342f92b5293461c679233dc25e Mon Sep 17 00:00:00 2001 From: Matus Valo Date: Wed, 10 Jun 2020 11:14:47 +0200 Subject: [PATCH] Revert incompatible changes in #1193 and additional improvements (#1211) * Revert incompatible changes introduced in #1193 * Improved integration tests covering connection * Fix unittests on python2 + flake8 fixes --- kombu/connection.py | 24 ++++++++++++------- t/integration/common.py | 43 ++++++++++++++++++++++++++++++++-- t/integration/test_py_amqp.py | 5 ++++ t/integration/test_redis.py | 9 ++++++- t/unit/test_connection.py | 20 +++++++++------- t/unit/transport/test_redis.py | 16 ++++++------- 6 files changed, 88 insertions(+), 29 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index 5a5af519..d8883586 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -279,8 +279,9 @@ class Connection(object): def connect(self): """Establish connection to server immediately.""" - conn_opts = self._extract_failover_opts() - return self._ensure_connection(**conn_opts) + return self._ensure_connection( + max_retries=1, reraise_as_library_errors=False + ) def channel(self): """Create and return a new channel.""" @@ -420,6 +421,9 @@ class Connection(object): timeout (int): Maximum amount of time in seconds to spend waiting for connection """ + if self.connected: + return self._connection + def on_error(exc, intervals, retries, interval=0): round = self.completes_cycle(retries) if round: @@ -434,13 +438,12 @@ class Connection(object): if not reraise_as_library_errors: ctx = self._dummy_context with ctx(): - self._connection = self._connection or retry_over_time( + return retry_over_time( self._connection_factory, self.recoverable_connection_errors, (), {}, on_error, max_retries, interval_start, interval_step, interval_max, callback, timeout=timeout ) - return self._connection @contextmanager def _reraise_as_library_errors( @@ -860,16 +863,17 @@ class Connection(object): """ if not self._closed: if not self.connected: - conn_opts = self._extract_failover_opts() - self._ensure_connection(**conn_opts) + return self._ensure_connection( + max_retries=1, reraise_as_library_errors=False + ) return self._connection def _connection_factory(self): self.declared_entities.clear() self._default_channel = None - connection = self._establish_connection() + self._connection = self._establish_connection() self._closed = False - return connection + return self._connection @property def default_channel(self): @@ -884,7 +888,9 @@ class Connection(object): require a channel. """ # make sure we're still connected, and if not refresh. - self.connect() + conn_opts = self._extract_failover_opts() + self._ensure_connection(**conn_opts) + if self._default_channel is None: self._default_channel = self.channel() return self._default_channel diff --git a/t/integration/common.py b/t/integration/common.py index 4b3a70fd..19dca984 100644 --- a/t/integration/common.py +++ b/t/integration/common.py @@ -11,13 +11,52 @@ import kombu class BasicFunctionality(object): def test_connect(self, connection): - connection.connect() + assert connection.connect() + assert connection.connection connection.close() + assert connection.connection is None + assert connection.connect() + assert connection.connection + connection.close() + + def test_failed_connect(self, invalid_connection): + # method raises transport exception + with pytest.raises(Exception): + invalid_connection.connect() + + def test_failed_connection(self, invalid_connection): + # method raises transport exception + with pytest.raises(Exception): + invalid_connection.connection + + def test_failed_channel(self, invalid_connection): + # method raises transport exception + with pytest.raises(Exception): + invalid_connection.channel() + + def test_failed_default_channel(self, invalid_connection): + invalid_connection.transport_options = {'max_retries': 1} + # method raises transport exception + with pytest.raises(Exception): + invalid_connection.default_channel def test_default_channel_autoconnect(self, connection): connection.connect() connection.close() - connection.default_channel + assert connection.connection is None + assert connection.default_channel + assert connection.connection + connection.close() + + def test_channel(self, connection): + chan = connection.channel() + assert chan + assert connection.connection + + def test_default_channel(self, connection): + chan = connection.default_channel + assert chan + assert connection.connection def test_publish_consume(self, connection): test_queue = kombu.Queue('test', routing_key='test') diff --git a/t/integration/test_py_amqp.py b/t/integration/test_py_amqp.py index 1a32cce4..93327c8a 100644 --- a/t/integration/test_py_amqp.py +++ b/t/integration/test_py_amqp.py @@ -21,6 +21,11 @@ def get_failover_connection(hostname, port, vhost): ) +@pytest.fixture() +def invalid_connection(): + return kombu.Connection('pyamqp://localhost:12345') + + @pytest.fixture() def connection(request): return get_connection( diff --git a/t/integration/test_redis.py b/t/integration/test_redis.py index d417eb45..e4666d12 100644 --- a/t/integration/test_redis.py +++ b/t/integration/test_redis.py @@ -6,7 +6,9 @@ import pytest import kombu from time import sleep -from .common import BasicFunctionality, BaseExchangeTypes, BasePriority +from .common import ( + BasicFunctionality, BaseExchangeTypes, BasePriority +) def get_connection( @@ -26,6 +28,11 @@ def connection(request): ) +@pytest.fixture() +def invalid_connection(): + return kombu.Connection('redis://localhost:12345') + + @pytest.mark.env('redis') @pytest.mark.flaky(reruns=5, reruns_delay=2) class test_RedisBasicFunctionality(BasicFunctionality): diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index 5251f868..21cfeb41 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -153,11 +153,15 @@ class test_Connection: conn._ensure_connection = Mock() conn.connect() - conn._ensure_connection.assert_called_with() + # ensure_connection must be called to return immidiately + # and fail with transport exception + conn._ensure_connection.assert_called_with( + max_retries=1, reraise_as_library_errors=False + ) def test_connect_transport_options(self): conn = self.conn - conn.transport_options = options = { + conn.transport_options = { 'max_retries': 1, 'interval_start': 2, 'interval_step': 3, @@ -167,12 +171,12 @@ class test_Connection: conn._ensure_connection = Mock() conn.connect() - conn._ensure_connection.assert_called_with(**{ - k: v for k, v in options.items() - if k in ['max_retries', - 'interval_start', - 'interval_step', - 'interval_max']}) + # connect() is ignoring transport options + # ensure_connection must be called to return immidiately + # and fail with transport exception + conn._ensure_connection.assert_called_with( + max_retries=1, reraise_as_library_errors=False + ) def test_multiple_urls(self): conn1 = Connection('amqp://foo;amqp://bar') diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 5e911929..a36dfc2b 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -10,9 +10,7 @@ from itertools import count from case import ANY, ContextMock, Mock, call, mock, skip, patch from kombu import Connection, Exchange, Queue, Consumer, Producer -from kombu.exceptions import ( - InconsistencyError, VersionMismatch, OperationalError -) +from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.five import Empty, Queue as _Queue, bytes_if_py2 from kombu.transport import virtual from kombu.utils import eventio # patch poll @@ -1045,11 +1043,10 @@ class test_Redis: assert conn.transport.channel_errors def test_check_at_least_we_try_to_connect_and_fail(self): - connection = Connection( - 'redis://localhost:65534/', transport_options={'max_retries': 1} - ) + import redis + connection = Connection('redis://localhost:65534/') - with pytest.raises(OperationalError): + with pytest.raises(redis.exceptions.ConnectionError): chan = connection.channel() chan._size('some_queue') @@ -1468,12 +1465,13 @@ class test_RedisSentinel: master_for().connection_pool.get_connection.assert_called() def test_can_create_connection(self): + from redis.exceptions import ConnectionError + connection = Connection( 'sentinel://localhost:65534/', transport_options={ 'master_name': 'not_important', - 'max_retries': 1 }, ) - with pytest.raises(OperationalError): + with pytest.raises(ConnectionError): connection.channel()