kombu/t/unit/test_common.py

417 lines
13 KiB
Python
Raw Normal View History

from __future__ import absolute_import, unicode_literals
2012-01-10 15:38:17 +00:00
import pytest
2012-01-10 15:38:17 +00:00
import socket
2014-01-16 15:44:12 +00:00
from amqp import RecoverableConnectionError
from case import ContextMock, Mock, patch
2014-01-16 15:44:12 +00:00
2012-03-20 14:53:00 +00:00
from kombu import common
2012-11-22 17:39:35 +00:00
from kombu.common import (
Broadcast, maybe_declare,
send_reply, collect_replies,
2012-11-22 17:39:35 +00:00
declaration_cached, ignore_errors,
QoS, PREFETCH_COUNT_MAX,
)
2012-03-20 14:53:00 +00:00
from t.mocks import MockPool
2012-01-10 15:38:17 +00:00
def test_ignore_errors():
    """Exercise ``ignore_errors`` as a context manager and as a call wrapper.

    While ``KeyError`` is listed in the connection's error tuples it must be
    swallowed; once the tuples are emptied it must propagate to the caller.
    """
    conn = Mock()
    conn.connection_errors = (KeyError,)
    conn.channel_errors = (KeyError,)

    def raising():
        raise KeyError()

    # Context-manager form: the KeyError must not escape the block.
    with ignore_errors(conn):
        raise KeyError()

    # Callable form: the helper invokes ``raising`` and absorbs its error.
    ignore_errors(conn, raising)

    # With nothing registered as ignorable the exception surfaces again.
    conn.channel_errors = ()
    conn.connection_errors = ()
    with pytest.raises(KeyError):
        with ignore_errors(conn):
            raise KeyError()
2012-11-22 17:39:35 +00:00
class test_declaration_cached:
    """Tests for ``declaration_cached`` against a mocked channel."""

    def test_when_cached(self):
        """An entity listed in ``declared_entities`` is reported as cached."""
        channel = Mock()
        channel.connection.client.declared_entities = ['foo']

        assert declaration_cached('foo', channel)

    def test_when_not_cached(self):
        """An entity missing from ``declared_entities`` is not cached."""
        channel = Mock()
        channel.connection.client.declared_entities = ['bar']

        assert not declaration_cached('foo', channel)
2012-11-22 17:39:35 +00:00
class test_Broadcast:
    """Tests for the ``Broadcast`` queue helper."""

    def test_arguments(self):
        """Check generated vs. explicit queue names and channel binding."""
        # Only ``name`` given: the queue name is generated, the alias and
        # the exchange carry the requested name.
        generated = Broadcast(name='test_Broadcast')
        assert generated.name.startswith('bcast.')
        assert generated.alias == 'test_Broadcast'
        assert generated.auto_delete
        assert generated.exchange.name == 'test_Broadcast'
        assert generated.exchange.type == 'fanout'

        # Explicit queue name as the second positional argument.
        explicit = Broadcast('test_Broadcast', 'explicit_queue_name')
        assert explicit.name == 'explicit_queue_name'
        assert explicit.exchange.name == 'test_Broadcast'

        # Calling the queue with a channel yields a queue of the same name.
        bound = explicit(Mock())
        assert bound.name == explicit.name
2012-01-10 15:38:17 +00:00
class test_maybe_declare:
    """Tests for ``maybe_declare`` using mocked entities and channels."""

    def test_cacheable(self):
        """A cacheable entity is declared once and then served from cache."""
        channel = Mock()
        client = channel.connection.client = Mock()
        client.declared_entities = set()

        entity = Mock()
        entity.channel = channel
        entity.is_bound = True
        entity.auto_delete = False
        entity.can_cache_declaration = True

        # First call declares the entity and records its hash.
        maybe_declare(entity, channel)
        assert entity.declare.call_count == 1
        assert hash(entity) in channel.connection.client.declared_entities

        # Second call must not declare again.
        maybe_declare(entity, channel)
        assert entity.declare.call_count == 1

        # A channel without a connection is reported as recoverable.
        entity.channel.connection = None
        with pytest.raises(RecoverableConnectionError):
            maybe_declare(entity)

    def test_binds_entities(self):
        """An unbound entity is bound to the supplied channel."""
        channel = Mock()
        channel.connection.client.declared_entities = set()

        entity = Mock()
        entity.is_bound = False
        entity.can_cache_declaration = True
        # ``bind`` hands back the same mock, now carrying the channel.
        entity.bind.return_value = entity
        entity.bind.return_value.channel = channel

        maybe_declare(entity, channel)

        entity.bind.assert_called_with(channel)

    def test_with_retry(self):
        """``retry=True`` goes through the client's ``ensure``."""
        channel = Mock()
        client = channel.connection.client = Mock()
        client.declared_entities = set()

        entity = Mock()
        entity.channel = channel
        entity.is_bound = True
        entity.can_cache_declaration = True

        maybe_declare(entity, channel, retry=True)

        assert channel.connection.client.ensure.call_count
