Fixes warning from amqplib's Transport.__del__

This commit is contained in:
Ask Solem 2012-02-21 15:12:55 +00:00
parent c661f19075
commit 2014371cd4
2 changed files with 14 additions and 0 deletions

View File

@ -128,6 +128,7 @@ class test_connection_utils(TestCase):
virtual_host="/")
def test_url_IPV6(self):
C = BrokerConnection
raise SkipTest("urllib can't parse ipv6 urls")
self.assert_info(

View File

@ -35,6 +35,19 @@ DEFAULT_PORT = 5672
transport.AMQP_PROTOCOL_HEADER = str_to_bytes("AMQP\x01\x01\x08\x00")
# - fixes warning when socket is not connected.
_del = transport._AbstractTransport.__del__
for cls in transport.TCPTransport, transport.SSLTransport:
class _Transport(cls):
def __del__(self):
try:
_del(self)
except socket.error:
pass
setattr(transport, cls.__name__, _Transport)
class Connection(amqp.Connection): # pragma: no cover
def _do_close(self, *args, **kwargs):