100% coverage for kombu.transport.librabbitmq

This commit is contained in:
Ask Solem 2013-09-11 17:02:23 +01:00
parent 9dc3ceaa43
commit a136c7b434
1 changed files with 156 additions and 0 deletions

View File

@ -0,0 +1,156 @@
from __future__ import absolute_import
try:
import librabbitmq
except ImportError:
librabbitmq = None # noqa
else:
from kombu.transport import librabbitmq # noqa
from kombu import Connection
from kombu.tests.case import Case, Mock, SkipTest, patch
class lrmqCase(Case):
def setUp(self):
if librabbitmq is None:
raise SkipTest('librabbitmq is not installed')
class test_Message(lrmqCase):
def test_init(self):
chan = Mock(name='channel')
message = librabbitmq.Message(chan, {'prop': 42},
{'delivery_tag': 337}, 'body')
self.assertEqual(message.body, 'body')
self.assertEqual(message.delivery_tag, 337)
self.assertEqual(message.properties['prop'], 42)
class test_Channel(lrmqCase):
def test_prepare_message(self):
conn = Mock(name='connection')
chan = librabbitmq.Channel(conn, 1)
self.assertTrue(chan)
body = 'the quick brown fox...'
properties = {'name': 'Elaine M.'}
body2, props2 = chan.prepare_message(
body, properties=properties,
priority=999,
content_type='ctype',
content_encoding='cenc',
headers={'H': 2},
)
self.assertEqual(props2['name'], 'Elaine M.')
self.assertEqual(props2['priority'], 999)
self.assertEqual(props2['content_type'], 'ctype')
self.assertEqual(props2['content_encoding'], 'cenc')
self.assertEqual(props2['headers'], {'H': 2})
self.assertEqual(body2, body)
body3, props3 = chan.prepare_message(body, priority=777)
self.assertEqual(props3['priority'], 777)
self.assertEqual(body3, body)
class test_Transport(lrmqCase):
def setUp(self):
super(test_Transport, self).setUp()
self.client = Mock(name='client')
self.T = librabbitmq.Transport(self.client)
def test_driver_version(self):
self.assertTrue(self.T.driver_version())
def test_create_channel(self):
conn = Mock(name='connection')
chan = self.T.create_channel(conn)
self.assertTrue(chan)
conn.channel.assert_called_with()
def test_drain_events(self):
conn = Mock(name='connection')
self.T.drain_events(conn, timeout=1.33)
conn.drain_events.assert_called_with(timeout=1.33)
def test_establish_connection_SSL_not_supported(self):
self.client.ssl = True
with self.assertRaises(NotImplementedError):
self.T.establish_connection()
def test_establish_connection(self):
self.T.Connection = Mock(name='Connection')
self.T.client.ssl = False
self.T.client.port = None
self.T.client.transport_options = {}
conn = self.T.establish_connection()
self.assertEqual(
self.T.client.port,
self.T.default_connection_params['port'],
)
self.assertEqual(conn.client, self.T.client)
self.assertEqual(self.T.client.drain_events, conn.drain_events)
def test_collect__no_conn(self):
self.T.client.drain_events = 1234
self.T._collect(None)
self.assertIsNone(self.client.drain_events)
self.assertIsNone(self.T.client)
def test_collect__with_conn(self):
self.T.client.drain_events = 1234
conn = Mock(name='connection')
chans = conn.channels = {1: Mock(name='chan1'), 2: Mock(name='chan2')}
conn.callbacks = {'foo': Mock(name='cb1'), 'bar': Mock(name='cb2')}
for i, chan in enumerate(conn.channels.values()):
chan.connection = i
with patch('os.close') as close:
self.T._collect(conn)
close.assert_called_with(conn.fileno())
self.assertFalse(conn.channels)
self.assertFalse(conn.callbacks)
for chan in chans.values():
self.assertIsNone(chan.connection)
self.assertIsNone(self.client.drain_events)
self.assertIsNone(self.T.client)
with patch('os.close') as close:
self.T.client = self.client
close.side_effect = OSError()
self.T._collect(conn)
close.assert_called_with(conn.fileno())
def test_eventmap(self):
conn = Mock()
self.assertDictEqual(
self.T.eventmap(conn),
{conn.fileno(): self.client.drain_nowait},
)
def test_on_poll_start(self):
self.assertDictEqual(self.T.on_poll_start(), {})
def test_on_poll_init(self):
self.T.on_poll_init(poller=Mock(name='poller'))
def test_verify_connection(self):
conn = Mock(name='connection')
conn.connected = True
self.assertTrue(self.T.verify_connection(conn))
def test_close_connection(self):
conn = Mock(name='connection')
self.client.drain_events = 1234
self.T.close_connection(conn)
self.assertIsNone(self.client.drain_events)
conn.close.assert_called_with()