From ad2b2f44db64cd212b746012d85110aa76189274 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 28 Nov 2012 12:32:28 +0000 Subject: [PATCH 01/13] Result of Queue.as_dict is now JSON serializable again. Closes #177 --- kombu/abstract.py | 6 +++--- kombu/entity.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kombu/abstract.py b/kombu/abstract.py index e9c1bb59..f919ca44 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -38,11 +38,11 @@ class Object(object): setattr(self, name, None) def as_dict(self, recurse=False): - def f(obj): + def f(obj, type): if recurse and isinstance(obj, Object): return obj.as_dict(recurse=True) - return obj - return dict((attr, f(getattr(self, attr))) for attr, _ in self.attrs) + return type(obj) if type else obj + return dict((attr, f(getattr(self, attr), type)) for attr, type in self.attrs) def __reduce__(self): return unpickle_dict, (self.__class__, self.as_dict()) diff --git a/kombu/entity.py b/kombu/entity.py index 84a7a445..77cb277c 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -427,7 +427,7 @@ class Queue(MaybeChannelBound): ('auto_delete', bool), ('no_ack', None), ('alias', None), - ('bindings', None)) + ('bindings', list)) def __init__(self, name='', exchange=None, routing_key='', channel=None, bindings=None, **kwargs): From b7f4cfb6b2ceb91b842ae73a362919712f5f1ab3 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 28 Nov 2012 12:43:21 +0000 Subject: [PATCH 02/13] Improves Queue/binding/Exchange repr/str --- kombu/abstract.py | 3 ++- kombu/entity.py | 26 ++++++++++++++++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/kombu/abstract.py b/kombu/abstract.py index f919ca44..62d39de5 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -42,7 +42,8 @@ class Object(object): if recurse and isinstance(obj, Object): return obj.as_dict(recurse=True) return type(obj) if type else obj - return dict((attr, f(getattr(self, attr), type)) for attr, type in self.attrs) + return dict((attr, f(getattr(self, attr), type)) + for attr, type in self.attrs) def __reduce__(self): return unpickle_dict, (self.__class__, self.as_dict()) diff --git a/kombu/entity.py b/kombu/entity.py index 77cb277c..d3ac8c74 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -17,6 +17,10 @@ DELIVERY_MODES = {'transient': TRANSIENT_DELIVERY_MODE, __all__ = ['Exchange', 'Queue'] +def pretty_bindings(bindings): + return '[%s]' % (', '.join(map(str, bindings))) + + class Exchange(MaybeChannelBound): """An Exchange declaration. @@ -266,7 +270,7 @@ class Exchange(MaybeChannelBound): return super(Exchange, self).__repr__(str(self)) def __str__(self): - return 'Exchange %s(%s)' % (self.name, self.type) + return 'Exchange %s(%s)' % (self.name or repr(''), self.type) @property def can_cache_declaration(self): @@ -311,7 +315,10 @@ class binding(object): nowait=nowait) def __repr__(self): - return ' %r>' % (self.exchange.name, self.routing_key) + return '' % (self, ) + + def __str__(self): + return '%s->%s' % (self.exchange.name, self.routing_key) class Queue(MaybeChannelBound): @@ -611,10 +618,17 @@ class Queue(MaybeChannelBound): return False def __repr__(self): - return super(Queue, self).__repr__( - 'Queue %s -> %s -> %s' % (self.name, - self.exchange, - self.routing_key)) + s = super(Queue, self).__repr__ + if self.bindings: + return s('Queue %r -> %s' % ( + self.name, + pretty_bindings(self.bindings), + )) + return s('Queue %r -> %s -> %r' % ( + self.name, + self.exchange, + self.routing_key or '', + )) @property def can_cache_declaration(self): From 92c1743cb6771954934c1531ca5e99661ef515f7 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 28 Nov 2012 12:47:03 +0000 Subject: [PATCH 03/13] Bumps version to 2.5.1 --- README.rst | 2 +- kombu/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index a2b84029..032912e5 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.5.0 +:Version: 2.5.1 `Kombu` is a messaging framework for Python. diff --git a/kombu/__init__.py b/kombu/__init__.py index 2ff23910..ec365100 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,7 +1,7 @@ """Messaging Framework for Python""" from __future__ import absolute_import -VERSION = (2, 5, 0) +VERSION = (2, 5, 1) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' From d8b1d096d8a868e8a51c57c6f768784d1a68fc34 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 28 Nov 2012 12:47:15 +0000 Subject: [PATCH 04/13] Updates Changelog --- Changelog | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Changelog b/Changelog index 0edbc49e..143ded1d 100644 --- a/Changelog +++ b/Changelog @@ -4,6 +4,15 @@ Change history ================ +.. _version-2.5.1: + +2.5.1 +===== +:release-date: 2012-11-28 12:45 P.M UTC + +- Fixed bug where return value of Queue.as_dict could not be serialized with + JSON (Issue #177). + .. _version-2.5.0: 2.5.0 From 3dad58fed2c4bb26bfe6f93433348e273a56f18e Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 Nov 2012 12:23:51 +0000 Subject: [PATCH 05/13] [Redis] Fixes connection leak --- kombu/transport/redis.py | 223 ++++++++++++++++++++++----------------- 1 file changed, 127 insertions(+), 96 deletions(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index f7c1acee..94a123dd 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -25,6 +25,15 @@ from kombu.log import get_logger from kombu.utils import cached_property, uuid from kombu.utils.eventio import poll, READ, ERR +try: + from billiard.util import register_after_fork +except ImportError: + try: + from multiprocessing.util import register_after_fork # noqa + except ImportError: + def register_after_fork(*args, **kwargs): # noqa + pass + try: import redis except ImportError: @@ -92,12 +101,12 @@ class QoS(virtual.QoS): def append(self, message, delivery_tag): delivery = message.delivery_info EX, RK = delivery['exchange'], delivery['routing_key'] - self.client.pipeline() \ - .zadd(self.unacked_index_key, delivery_tag, time()) \ + with self.pipe_or_acquire() as pipe: + pipe.zadd(self.unacked_index_key, delivery_tag, time()) \ .hset(self.unacked_key, delivery_tag, dumps([message._raw, EX, RK])) \ .execute() - super(QoS, self).append(message, delivery_tag) + super(QoS, self).append(message, delivery_tag) def restore_unacked(self): for tag in self._delivered: @@ -111,39 +120,43 @@ class QoS(virtual.QoS): def reject(self, delivery_tag, requeue=False): self.ack(delivery_tag) + @contextmanager + def pipe_or_acquire(self, pipe=None): + if pipe: + yield pipe + else: + with self.channel.conn_or_acquire() as client: + yield client.pipeline() + def _remove_from_indices(self, delivery_tag, pipe=None): - return (pipe or self.client.pipeline()) \ - .zrem(self.unacked_index_key, delivery_tag) \ - .hdel(self.unacked_key, delivery_tag) + with self.pipe_or_acquire(pipe) as pipe: + return pipe.zrem(self.unacked_index_key, delivery_tag) \ + .hdel(self.unacked_key, delivery_tag) def restore_visible(self, start=0, num=10, interval=10): self._vrestore_count += 1 if (self._vrestore_count - 1) % interval: return - client = self.client - ceil = time() - self.visibility_timeout - try: - with Mutex(client, self.unacked_mutex_key, - self.unacked_mutex_expire): - visible = client.zrevrangebyscore( + with self.channel.conn_or_acquire() as client: + ceil = time() - self.visibility_timeout + try: + with Mutex(client, self.unacked_mutex_key, + self.unacked_mutex_expire): + visible = client.zrevrangebyscore( self.unacked_index_key, ceil, 0, start=num and start, num=num, withscores=True) - for tag, score in visible or []: - self.restore_by_tag(tag, client) - except MutexHeld: - pass + for tag, score in visible or []: + self.restore_by_tag(tag, client) + except MutexHeld: + pass def restore_by_tag(self, tag, client=None): - client = client or self.client - p, _, _ = self._remove_from_indices(tag, - client.pipeline().hget(self.unacked_key, tag)).execute() - if p: - M, EX, RK = loads(p) - self.channel._do_restore_message(M, EX, RK, client) - - @property - def client(self): - return self.channel._avail_client + with self.channel.conn_or_acquire(client) as client: + p, _, _ = self._remove_from_indices(tag, + client.pipeline().hget(self.unacked_key, tag)).execute() + if p: + M, EX, RK = loads(p) + self.channel._do_restore_message(M, EX, RK, client) @cached_property def unacked_key(self): @@ -303,6 +316,7 @@ class Channel(virtual.Channel): unacked_restore_limit = None visibility_timeout = 3600 # 1 hour priority_steps = PRIORITY_STEPS + max_connections = 10 _pool = None from_transport_options = (virtual.Channel.from_transport_options @@ -312,6 +326,7 @@ class Channel(virtual.Channel): 'unacked_mutex_expire', 'visibility_timeout', 'unacked_restore_limit', + 'max_connections', 'priority_steps')) def __init__(self, *args, **kwargs): @@ -339,29 +354,35 @@ class Channel(virtual.Channel): # are still waiting for data. self.connection_errors = self.connection.connection_errors + register_after_fork(self, self._after_fork) + + def _after_fork(self): + if self._pool is not None: + self._pool.disconnect() + def _do_restore_message(self, payload, exchange, routing_key, client=None): - client = client or self._avail_client - try: + with self.conn_or_acquire(client) as client: try: - payload['headers']['redelivered'] = True - except KeyError: - pass - for queue in self._lookup(exchange, routing_key): - client.lpush(queue, dumps(payload)) - except Exception: - logger.critical('Could not restore message: %r', payload, - exc_info=True) + try: + payload['headers']['redelivered'] = True + except KeyError: + pass + for queue in self._lookup(exchange, routing_key): + client.lpush(queue, dumps(payload)) + except Exception: + logger.critical('Could not restore message: %r', payload, + exc_info=True) def _restore(self, message, payload=None): tag = message.delivery_tag - client = self._avail_client - P, _ = client.pipeline() \ - .hget(self.unacked_key, tag) \ - .hdel(self.unacked_key, tag) \ - .execute() - if P: - M, EX, RK = loads(P) - self._do_restore_message(M, EX, RK, client) + with self.conn_or_acquire() as client: + P, _ = client.pipeline() \ + .hget(self.unacked_key, tag) \ + .hdel(self.unacked_key, tag) \ + .execute() + if P: + M, EX, RK = loads(P) + self._do_restore_message(M, EX, RK, client) def _next_delivery_tag(self): return uuid() @@ -461,18 +482,20 @@ class Channel(virtual.Channel): pass def _get(self, queue): - for pri in PRIORITY_STEPS: - item = self._avail_client.rpop(self._q_for_pri(queue, pri)) - if item: - return loads(item) - raise Empty() + with self.conn_or_acquire() as client: + for pri in PRIORITY_STEPS: + item = client.rpop(self._q_for_pri(queue, pri)) + if item: + return loads(item) + raise Empty() def _size(self, queue): - cmds = self._avail_client.pipeline() - for pri in PRIORITY_STEPS: - cmds = cmds.llen(self._q_for_pri(queue, pri)) - sizes = cmds.execute() - return sum(size for size in sizes if isinstance(size, int)) + with self.conn_or_acquire() as client: + cmds = client.pipeline() + for pri in PRIORITY_STEPS: + cmds = cmds.llen(self._q_for_pri(queue, pri)) + sizes = cmds.execute() + return sum(size for size in sizes if isinstance(size, int)) def _q_for_pri(self, queue, pri): pri = self.priority(pri) @@ -489,11 +512,13 @@ class Channel(virtual.Channel): message['properties']['delivery_info']['priority']), 9), 0) except (TypeError, ValueError, KeyError): pri = 0 - self._avail_client.lpush(self._q_for_pri(queue, pri), dumps(message)) + with self.conn_or_acquire() as client: + client.lpush(self._q_for_pri(queue, pri), dumps(message)) def _put_fanout(self, exchange, message, **kwargs): """Deliver fanout message.""" - self._avail_client.publish(exchange, dumps(message)) + with self.conn_or_acquire() as client: + client.publish(exchange, dumps(message)) def _new_queue(self, queue, auto_delete=False, **kwargs): if auto_delete: @@ -503,44 +528,49 @@ class Channel(virtual.Channel): if self.typeof(exchange).type == 'fanout': # Mark exchange as fanout. self._fanout_queues[queue] = exchange - self._avail_client.sadd(self.keyprefix_queue % (exchange, ), - self.sep.join([routing_key or '', - pattern or '', - queue or ''])) + with self.conn_or_acquire() as client: + client.sadd(self.keyprefix_queue % (exchange, ), + self.sep.join([routing_key or '', + pattern or '', + queue or ''])) def _delete(self, queue, exchange, routing_key, pattern, *args): self.auto_delete_queues.discard(queue) - self._avail_client.srem(self.keyprefix_queue % (exchange, ), - self.sep.join([routing_key or '', - pattern or '', - queue or ''])) - cmds = self._avail_client.pipeline() - for pri in PRIORITY_STEPS: - cmds = cmds.delete(self._q_for_pri(queue, pri)) - cmds.execute() + with self.conn_or_acquire() as client: + client.srem(self.keyprefix_queue % (exchange, ), + self.sep.join([routing_key or '', + pattern or '', + queue or ''])) + cmds = client.pipeline() + for pri in PRIORITY_STEPS: + cmds = cmds.delete(self._q_for_pri(queue, pri)) + cmds.execute() def _has_queue(self, queue, **kwargs): - cmds = self._avail_client.pipeline() - for pri in PRIORITY_STEPS: - cmds = cmds.exists(self._q_for_pri(queue, pri)) - return any(cmds.execute()) + with self.conn_or_acquire() as client: + cmds = client.pipeline() + for pri in PRIORITY_STEPS: + cmds = cmds.exists(self._q_for_pri(queue, pri)) + return any(cmds.execute()) def get_table(self, exchange): key = self.keyprefix_queue % exchange - values = self._avail_client.smembers(key) - if not values: - raise InconsistencyError( - 'Queue list empty or key does not exist: %r' % ( - self.keyprefix_queue % exchange)) - return [tuple(val.split(self.sep)) for val in values] + with self.conn_or_acquire() as client: + values = client.smembers(key) + if not values: + raise InconsistencyError( + 'Queue list empty or key does not exist: %r' % ( + self.keyprefix_queue % exchange)) + return [tuple(val.split(self.sep)) for val in values] def _purge(self, queue): - cmds = self.pipeline() - for pri in PRIORITY_STEPS: - priq = self._q_for_pri(queue, pri) - cmds = cmds.llen(priq).delete(priq) - sizes = cmds.execute() - return sum(sizes[::2]) + with self.conn_or_acquire() as client: + cmds = client.pipeline() + for pri in PRIORITY_STEPS: + priq = self._q_for_pri(queue, pri) + cmds = cmds.llen(priq).delete(priq) + sizes = cmds.execute() + return sum(sizes[::2]) def close(self): if self._pool: @@ -578,7 +608,8 @@ class Channel(virtual.Channel): return {'host': conninfo.hostname or '127.0.0.1', 'port': conninfo.port or DEFAULT_PORT, 'db': database, - 'password': conninfo.password} + 'password': conninfo.password, + 'max_connections': self.max_connections} def _create_client(self): return self.Client(connection_pool=self.pool) @@ -614,17 +645,17 @@ class Channel(virtual.Channel): return KombuRedis - def pipeline(self): - """Create a new pipeline using available connection.""" - return self._avail_client.pipeline() - - @property - def _avail_client(self): - """Returns new connection from the pool if the main client is - currently in polling-mode.""" - if self._in_poll: - return self._create_client() - return self.client + @contextmanager + def conn_or_acquire(self, client=None): + if client: + yield client + else: + if self._in_poll: + client = self._create_client() + yield client + self.pool.release(client.connection) + else: + yield self.client @property def pool(self): From ff6b20ce37c6b92bf50112444844d231ad386464 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 Nov 2012 12:34:34 +0000 Subject: [PATCH 06/13] Tests passing --- kombu/tests/transport/test_redis.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 5645e96f..26cfaaf6 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -383,7 +383,8 @@ class test_Channel(TestCase): def test_invalid_database_raises_ValueError(self): with self.assertRaises(ValueError): - Connection('redis:///dwqwewqe').channel() + self.channel.connection.client.virtual_host = 'dwqeq' + self.channel._connparams() @skip_if_not_module('redis') def test_get_client(self): @@ -410,13 +411,18 @@ class test_Channel(TestCase): self.channel._in_poll = False c = self.channel.client = Mock() - self.assertIs(self.channel._avail_client, c) + with self.channel.conn_or_acquire() as client: + self.assertIs(client, c) def test_avail_client_when_in_poll(self): self.channel._in_poll = True + self.channel._pool = Mock() cc = self.channel._create_client = Mock() + client = cc.return_value = Mock() - self.assertTrue(self.channel._avail_client) + with self.channel.conn_or_acquire() as _client: + pass + self.channel.pool.release.assert_called_with(client.connection) cc.assert_called_with() @skip_if_not_module('redis') From 48bbb3de19261dd2c662bd3a05960c051e64e5f5 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 Nov 2012 12:36:19 +0000 Subject: [PATCH 07/13] Bumps version to 2.5.2 and updates Changelog --- Changelog | 9 +++++++++ README.rst | 2 +- kombu/__init__.py | 2 +- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/Changelog b/Changelog index 143ded1d..22616640 100644 --- a/Changelog +++ b/Changelog @@ -4,6 +4,15 @@ Change history ================ +.. _version-2.5.2: + +2.5.2 +===== +:release-date: 2012-11-29 12:35 PM UTC + +- [Redis] Fixed connection leak and added a new 'max_connections' transport + option. + .. _version-2.5.1: 2.5.1 diff --git a/README.rst b/README.rst index 032912e5..d11e331e 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.5.1 +:Version: 2.5.2 `Kombu` is a messaging framework for Python. diff --git a/kombu/__init__.py b/kombu/__init__.py index ec365100..5472fdeb 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,7 +1,7 @@ """Messaging Framework for Python""" from __future__ import absolute_import -VERSION = (2, 5, 1) +VERSION = (2, 5, 2) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' From fd3aa52383212ec2edb686ef94cf8caa2c558dd1 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 Nov 2012 12:37:17 +0000 Subject: [PATCH 08/13] flakes --- kombu/tests/transport/test_redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 26cfaaf6..e007664d 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -420,7 +420,7 @@ class test_Channel(TestCase): cc = self.channel._create_client = Mock() client = cc.return_value = Mock() - with self.channel.conn_or_acquire() as _client: + with self.channel.conn_or_acquire(): pass self.channel.pool.release.assert_called_with(client.connection) cc.assert_called_with() From 724a5d4fe61467d7e7e8f43ae6c6f105d4c7ef10 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 Nov 2012 12:44:14 +0000 Subject: [PATCH 09/13] Fixes problem with pidbox on 2.6 (kwargs must be str) --- kombu/pidbox.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index b991c914..cf74d67b 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -103,7 +103,7 @@ class Node(object): if message: self.adjust_clock(message.headers.get('clock') or 0) if not destination or self.hostname in destination: - return self.dispatch(**body) + return self.dispatch(**kwdict(body)) dispatch_from_message = handle_message def reply(self, data, exchange, routing_key, ticket, **kwargs): From 185e437af91a5921f75907966c32debb34dcfb23 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 Nov 2012 16:44:27 +0000 Subject: [PATCH 10/13] Updates Changelog --- Changelog | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Changelog b/Changelog index 22616640..fd67e195 100644 --- a/Changelog +++ b/Changelog @@ -4,6 +4,18 @@ Change history ================ +.. _version-2.5.3: + +2.5.3 +===== +:release-date: 2012-11-29 12:35 PM UTC + +- Pidbox: Fixed compatibility with Python 2.6 + +2.5.2 +===== +:release-date: 2012-11-29 12:35 PM UTC + .. _version-2.5.2: 2.5.2 From 72d0d9d8ce511dd7628d73ba50fb6972b217afef Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 29 Nov 2012 16:44:34 +0000 Subject: [PATCH 11/13] Bumps version to 2.5.3 --- README.rst | 2 +- kombu/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index d11e331e..ada99099 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.5.2 +:Version: 2.5.3 `Kombu` is a messaging framework for Python. diff --git a/kombu/__init__.py b/kombu/__init__.py index 5472fdeb..1ae73ff3 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,7 +1,7 @@ """Messaging Framework for Python""" from __future__ import absolute_import -VERSION = (2, 5, 2) +VERSION = (2, 5, 3) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org' From 4dd702333622d9631b0580e4b4ae6863053caeca Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 6 Dec 2012 13:31:21 +0000 Subject: [PATCH 12/13] Fixes typo (conn.connected is a property). Closes #182 --- docs/userguide/connections.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index aa88add8..12e9249d 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -27,7 +27,7 @@ method:: You can also check whether the connection is connected:: - >>> connection.connected() + >>> connection.connected True Connections must always be closed after use:: From 635a10b9534c7e20aaaad258c332478908741f24 Mon Sep 17 00:00:00 2001 From: Dane Guempel Date: Fri, 7 Dec 2012 20:28:24 -0600 Subject: [PATCH 13/13] Changing the _info method of the connection class to respect the alt attribute of the object. fixes #185 --- kombu/connection.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index dbe8c6a0..026b46f4 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -515,9 +515,14 @@ class Connection(object): if resolve: transport_cls = RESOLVE_ALIASES.get(transport_cls, transport_cls) D = self.transport.default_connection_params - hostname = self.hostname or D.get('hostname') - if self.uri_prefix: - hostname = '%s+%s' % (self.uri_prefix, hostname) + + if self.alt: + hostname = ";".join(self.alt) + else: + hostname = self.hostname or D.get('hostname') + if self.uri_prefix: + hostname = '%s+%s' % (self.uri_prefix, hostname) + info = (('hostname', hostname), ('userid', self.userid or D.get('userid')), ('password', self.password or D.get('password')),