diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 854f7a00..b2116edb 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -29,3 +29,57 @@ class test_Connection(unittest.TestCase): conn.__exit__() self.assertIsNone(conn.connection) conn.close() # again + + +class ResourceCase(unittest.TestCase): + abstract = True + + def create_resource(self, limit, preload): + raise NotImplementedError("subclass responsibility") + + def assertState(self, P, avail, dirty): + self.assertEqual(P._resource.qsize(), avail) + self.assertEqual(len(P._dirty), dirty) + + def test_acquire__release(self): + if self.abstract: + return + P = self.create_resource(10, 0) + self.assertState(P, 10, 0) + chans = [P.acquire() for _ in xrange(10)] + self.assertState(P, 0, 10) + self.assertRaises(P.LimitExceeded, P.acquire) + chans.pop().release() + self.assertState(P, 1, 9) + [chan.release() for chan in chans] + self.assertState(P, 10, 0) + + +class test_ConnectionPool(ResourceCase): + abstract = False + + def create_resource(self, limit, preload): + return BrokerConnection(port=5672, transport=Transport) \ + .Pool(limit, preload) + + def test_setup(self): + P = self.create_resource(10, 2) + q = P._resource.queue + self.assertIsNotNone(q[0]._connection) + self.assertIsNotNone(q[1]._connection) + self.assertIsNone(q[2]._connection) + + +class test_ChannelPool(ResourceCase): + abstract = False + + def create_resource(self, limit, preload): + return BrokerConnection(port=5672, transport=Transport) \ + .ChannelPool(limit, preload) + + def test_setup(self): + P = self.create_resource(10, 2) + q = P._resource.queue + self.assertTrue(q[0].basic_consume) + self.assertTrue(q[1].basic_consume) + self.assertRaises(AttributeError, getattr, q[2], "basic_consume")