From d6601ec92cde9fe7f8a271d6bf1828557c1df32e Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 7 Apr 2011 15:53:29 +0200 Subject: [PATCH] Closes cyclic reference in Connection --- funtests/transport.py | 44 +++++++++++++++++++++++++++-- kombu/__init__.py | 2 +- kombu/connection.py | 4 +++ kombu/tests/utils.py | 23 +++++++++++++++ kombu/transport/pyamqplib.py | 7 +++++ kombu/transport/virtual/__init__.py | 9 ++++-- 6 files changed, 83 insertions(+), 6 deletions(-) diff --git a/funtests/transport.py b/funtests/transport.py index a25dafb3..bca28119 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -5,11 +5,13 @@ import sys import time import unittest2 as unittest import warnings +import weakref from nose import SkipTest from kombu import BrokerConnection from kombu import Producer, Consumer, Exchange, Queue +from kombu.tests.utils import skip_if_quick if sys.version_info >= (2, 5): from hashlib import sha256 as _digest @@ -119,6 +121,7 @@ class TransportCase(unittest.TestCase): def _digest(self, data): return _digest(data).hexdigest() + @skip_if_quick def test_produce__consume_large_messages(self, bytes=1048576, n=10, charset=string.punctuation + string.letters + string.digits): if not self.verify_alive(): @@ -156,8 +159,6 @@ class TransportCase(unittest.TestCase): chan1.close() self.purge([self.queue.name]) - - def P(self, rest): return "%s.%s" % (self.prefix, rest) @@ -214,6 +215,45 @@ class TransportCase(unittest.TestCase): self.assertEqual(m.payload, {"basic.get": "this"}) chan2.close() + def test_cyclic_reference_transport(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.transport + conn.close() + return weakref.ref(conn) + + self.assertIsNone(_createref()()) + + + def test_cyclic_reference_connection(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.connect() + conn.close() + return weakref.ref(conn) + + self.assertIsNone(_createref()()) + + def test_cyclic_reference_channel(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.connect() + channel = conn.channel() + channel.close() + conn.close() + return weakref.ref(conn) + + self.assertIsNone(_createref()()) + def tearDown(self): if self.transport and self.connected: self.connection.close() diff --git a/kombu/__init__.py b/kombu/__init__.py index da75b99e..383c3c71 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -14,7 +14,7 @@ if not os.environ.get("KOMBU_NO_EVAL", False): from types import ModuleType all_by_module = { - "kombu.connection": ["BrokerConnection"], + "kombu.connection": ["BrokerConnection", "Connection"], "kombu.entity": ["Exchange", "Queue"], "kombu.messaging": ["Consumer", "Producer"], } diff --git a/kombu/connection.py b/kombu/connection.py index e3e585f6..2744c0af 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -147,6 +147,9 @@ class BrokerConnection(object): pass self._connection = None self._debug("closed") + if self._transport: + self._transport.client = None + self._transport = None self._closed = True def release(self): @@ -456,6 +459,7 @@ class BrokerConnection(object): def channel_errors(self): """List of exceptions that may be raised by the channel.""" return self.transport.channel_errors +Connection = BrokerConnection class Resource(object): diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py index 9f6e2a7d..cda7050d 100644 --- a/kombu/tests/utils.py +++ b/kombu/tests/utils.py @@ -1,9 +1,12 @@ import __builtin__ +import os import sys import types from StringIO import StringIO +from nose import SkipTest + from kombu.utils.functional import wraps try: @@ -89,3 +92,23 @@ def mask_modules(*modnames): return __inner return _inner + + +def skip_if_environ(env_var_name): + + def _wrap_test(fun): + + @wraps(fun) + def _skips_if_environ(*args, **kwargs): + if os.environ.get(env_var_name): + raise SkipTest("SKIP %s: %s set\n" % ( + fun.__name__, env_var_name)) + return fun(*args, **kwargs) + + return _skips_if_environ + + return _wrap_test + + +def skip_if_quick(fun): + return skip_if_environ("QUICKTEST")(fun) diff --git a/kombu/transport/pyamqplib.py b/kombu/transport/pyamqplib.py index 790b6c5b..da70e70c 100644 --- a/kombu/transport/pyamqplib.py +++ b/kombu/transport/pyamqplib.py @@ -178,6 +178,12 @@ class Channel(_Channel): """Convert encoded message body back to a Python value.""" return self.Message(self, raw_message) + def close(self): + try: + super(Channel, self).close() + finally: + self.connection = None + class Transport(base.Transport): Connection = Connection @@ -222,6 +228,7 @@ class Transport(base.Transport): def close_connection(self, connection): """Close the AMQP broker connection.""" + connection.client = None connection.close() def verify_connection(self, connection): diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index a7578ab1..c87dd62d 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -619,9 +619,12 @@ class Transport(base.Transport): def close_channel(self, channel): try: - self.channels.remove(channel) - except ValueError: - pass + try: + self.channels.remove(channel) + except ValueError: + pass + finally: + channel.connection = None def establish_connection(self): self._avail_channels.append(self.create_channel(self))