2012-01-10 15:38:17 +00:00
class test_replies:
    """Tests for ``send_reply`` and ``collect_replies``."""

    def test_send_reply(self):
        """The reply is published with routing data taken from the request."""
        request = Mock()
        request.content_type = 'application/json'
        request.content_encoding = 'binary'
        request.properties = {
            'reply_to': 'hello',
            'correlation_id': 'world',
        }

        channel = Mock()

        exchange = Mock()
        exchange.is_bound = True
        exchange.channel = channel

        producer = Mock()
        producer.channel = channel
        producer.channel.connection.client.declared_entities = set()

        send_reply(exchange, request, {'hello': 'world'}, producer)

        assert producer.publish.call_count

        published = producer.publish.call_args
        expected_kwargs = {
            'exchange': exchange,
            'routing_key': 'hello',
            'correlation_id': 'world',
            'serializer': 'json',
            'retry': False,
            'retry_policy': None,
            'content_encoding': 'binary',
        }
        # ``call_args`` is an (args, kwargs) pair.
        assert published[0][0] == {'hello': 'world'}
        assert published[1] == expected_kwargs

    @patch('kombu.common.itermessages')
    def test_collect_replies_with_ack(self, itermessages):
        """With ``no_ack=False`` every yielded message is acked."""
        conn, channel, queue = Mock(), Mock(), Mock()
        body, message = Mock(), Mock()
        itermessages.return_value = [(body, message)]

        replies = collect_replies(conn, channel, queue, no_ack=False)

        assert next(replies) is body
        itermessages.assert_called_with(conn, channel, queue, no_ack=False)
        message.ack.assert_called_with()

        # Exhausting the generator triggers the channel's cleanup hook.
        with pytest.raises(StopIteration):
            next(replies)
        channel.after_reply_message_received.assert_called_with(queue.name)

    @patch('kombu.common.itermessages')
    def test_collect_replies_no_ack(self, itermessages):
        """By default ``no_ack=True`` is forwarded and nothing is acked."""
        conn, channel, queue = Mock(), Mock(), Mock()
        body, message = Mock(), Mock()
        itermessages.return_value = [(body, message)]

        replies = collect_replies(conn, channel, queue)

        assert next(replies) is body
        itermessages.assert_called_with(conn, channel, queue, no_ack=True)
        message.ack.assert_not_called()

    @patch('kombu.common.itermessages')
    def test_collect_replies_no_replies(self, itermessages):
        """Without any reply the cleanup hook is never invoked."""
        conn, channel, queue = Mock(), Mock(), Mock()
        itermessages.return_value = []

        replies = collect_replies(conn, channel, queue)

        with pytest.raises(StopIteration):
            next(replies)
        channel.after_reply_message_received.assert_not_called()
