2011-09-07 14:21:47 +00:00
|
|
|
from __future__ import absolute_import
|
2012-01-13 17:09:53 +00:00
|
|
|
|
2012-11-27 15:10:13 +00:00
|
|
|
import errno
|
2010-11-09 14:27:43 +00:00
|
|
|
import pickle
|
2012-11-27 15:10:13 +00:00
|
|
|
import socket
|
2010-06-28 22:54:13 +00:00
|
|
|
|
2012-11-27 15:10:13 +00:00
|
|
|
from copy import copy
|
|
|
|
from mock import patch
|
2012-02-10 14:52:17 +00:00
|
|
|
from nose import SkipTest
|
|
|
|
|
2012-06-24 15:32:17 +00:00
|
|
|
from kombu import Connection, Consumer, Producer, parse_url
|
|
|
|
from kombu.connection import Resource
|
2013-02-13 11:15:47 +00:00
|
|
|
from kombu.five import items, range
|
2011-09-07 14:21:47 +00:00
|
|
|
|
|
|
|
from .mocks import Transport
|
2012-01-15 16:22:47 +00:00
|
|
|
from .utils import TestCase
|
2012-11-27 15:10:13 +00:00
|
|
|
|
2012-01-14 16:55:56 +00:00
|
|
|
from .utils import Mock, skip_if_not_module
|
2010-06-28 22:54:13 +00:00
|
|
|
|
|
|
|
|
2012-01-15 16:22:47 +00:00
|
|
|
class test_connection_utils(TestCase):
    """Tests for ``parse_url`` and for ``Connection.info()``/``as_uri()``."""

    def setUp(self):
        # The same broker URL with and without its password component,
        # plus the fields the full URL is expected to break down into.
        self.url = 'amqp://user:pass@localhost:5672/my/vhost'
        self.nopass = 'amqp://user@localhost:5672/my/vhost'
        self.expected = {
            'transport': 'amqp',
            'userid': 'user',
            'password': 'pass',
            'hostname': 'localhost',
            'port': 5672,
            'virtual_host': 'my/vhost',
        }

    def test_parse_url(self):
        self.assertDictEqual(parse_url(self.url), self.expected)

    def test_parse_url_mongodb(self):
        parsed = parse_url('mongodb://example.com/')
        self.assertEqual(parsed['hostname'], 'example.com/')

    def test_parse_generated_as_uri(self):
        conn = Connection(self.url)
        # Every expected field must show up unchanged in conn.info().
        self.assert_info(conn, **self.expected)
        # By default the URI is almost the same, minus the password.
        self.assertEqual(conn.as_uri(), self.nopass)
        self.assertEqual(conn.as_uri(include_password=True), self.url)

    @skip_if_not_module('pymongo')
    def test_as_uri_when_mongodb(self):
        conn = Connection('mongodb://localhost')
        self.assertTrue(conn.as_uri())

    def test_bogus_scheme(self):
        with self.assertRaises(KeyError):
            Connection('bogus://localhost:7421').transport

    def assert_info(self, conn, **fields):
        # Compare each keyword argument against the matching entry of
        # ``conn.info()``.
        actual = conn.info()
        for name, wanted in items(fields):
            self.assertEqual(actual[name], wanted)

    def test_rabbitmq_example_urls(self):
        # see Appendix A of http://www.rabbitmq.com/uri-spec.html
        #
        # Columns: url, userid, password, hostname, port, virtual_host.
        cases = (
            ('amqp://user:pass@host:10000/vhost',
             'user', 'pass', 'host', 10000, 'vhost'),
            ('amqp://user%61:%61pass@ho%61st:10000/v%2fhost',
             'usera', 'apass', 'hoast', 10000, 'v/host'),
            ('amqp://',
             'guest', 'guest', 'localhost', 5672, '/'),
            ('amqp://:@/',
             'guest', 'guest', 'localhost', 5672, '/'),
            ('amqp://user@/',
             'user', 'guest', 'localhost', 5672, '/'),
            ('amqp://user:pass@/',
             'user', 'pass', 'localhost', 5672, '/'),
            ('amqp://host',
             'guest', 'guest', 'host', 5672, '/'),
            ('amqp://:10000',
             'guest', 'guest', 'localhost', 10000, '/'),
            ('amqp:///vhost',
             'guest', 'guest', 'localhost', 5672, 'vhost'),
            ('amqp://host/',
             'guest', 'guest', 'host', 5672, '/'),
            ('amqp://host/%2f',
             'guest', 'guest', 'host', 5672, '/'),
        )
        for url, userid, password, hostname, port, vhost in cases:
            self.assert_info(
                Connection(url),
                userid=userid, password=password, hostname=hostname,
                port=port, virtual_host=vhost,
            )

    def test_url_IPV6(self):
        raise SkipTest("urllib can't parse ipv6 urls")

        # Unreachable while the skip above is in place; kept so the
        # intended assertion is ready once IPv6 URLs can be parsed.
        self.assert_info(
            Connection('amqp://[::1]'),
            userid='guest', password='guest', hostname='[::1]',
            port=5672, virtual_host='/',
        )
