Revert incompatible changes in #1193 and additional improvements (#1211)

* Revert incompatible changes introduced in #1193

* Improved integration tests covering connection

* Fix unittests on python2 + flake8 fixes
This commit is contained in:
Matus Valo 2020-06-10 11:14:47 +02:00 committed by GitHub
parent 16749626a4
commit 75027490c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 29 deletions

View File

@ -279,8 +279,9 @@ class Connection(object):
def connect(self):
"""Establish connection to server immediately."""
conn_opts = self._extract_failover_opts()
return self._ensure_connection(**conn_opts)
return self._ensure_connection(
max_retries=1, reraise_as_library_errors=False
)
def channel(self):
"""Create and return a new channel."""
@ -420,6 +421,9 @@ class Connection(object):
timeout (int): Maximum amount of time in seconds to spend
waiting for connection
"""
if self.connected:
return self._connection
def on_error(exc, intervals, retries, interval=0):
round = self.completes_cycle(retries)
if round:
@ -434,13 +438,12 @@ class Connection(object):
if not reraise_as_library_errors:
ctx = self._dummy_context
with ctx():
self._connection = self._connection or retry_over_time(
return retry_over_time(
self._connection_factory, self.recoverable_connection_errors,
(), {}, on_error, max_retries,
interval_start, interval_step, interval_max,
callback, timeout=timeout
)
return self._connection
@contextmanager
def _reraise_as_library_errors(
@ -860,16 +863,17 @@ class Connection(object):
"""
if not self._closed:
if not self.connected:
conn_opts = self._extract_failover_opts()
self._ensure_connection(**conn_opts)
return self._ensure_connection(
max_retries=1, reraise_as_library_errors=False
)
return self._connection
def _connection_factory(self):
self.declared_entities.clear()
self._default_channel = None
connection = self._establish_connection()
self._connection = self._establish_connection()
self._closed = False
return connection
return self._connection
@property
def default_channel(self):
@ -884,7 +888,9 @@ class Connection(object):
require a channel.
"""
# make sure we're still connected, and if not refresh.
self.connect()
conn_opts = self._extract_failover_opts()
self._ensure_connection(**conn_opts)
if self._default_channel is None:
self._default_channel = self.channel()
return self._default_channel

View File

@ -11,13 +11,52 @@ import kombu
class BasicFunctionality(object):
def test_connect(self, connection):
connection.connect()
assert connection.connect()
assert connection.connection
connection.close()
assert connection.connection is None
assert connection.connect()
assert connection.connection
connection.close()
def test_failed_connect(self, invalid_connection):
# method raises transport exception
with pytest.raises(Exception):
invalid_connection.connect()
def test_failed_connection(self, invalid_connection):
# method raises transport exception
with pytest.raises(Exception):
invalid_connection.connection
def test_failed_channel(self, invalid_connection):
# method raises transport exception
with pytest.raises(Exception):
invalid_connection.channel()
def test_failed_default_channel(self, invalid_connection):
invalid_connection.transport_options = {'max_retries': 1}
# method raises transport exception
with pytest.raises(Exception):
invalid_connection.default_channel
def test_default_channel_autoconnect(self, connection):
connection.connect()
connection.close()
connection.default_channel
assert connection.connection is None
assert connection.default_channel
assert connection.connection
connection.close()
def test_channel(self, connection):
chan = connection.channel()
assert chan
assert connection.connection
def test_default_channel(self, connection):
chan = connection.default_channel
assert chan
assert connection.connection
def test_publish_consume(self, connection):
test_queue = kombu.Queue('test', routing_key='test')

View File

@ -21,6 +21,11 @@ def get_failover_connection(hostname, port, vhost):
)
@pytest.fixture()
def invalid_connection():
return kombu.Connection('pyamqp://localhost:12345')
@pytest.fixture()
def connection(request):
return get_connection(

View File

@ -6,7 +6,9 @@ import pytest
import kombu
from time import sleep
from .common import BasicFunctionality, BaseExchangeTypes, BasePriority
from .common import (
BasicFunctionality, BaseExchangeTypes, BasePriority
)
def get_connection(
@ -26,6 +28,11 @@ def connection(request):
)
@pytest.fixture()
def invalid_connection():
return kombu.Connection('redis://localhost:12345')
@pytest.mark.env('redis')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_RedisBasicFunctionality(BasicFunctionality):

View File

@ -153,11 +153,15 @@ class test_Connection:
conn._ensure_connection = Mock()
conn.connect()
conn._ensure_connection.assert_called_with()
# ensure_connection must be called to return immidiately
# and fail with transport exception
conn._ensure_connection.assert_called_with(
max_retries=1, reraise_as_library_errors=False
)
def test_connect_transport_options(self):
conn = self.conn
conn.transport_options = options = {
conn.transport_options = {
'max_retries': 1,
'interval_start': 2,
'interval_step': 3,
@ -167,12 +171,12 @@ class test_Connection:
conn._ensure_connection = Mock()
conn.connect()
conn._ensure_connection.assert_called_with(**{
k: v for k, v in options.items()
if k in ['max_retries',
'interval_start',
'interval_step',
'interval_max']})
# connect() is ignoring transport options
# ensure_connection must be called to return immidiately
# and fail with transport exception
conn._ensure_connection.assert_called_with(
max_retries=1, reraise_as_library_errors=False
)
def test_multiple_urls(self):
conn1 = Connection('amqp://foo;amqp://bar')

View File

@ -10,9 +10,7 @@ from itertools import count
from case import ANY, ContextMock, Mock, call, mock, skip, patch
from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.exceptions import (
InconsistencyError, VersionMismatch, OperationalError
)
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, Queue as _Queue, bytes_if_py2
from kombu.transport import virtual
from kombu.utils import eventio # patch poll
@ -1045,11 +1043,10 @@ class test_Redis:
assert conn.transport.channel_errors
def test_check_at_least_we_try_to_connect_and_fail(self):
connection = Connection(
'redis://localhost:65534/', transport_options={'max_retries': 1}
)
import redis
connection = Connection('redis://localhost:65534/')
with pytest.raises(OperationalError):
with pytest.raises(redis.exceptions.ConnectionError):
chan = connection.channel()
chan._size('some_queue')
@ -1468,12 +1465,13 @@ class test_RedisSentinel:
master_for().connection_pool.get_connection.assert_called()
def test_can_create_connection(self):
from redis.exceptions import ConnectionError
connection = Connection(
'sentinel://localhost:65534/',
transport_options={
'master_name': 'not_important',
'max_retries': 1
},
)
with pytest.raises(OperationalError):
with pytest.raises(ConnectionError):
connection.channel()