2012-01-10 15:38:17 +00:00
class test_insured:
    """Tests for ``common.insured`` and its helper callbacks."""

    @patch('kombu.common.logger')
    def test_ensure_errback(self, logger):
        """The default errback logs through ``logger.error``."""
        common._ensure_errback('foo', 30)

        logger.error.assert_called()

    def test_revive_connection(self):
        """``on_revive`` receives the channel; ``None`` is tolerated."""
        channel = Mock()
        on_revive = Mock()

        common.revive_connection(Mock(), channel, on_revive)
        on_revive.assert_called_with(channel)

        # Must not blow up when no callback is supplied.
        common.revive_connection(Mock(), channel, None)

    def get_insured_mocks(self, insured_returns=('works', 'ignored')):
        """Build the ``(conn, pool, fun, insured)`` mocks used by the tests.

        ``insured`` is what ``conn.autoretry(...)`` hands back, and calling
        it yields ``insured_returns``.
        """
        conn = ContextMock()
        insured = conn.autoretry.return_value = Mock()
        insured.return_value = insured_returns
        fun = Mock()
        pool = MockPool(conn)
        return conn, pool, fun, insured

    def test_insured(self):
        """``insured`` wires the connection, autoretry and call arguments."""
        conn, pool, fun, insured = self.get_insured_mocks()

        result = common.insured(pool, fun, (2, 2), {'foo': 'bar'})

        # Only the first element of the wrapped call's result is returned.
        assert result == 'works'
        conn.ensure_connection.assert_called_with(
            errback=common._ensure_errback,
        )

        # The wrapped callable got the original args plus the connection.
        insured.assert_called()
        call_args, call_kwargs = insured.call_args
        assert call_args == (2, 2)
        assert call_kwargs == {'foo': 'bar', 'connection': conn}

        # autoretry was set up around ``fun`` on the default channel.
        conn.autoretry.assert_called()
        retry_args, retry_kwargs = conn.autoretry.call_args
        assert retry_args == (fun, conn.default_channel)
        assert retry_kwargs.get('on_revive')
        assert retry_kwargs.get('errback')

    def test_insured_custom_errback(self):
        """A caller-supplied errback replaces the default one."""
        conn, pool, fun, insured = self.get_insured_mocks()
        custom_errback = Mock()

        common.insured(
            pool, fun, (2, 2), {'foo': 'bar'},
            errback=custom_errback,
        )

        conn.ensure_connection.assert_called_with(errback=custom_errback)
class MockConsumer(object):
    """Minimal consumer stand-in that tracks which instances are active.

    An instance adds itself to the class-level ``consumers`` set when its
    ``with`` block is entered and removes itself on exit.
    """

    # Shared by all instances: consumers currently inside a ``with`` block.
    consumers = set()

    def __init__(self, channel, queues=None, callbacks=None, **kwargs):
        # Extra keyword arguments are accepted and ignored.
        self.channel, self.queues, self.callbacks = channel, queues, callbacks

    def __enter__(self):
        self.consumers.add(self)
        return self

    def __exit__(self, *exc_info):
        # ``discard`` keeps a repeated exit harmless.
        self.consumers.discard(self)
class test_itermessages:
    """Tests for ``common.itermessages`` driven by a fake connection."""

    class MockConnection(object):
        """Connection double whose ``drain_events`` feeds active consumers."""

        # Flip to True to make ``drain_events`` simulate a socket timeout.
        should_raise_timeout = False

        def drain_events(self, **kwargs):
            if self.should_raise_timeout:
                raise socket.timeout()
            # Deliver one fixed message to every callback of every consumer
            # that is currently registered.
            for active in MockConsumer.consumers:
                for deliver in active.callbacks:
                    deliver('body', 'message')

    def test_default(self):
        """One message is yielded, then the iterator is exhausted."""
        conn = self.MockConnection()
        conn.Consumer = MockConsumer
        channel = Mock()
        channel.connection.client = conn

        messages = common.itermessages(conn, channel, 'q', limit=1)

        assert next(messages) == ('body', 'message')
        with pytest.raises(StopIteration):
            next(messages)

    def test_when_raises_socket_timeout(self):
        """A socket timeout while draining ends iteration quietly."""
        conn = self.MockConnection()
        conn.should_raise_timeout = True
        conn.Consumer = MockConsumer
        channel = Mock()
        channel.connection.client = conn

        messages = common.itermessages(conn, channel, 'q', limit=1)

        with pytest.raises(StopIteration):
            next(messages)

    @patch('kombu.common.deque')
    def test_when_raises_IndexError(self, deque):
        """An empty buffer (``popleft`` raising) ends iteration quietly."""
        buffer = deque.return_value = Mock()
        buffer.popleft.side_effect = IndexError()
        conn = self.MockConnection()
        conn.Consumer = MockConsumer
        channel = Mock()

        messages = common.itermessages(conn, channel, 'q', limit=1)

        with pytest.raises(StopIteration):
            next(messages)