|
2012-02-10 14:52:17 +00:00
|
|
|
|
2011-11-03 15:13:27 +00:00
|
|
|
|
2012-01-15 16:22:47 +00:00
|
|
|
class test_Connection(TestCase):
    """Tests for ``Connection``, mostly driven by the mock ``Transport``."""

    def setUp(self):
        self.conn = Connection(port=5672, transport=Transport)

    def test_establish_connection(self):
        # Connect, open a channel, drain an event, then close again.
        conn = self.conn
        conn.connect()
        self.assertTrue(conn.connection.connected)
        self.assertEqual(conn.host, 'localhost:5672')

        chan = conn.channel()
        self.assertTrue(chan.open)
        self.assertEqual(conn.drain_events(), 'event')

        # Keep a handle on the underlying connection so its state can
        # still be inspected after close().
        underlying = conn.connection
        conn.close()
        self.assertFalse(underlying.connected)
        self.assertIsInstance(conn.transport, Transport)

    def test_multiple_urls(self):
        # Alternates may be given as a ';'-separated string or as a list.
        specs = ('amqp://foo;amqp://bar', ['amqp://foo', 'amqp://bar'])
        for spec in specs:
            conn = Connection(spec)
            self.assertEqual(conn.hostname, 'foo')
            self.assertListEqual(conn.alt, ['amqp://foo', 'amqp://bar'])

    def test_collect(self):
        connection = Connection('memory://')

        trans = Mock(name='transport')
        connection._transport = trans
        _collect = Mock(name='transport._collect')
        trans._collect = _collect
        _close = Mock(name='connection._close')
        connection._close = _close
        connection.declared_entities = Mock(name='decl_entities')
        uconn = Mock(name='_connection')
        connection._connection = uconn

        connection.collect()

        self.assertFalse(_close.called)
        _collect.assert_called_with(uconn)
        connection.declared_entities.clear.assert_called_with()
        self.assertIsNone(trans.client)
        self.assertIsNone(connection._transport)
        self.assertIsNone(connection._connection)

    def test_collect_no_transport(self):
        connection = Connection('memory://')
        connection._transport = None
        connection._close = Mock()

        connection.collect()
        connection._close.assert_called_with()

        # A socket timeout raised by _close() must not escape collect().
        connection._close.side_effect = socket.timeout()
        connection.collect()

    def test_collect_transport_gone(self):
        connection = Connection('memory://')
        uconn = Mock(name='conn._conn')
        connection._connection = uconn
        trans = Mock(name='transport')
        connection._transport = trans
        collect = Mock(name='transport._collect')
        trans._collect = collect

        def se(conn):
            # Simulate the transport disappearing while being collected.
            connection._transport = None
        collect.side_effect = se

        connection.collect()
        collect.assert_called_with(uconn)
        self.assertIsNone(connection._transport)

    def test_uri_passthrough(self):
        from kombu import connection as mod

        prev = mod.URI_PASSTHROUGH
        mod.URI_PASSTHROUGH = set(['foo'])
        try:
            # Transport given as a 'foo+' prefix on the URL.
            with patch('kombu.connection.parse_url') as parse_url_mock:
                conn = Connection('foo+mysql://some_host')
                self.assertEqual(conn.transport_cls, 'foo')
                self.assertFalse(parse_url_mock.called)
                self.assertEqual(conn.hostname, 'mysql://some_host')
                self.assertTrue(conn.as_uri().startswith('foo+'))
            # Transport given through the ``transport`` argument.
            with patch('kombu.connection.parse_url') as parse_url_mock:
                conn = Connection('mysql://some_host', transport='foo')
                self.assertEqual(conn.transport_cls, 'foo')
                self.assertFalse(parse_url_mock.called)
                self.assertEqual(conn.hostname, 'mysql://some_host')
        finally:
            mod.URI_PASSTHROUGH = prev
        conn = Connection('amqp+sqlite://some_host')
        self.assertTrue(conn.as_uri().startswith('amqp+'))

    def test_default_ensure_callback(self):
        with patch('kombu.connection.logger') as logger:
            conn = Connection(transport=Mock)
            conn._default_ensure_callback(KeyError(), 3)
            self.assertTrue(logger.error.called)

    def test_ensure_connection_on_error(self):
        conn = Connection('amqp://A;amqp://B')
        with patch('kombu.connection.retry_over_time') as rot:
            conn.ensure_connection()
            self.assertTrue(rot.called)

            # The fifth positional argument handed to retry_over_time is
            # the callback exercised below.
            cb = rot.call_args[0][4]
            intervals = iter([1, 2, 3, 4, 5])
            # Expected return value for retry counts 0 through 7.
            expected_returns = (0, 1, 0, 2, 0, 3, 0, 4)
            for retries, wanted in enumerate(expected_returns):
                self.assertEqual(cb(KeyError(), intervals, retries), wanted)

            errback = Mock()
            conn.ensure_connection(errback=errback)
            cb = rot.call_args[0][4]
            self.assertEqual(cb(KeyError(), intervals, 0), 0)
            self.assertTrue(errback.called)

    def test_drain_nowait(self):
        conn = Connection(transport=Mock)
        conn.drain_events = Mock()

        # socket.timeout: returns false and clears more_to_read.
        conn.drain_events.side_effect = socket.timeout()
        conn.more_to_read = True
        self.assertFalse(conn.drain_nowait())
        self.assertFalse(conn.more_to_read)

        # socket.error with EAGAIN: handled the same way.
        again = socket.error()
        again.errno = errno.EAGAIN
        conn.drain_events.side_effect = again
        conn.more_to_read = True
        self.assertFalse(conn.drain_nowait())
        self.assertFalse(conn.more_to_read)

        # socket.error with any other errno (EPERM here) propagates.
        denied = socket.error()
        denied.errno = errno.EPERM
        conn.drain_events.side_effect = denied
        with self.assertRaises(socket.error):
            conn.drain_nowait()

        # Successful drain: returns true and sets more_to_read.
        conn.more_to_read = False
        conn.drain_events = Mock()
        self.assertTrue(conn.drain_nowait())
        conn.drain_events.assert_called_with(timeout=0)
        self.assertTrue(conn.more_to_read)

    def test_supports_heartbeats(self):
        conn = Connection(transport=Mock)
        conn.transport.supports_heartbeats = False
        self.assertFalse(conn.supports_heartbeats)

    def test_is_evented(self):
        conn = Connection(transport=Mock)
        conn.transport.supports_ev = False
        self.assertFalse(conn.is_evented)

    def test_eventmap(self):
        conn = Connection(transport=Mock)
        conn.transport.eventmap.return_value = {1: 1, 2: 2}
        self.assertDictEqual(conn.eventmap, {1: 1, 2: 2})
        conn.transport.eventmap.assert_called_with(conn.connection)

    def test_manager(self):
        conn = Connection(transport=Mock)
        self.assertIs(conn.manager, conn.transport.manager)

    def test_copy(self):
        original = Connection('amqp://example.com')
        self.assertEqual(copy(original).info(), original.info())

    def test_copy_multiples(self):
        original = Connection('amqp://A.example.com;amqp://B.example.com')
        self.assertTrue(original.alt)
        duplicate = copy(original)
        self.assertEqual(duplicate.alt, original.alt)

    def test_switch(self):
        conn = Connection('amqp://foo')
        conn._closed = True
        conn.switch('redis://example.com//3')
        self.assertFalse(conn._closed)
        self.assertEqual(conn.hostname, 'example.com')
        self.assertEqual(conn.transport_cls, 'redis')
        self.assertEqual(conn.virtual_host, '/3')

    def test_maybe_switch_next(self):
        conn = Connection('amqp://foo;redis://example.com//3')
        conn.maybe_switch_next()
        self.assertFalse(conn._closed)
        self.assertEqual(conn.hostname, 'example.com')
        self.assertEqual(conn.transport_cls, 'redis')
        self.assertEqual(conn.virtual_host, '/3')

    def test_maybe_switch_next_no_cycle(self):
        # With a single URL there is nothing to switch to.
        conn = Connection('amqp://foo')
        conn.maybe_switch_next()
        self.assertFalse(conn._closed)
        self.assertEqual(conn.hostname, 'foo')
        self.assertIn(conn.transport_cls, ('librabbitmq', 'pyamqp', 'amqp'))

    def test_heartbeat_check(self):
        conn = Connection(transport=Transport)
        conn.transport.heartbeat_check = Mock()
        conn.heartbeat_check(3)
        conn.transport.heartbeat_check.assert_called_with(
            conn.connection, rate=3)

    def test_completes_cycle_no_cycle(self):
        conn = Connection('amqp://')
        for retries in (0, 1):
            self.assertTrue(conn.completes_cycle(retries))

    def test_completes_cycle(self):
        conn = Connection('amqp://a;amqp://b;amqp://c')
        self.assertFalse(conn.completes_cycle(0))
        self.assertFalse(conn.completes_cycle(1))
        self.assertTrue(conn.completes_cycle(2))

    def test__enter____exit__(self):
        conn = self.conn
        entered = conn.__enter__()
        self.assertIs(entered, conn)
        conn.connect()
        self.assertTrue(conn.connection.connected)
        conn.__exit__()
        self.assertIsNone(conn.connection)
        conn.close()  # again

    def test_close_survives_connerror(self):

        class _CustomError(Exception):
            pass

        class MyTransport(Transport):
            connection_errors = (_CustomError, )

            def close_connection(self, connection):
                raise _CustomError('foo')

        conn = Connection(transport=MyTransport)
        conn.connect()
        conn.close()
        self.assertTrue(conn._closed)

    def test_close_when_default_channel(self):
        conn = self.conn
        conn._default_channel = Mock()
        conn._close()
        conn._default_channel.close.assert_called_with()

    def test_close_when_default_channel_close_raises(self):

        class Conn(Connection):

            @property
            def connection_errors(self):
                return (KeyError, )

        conn = Conn('memory://')
        conn._default_channel = Mock()
        # KeyError is listed in connection_errors above, so _close() is
        # expected to complete even though the channel's close() raises.
        conn._default_channel.close.side_effect = KeyError()

        conn._close()
        conn._default_channel.close.assert_called_with()

    def test_revive_when_default_channel(self):
        conn = self.conn
        defchan = Mock()
        conn._default_channel = defchan
        conn.revive(Mock())

        defchan.close.assert_called_with()
        self.assertIsNone(conn._default_channel)

    def test_ensure_connection(self):
        self.assertTrue(self.conn.ensure_connection())

    def test_ensure_success(self):

        def publish():
            return 'foobar'

        ensured = self.conn.ensure(None, publish)
        self.assertEqual(ensured(), 'foobar')

    def test_ensure_failure(self):

        class _CustomError(Exception):
            pass

        def publish():
            raise _CustomError('bar')

        ensured = self.conn.ensure(None, publish)
        with self.assertRaises(_CustomError):
            ensured()

    def test_ensure_connection_failure(self):

        class _ConnectionError(Exception):
            pass

        def publish():
            raise _ConnectionError('failed connection')

        self.conn.transport.connection_errors = (_ConnectionError,)
        ensured = self.conn.ensure(self.conn, publish)
        with self.assertRaises(_ConnectionError):
            ensured()

    def test_autoretry(self):
        myfun = Mock()
        myfun.__name__ = 'test_autoretry'

        self.conn.transport.connection_errors = (KeyError, )

        def on_call(*args, **kwargs):
            # Fail on this call only: clearing the side effect means the
            # next call to myfun no longer raises.
            myfun.side_effect = None
            raise KeyError('foo')

        myfun.side_effect = on_call
        insured = self.conn.autoretry(myfun)
        insured()

        self.assertTrue(myfun.called)

    def test_SimpleQueue(self):
        conn = self.conn
        # Without an explicit channel the default channel is used.
        default_q = conn.SimpleQueue('foo')
        self.assertIs(default_q.channel, conn.default_channel)
        # An explicitly supplied channel is used as given.
        chan = conn.channel()
        explicit_q = conn.SimpleQueue('foo', channel=chan)
        self.assertIs(explicit_q.channel, chan)

    def test_SimpleBuffer(self):
        conn = self.conn
        # Without an explicit channel the default channel is used.
        default_q = conn.SimpleBuffer('foo')
        self.assertIs(default_q.channel, conn.default_channel)
        # An explicitly supplied channel is used as given.
        chan = conn.channel()
        explicit_q = conn.SimpleBuffer('foo', channel=chan)
        self.assertIs(explicit_q.channel, chan)

    def test_Producer(self):
        conn = self.conn
        self.assertIsInstance(conn.Producer(), Producer)
        self.assertIsInstance(conn.Producer(conn.default_channel), Producer)

    def test_Consumer(self):
        conn = self.conn
        self.assertIsInstance(conn.Consumer(queues=[]), Consumer)
        with_channel = conn.Consumer(queues=[],
                                     channel=conn.default_channel)
        self.assertIsInstance(with_channel, Consumer)

    def test__repr__(self):
        self.assertTrue(repr(self.conn))

    def test__reduce__(self):
        # A pickle round trip must preserve the connection info.
        restored = pickle.loads(pickle.dumps(self.conn))
        self.assertDictEqual(restored.info(), self.conn.info())

    def test_channel_errors(self):

        class MyTransport(Transport):
            channel_errors = (KeyError, ValueError)

        conn = Connection(transport=MyTransport)
        self.assertTupleEqual(conn.channel_errors, (KeyError, ValueError))

    def test_connection_errors(self):

        class MyTransport(Transport):
            connection_errors = (KeyError, ValueError)

        conn = Connection(transport=MyTransport)
        self.assertTupleEqual(conn.connection_errors, (KeyError, ValueError))
|
|
|
|
|
2010-10-22 13:42:47 +00:00
|
|
|
|
2012-01-15 16:22:47 +00:00
|
|
|
class test_Connection_with_transport_options(TestCase):
    """Checks that ``transport_options`` is kept on the ``Connection``."""

    # Options handed to every Connection created in setUp().
    transport_options = {'pool_recycler': 3600, 'echo': True}

    def setUp(self):
        self.conn = Connection(
            port=5672,
            transport=Transport,
            transport_options=self.transport_options,
        )

    def test_establish_connection(self):
        self.assertEqual(self.conn.transport_options, self.transport_options)
|
2011-03-13 17:02:32 +00:00
|
|
|
|
|
|
|
|
2012-01-14 00:02:59 +00:00
|
|
|
class xResource(Resource):
    """``Resource`` subclass whose ``setup`` step does nothing."""

    def setup(self):
        # Intentionally empty.
        return None
|
|
|
|
|
|
|
|
|
2012-01-15 16:22:47 +00:00
|
|
|
class ResourceCase(TestCase):
    """Shared test cases for resource pools.

    Subclasses set ``abstract = False`` and implement ``create_resource``.
    While ``abstract`` is true most tests return immediately; only
    ``test_setup`` and ``test_interface_prepare`` do any work.
    """

    abstract = True

    def create_resource(self, limit, preload):
        raise NotImplementedError('subclass responsibility')

    def assertState(self, P, avail, dirty):
        # ``avail`` is the expected queue size of the pool and ``dirty``
        # the expected number of entries in its ``_dirty`` collection.
        self.assertEqual(P._resource.qsize(), avail)
        self.assertEqual(len(P._dirty), dirty)

    def test_setup(self):
        if self.abstract:
            with self.assertRaises(NotImplementedError):
                Resource()

    def test_acquire__release(self):
        if self.abstract:
            return
        pool = self.create_resource(10, 0)
        self.assertState(pool, 10, 0)

        # Take everything the pool has to offer.
        acquired = [pool.acquire() for _ in range(10)]
        self.assertState(pool, 0, 10)
        with self.assertRaises(pool.LimitExceeded):
            pool.acquire()

        # Hand one back, then all of the rest.
        acquired.pop().release()
        self.assertState(pool, 1, 9)
        for resource in acquired:
            resource.release()
        self.assertState(pool, 10, 0)

    def test_acquire_prepare_raises(self):
        if self.abstract:
            return
        pool = self.create_resource(10, 0)

        self.assertEqual(len(pool._resource.queue), 10)
        pool.prepare = Mock()
        pool.prepare.side_effect = IOError()
        with self.assertRaises(IOError):
            pool.acquire(block=True)
        # The queue must be the same size as before the failed acquire.
        self.assertEqual(len(pool._resource.queue), 10)

    def test_acquire_no_limit(self):
        if self.abstract:
            return
        pool = self.create_resource(None, 0)
        pool.acquire().release()

    def test_replace_when_limit(self):
        if self.abstract:
            return
        pool = self.create_resource(10, 0)
        resource = pool.acquire()
        pool._dirty = Mock()
        pool.close_resource = Mock()

        pool.replace(resource)
        pool._dirty.discard.assert_called_with(resource)
        pool.close_resource.assert_called_with(resource)

    def test_replace_no_limit(self):
        if self.abstract:
            return
        pool = self.create_resource(None, 0)
        resource = pool.acquire()
        pool._dirty = Mock()
        pool.close_resource = Mock()

        pool.replace(resource)
        self.assertFalse(pool._dirty.discard.called)
        pool.close_resource.assert_called_with(resource)

    def test_interface_prepare(self):
        # Runs only on the abstract base case.
        if not self.abstract:
            return
        resource = xResource()
        self.assertEqual(resource.prepare(10), 10)

    def test_force_close_all_handles_AttributeError(self):
        if self.abstract:
            return
        pool = self.create_resource(10, 10)
        collector = Mock()
        pool.collect_resource = collector
        collector.side_effect = AttributeError('x')

        pool.acquire()
        self.assertTrue(pool._dirty)

        pool.force_close_all()

    def test_force_close_all_no_mutex(self):
        if self.abstract:
            return
        pool = self.create_resource(10, 10)
        pool.close_resource = Mock()

        fake_queue = Mock()
        pool._resource = fake_queue
        fake_queue.mutex = None
        fake_queue.queue.pop.side_effect = IndexError

        pool.force_close_all()

    def test_add_when_empty(self):
        if self.abstract:
            return
        pool = self.create_resource(None, None)
        pool._resource.queue[:] = []
        self.assertFalse(pool._resource.queue)
        pool._add_when_empty()
        self.assertTrue(pool._resource.queue)