2012-11-22 17:39:35 +00:00
class test_QoS:
    """Tests for the ``QoS`` prefetch-count bookkeeping."""

    class _QoS(QoS):
        """``QoS`` built without a callback whose ``set`` echoes its input."""

        def __init__(self, value):
            self.value = value
            QoS.__init__(self, None, value)

        def set(self, value):
            return value

    def test_qos_exceeds_16bit(self):
        """An oversized value is warned about and sent as prefetch 0."""
        with patch('kombu.common.logger') as logger:
            callback = Mock()
            qos = QoS(callback, 10)
            qos.prev = 100

            # cannot use 2 ** 32 because of a bug on macOS Py2.5:
            # https://jira.mongodb.org/browse/PYTHON-389
            qos.set(4294967296)

            logger.warn.assert_called()
            callback.assert_called_with(prefetch_count=0)

    def test_qos_increment_decrement(self):
        """Increments and decrements return the running value."""
        qos = self._QoS(10)

        assert qos.increment_eventually() == 11
        assert qos.increment_eventually(3) == 14
        # A negative increment leaves the value where it was.
        assert qos.increment_eventually(-30) == 14
        assert qos.decrement_eventually(7) == 7
        assert qos.decrement_eventually() == 6

    def test_qos_disabled_increment_decrement(self):
        """A value of 0 stays 0 whatever is added or removed."""
        qos = self._QoS(0)

        assert qos.increment_eventually() == 0
        assert qos.increment_eventually(3) == 0
        assert qos.increment_eventually(-30) == 0
        assert qos.decrement_eventually(7) == 0
        assert qos.decrement_eventually() == 0
        assert qos.decrement_eventually(10) == 0

    def test_qos_thread_safe(self):
        """Concurrent updates from two threads lose no increments."""
        from threading import Thread

        qos = self._QoS(10)

        def add():
            for _ in range(1000):
                qos.increment_eventually()

        def sub():
            for _ in range(1000):
                qos.decrement_eventually()

        def run_in_threads(workers):
            # Start every worker first, then wait for all of them.
            running = [Thread(target=worker) for worker in workers]
            for thread in running:
                thread.start()
            for thread in running:
                thread.join()

        run_in_threads([add, add])
        assert qos.value == 2010

        # Equal numbers of increments and decrements cancel out.
        qos.value = 1000
        run_in_threads([add, sub])  # n = 2
        assert qos.value == 1000

    def test_exceeds_short(self):
        """The value may step past ``PREFETCH_COUNT_MAX`` and back again."""
        qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1)
        qos.update()
        assert qos.value == PREFETCH_COUNT_MAX - 1

        qos.increment_eventually()
        assert qos.value == PREFETCH_COUNT_MAX

        qos.increment_eventually()
        assert qos.value == PREFETCH_COUNT_MAX + 1

        qos.decrement_eventually()
        assert qos.value == PREFETCH_COUNT_MAX

        qos.decrement_eventually()
        assert qos.value == PREFETCH_COUNT_MAX - 1

    def test_consumer_increment_decrement(self):
        """The consumer callback only sees values pushed by ``update``."""
        consumer = Mock()
        qos = QoS(consumer.qos, 10)

        qos.update()
        assert qos.value == 10
        consumer.qos.assert_called_with(prefetch_count=10)

        qos.decrement_eventually()
        qos.update()
        assert qos.value == 9
        consumer.qos.assert_called_with(prefetch_count=9)

        # Decrementing without ``update`` changes the value but the last
        # call seen by the consumer is still the previous one.
        qos.decrement_eventually()
        assert qos.value == 8
        consumer.qos.assert_called_with(prefetch_count=9)
        assert {'prefetch_count': 9} in consumer.qos.call_args

        # Does not decrement 0 value
        qos.value = 0
        qos.decrement_eventually()
        assert qos.value == 0

        qos.increment_eventually()
        assert qos.value == 0

    def test_consumer_decrement_eventually(self):
        """Decrementing lowers the value by one but never below 0."""
        consumer = Mock()
        qos = QoS(consumer.qos, 10)

        qos.decrement_eventually()
        assert qos.value == 9

        qos.value = 0
        qos.decrement_eventually()
        assert qos.value == 0

    def test_set(self):
        """``set`` records the new value in ``prev``."""
        consumer = Mock()
        qos = QoS(consumer.qos, 10)

        qos.set(12)
        assert qos.prev == 12

        # Setting the same value again must simply not raise.
        qos.set(qos.prev)