|
|
|
|
|
2010-10-22 13:42:47 +00:00
|
|
|
|
|
|
|
class test_ConnectionPool(ResourceCase):
    """Runs the shared resource cases against ``Connection.Pool``."""

    abstract = False

    def create_resource(self, limit, preload):
        connection = Connection(port=5672, transport=Transport)
        return connection.Pool(limit, preload)

    def test_setup(self):
        pool = self.create_resource(10, 2)
        queue = pool._resource.queue
        # The first two entries (preload=2) already have a connection.
        self.assertIsNotNone(queue[0]._connection)
        self.assertIsNotNone(queue[1]._connection)
        # The third entry is a callable; calling it yields an object
        # whose _connection is still unset.
        self.assertIsNone(queue[2]()._connection)

    def test_acquire_raises_evaluated(self):
        pool = self.create_resource(1, 0)
        # evaluate the connection first
        resource = pool.acquire()
        resource.release()

        pool.prepare = Mock()
        pool.prepare.side_effect = MemoryError()
        pool.release = Mock()
        with self.assertRaises(MemoryError):
            with pool.acquire():
                assert False
        pool.release.assert_called_with(resource)

    def test_release_no__debug(self):
        pool = self.create_resource(10, 2)
        resource = Mock()
        resource._debug.side_effect = AttributeError()
        pool.release_resource(resource)

    def test_setup_no_limit(self):
        pool = self.create_resource(None, None)
        self.assertFalse(pool._resource.queue)
        self.assertIsNone(pool.limit)

    def test_prepare_not_callable(self):
        pool = self.create_resource(None, None)
        conn = Connection('memory://')
        self.assertIs(pool.prepare(conn), conn)

    def test_acquire_channel(self):
        pool = self.create_resource(10, 0)
        with pool.acquire_channel() as (conn, channel):
            self.assertIs(channel, conn.default_channel)
|
|
|
|
|
2010-10-22 13:42:47 +00:00
|
|
|
|
|
|
|
class test_ChannelPool(ResourceCase):
    """Runs the shared resource cases against ``Connection.ChannelPool``."""

    abstract = False

    def create_resource(self, limit, preload):
        connection = Connection(port=5672, transport=Transport)
        return connection.ChannelPool(limit, preload)

    def test_setup(self):
        pool = self.create_resource(10, 2)
        queue = pool._resource.queue
        # The first two entries (preload=2) expose basic_consume.
        self.assertTrue(queue[0].basic_consume)
        self.assertTrue(queue[1].basic_consume)
        # The third entry does not.
        with self.assertRaises(AttributeError):
            getattr(queue[2], 'basic_consume')

    def test_setup_no_limit(self):
        pool = self.create_resource(None, None)
        self.assertFalse(pool._resource.queue)
        self.assertIsNone(pool.limit)

    def test_prepare_not_callable(self):
        pool = self.create_resource(10, 0)
        conn = Connection('memory://')
        chan = conn.default_channel
        self.assertIs(pool.prepare(chan), chan)
|