From 8920f0d837d4cd762a3b6d6b06861b32377f6793 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 8 Nov 2011 17:19:49 +0000 Subject: [PATCH 01/20] Catch warnings in unittests --- kombu/tests/compat.py | 87 +++++++++++++++++++++++++++ kombu/tests/test_transport_virtual.py | 11 +++- 2 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 kombu/tests/compat.py diff --git a/kombu/tests/compat.py b/kombu/tests/compat.py new file mode 100644 index 00000000..4cfe6b79 --- /dev/null +++ b/kombu/tests/compat.py @@ -0,0 +1,87 @@ +from __future__ import absolute_import + +import sys + + +class WarningMessage(object): + + """Holds the result of a single showwarning() call.""" + + _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file", + "line") + + def __init__(self, message, category, filename, lineno, file=None, + line=None): + local_values = locals() + for attr in self._WARNING_DETAILS: + setattr(self, attr, local_values[attr]) + + self._category_name = category and category.__name__ or None + + def __str__(self): + return ("{message : %r, category : %r, filename : %r, lineno : %s, " + "line : %r}" % (self.message, self._category_name, + self.filename, self.lineno, self.line)) + + +class catch_warnings(object): + + """A context manager that copies and restores the warnings filter upon + exiting the context. + + The 'record' argument specifies whether warnings should be captured by a + custom implementation of warnings.showwarning() and be appended to a list + returned by the context manager. Otherwise None is returned by the context + manager. The objects appended to the list are arguments whose attributes + mirror the arguments to showwarning(). + + The 'module' argument is to specify an alternative module to the module + named 'warnings' and imported under that name. This argument is only + useful when testing the warnings module itself. + + """ + + def __init__(self, record=False, module=None): + """Specify whether to record warnings and if an alternative module + should be used other than sys.modules['warnings']. + + For compatibility with Python 3.0, please consider all arguments to be + keyword-only. + + """ + self._record = record + self._module = module is None and sys.modules["warnings"] or module + self._entered = False + + def __repr__(self): + args = [] + if self._record: + args.append("record=True") + if self._module is not sys.modules['warnings']: + args.append("module=%r" % self._module) + name = type(self).__name__ + return "%s(%s)" % (name, ", ".join(args)) + + def __enter__(self): + if self._entered: + raise RuntimeError("Cannot enter %r twice" % self) + self._entered = True + self._filters = self._module.filters + self._module.filters = self._filters[:] + self._showwarning = self._module.showwarning + if self._record: + log = [] + + def showwarning(*args, **kwargs): + log.append(WarningMessage(*args, **kwargs)) + + self._module.showwarning = showwarning + return log + else: + return None + + def __exit__(self, *exc_info): + if not self._entered: + raise RuntimeError("Cannot exit %r without entering first" % self) + self._module.filters = self._filters + self._module.showwarning = self._showwarning diff --git a/kombu/tests/test_transport_virtual.py b/kombu/tests/test_transport_virtual.py index 46a8f861..2984f07a 100644 --- a/kombu/tests/test_transport_virtual.py +++ b/kombu/tests/test_transport_virtual.py @@ -1,9 +1,12 @@ from kombu.tests.utils import unittest +import warnings + from kombu.connection import BrokerConnection from kombu.transport import virtual from kombu.utils import uuid +from kombu.tests.compat import catch_warnings from kombu.tests.utils import redirect_stdouts @@ -322,8 +325,12 @@ class test_Channel(unittest.TestCase): self.assertEqual(self.channel._qos.prefetch_count, 128) def test_lookup__undeliverable(self, n="test_lookup__undeliverable"): - self.assertListEqual(self.channel._lookup(n, n, "ae.undeliver"), - ["ae.undeliver"]) + warnings.resetwarnings() + with catch_warnings(record=True) as log: + self.assertListEqual(self.channel._lookup(n, n, "ae.undeliver"), + ["ae.undeliver"]) + self.assertTrue(log) + self.assertIn("could not be delivered", log[0].message.args[0]) def test_context(self): x = self.channel.__enter__() From 3cd0eb6f1b2b2e7bc919276a1b75a5c4b6082940 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 11 Nov 2011 16:26:59 +0100 Subject: [PATCH 02/20] First attempt to enable Mongo Replicasets --- kombu/connection.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index d02a63c9..4521da05 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -38,19 +38,24 @@ URI_FORMAT = """\ def parse_url(url): + port = path = None auth = userid = password = None scheme = urlparse(url).scheme parts = urlparse(url.replace("%s://" % (scheme, ), "http://")) - netloc = parts.netloc - if '@' in netloc: - auth, _, netloc = partition(parts.netloc, '@') - userid, _, password = partition(auth, ':') - hostname, _, port = partition(netloc, ':') - path = parts.path or "" - if path and path[0] == '/': - path = path[1:] + if scheme != 'mongodb': + netloc = parts.netloc + if '@' in netloc: + auth, _, netloc = partition(parts.netloc, '@') + userid, _, password = partition(auth, ':') + hostname, _, port = partition(netloc, ':') + path = parts.path or "" + if path and path[0] == '/': + path = path[1:] + port = int(port) + else: + hostname = url[len('mongodb://'):] return dict({"hostname": hostname, - "port": port and int(port) or None, + "port": port or None, "userid": userid or None, "password": password or None, "transport": scheme, @@ -386,14 +391,15 @@ class BrokerConnection(object): port = fields["port"] userid = fields["userid"] password = fields["password"] - url = "%s://" % fields["transport"] + transport = fields["transport"] + url = "%s://" % transport if userid: url += userid if include_password and password: url += ':' + password url += '@' url += fields["hostname"] - if port: + if port and transport != "mongodb": url += ':' + str(port) url += '/' + fields["virtual_host"] return url From 4e9a5c4e181123ad28c05a4ea61fdb5385301fe4 Mon Sep 17 00:00:00 2001 From: Ivan Metzlar Date: Fri, 11 Nov 2011 16:44:47 +0100 Subject: [PATCH 03/20] Proper Mongodb Replicaset handling --- kombu/connection.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index 4521da05..7020ceab 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -38,10 +38,15 @@ URI_FORMAT = """\ def parse_url(url): - port = path = None - auth = userid = password = None + port = path = auth = userid = password = None scheme = urlparse(url).scheme parts = urlparse(url.replace("%s://" % (scheme, ), "http://")) + + # The first pymongo.Connection() argument (host) can be + # a mongodb connection URI. If this is the case, don't + # use port but let pymongo get the port(s) from the URI instead. + # This enables the use of replica sets and sharding. + # See pymongo.Connection() for more info. if scheme != 'mongodb': netloc = parts.netloc if '@' in netloc: @@ -53,7 +58,9 @@ def parse_url(url): path = path[1:] port = int(port) else: + # strip the scheme since it is appended automatically hostname = url[len('mongodb://'):] + return dict({"hostname": hostname, "port": port or None, "userid": userid or None, @@ -399,8 +406,13 @@ class BrokerConnection(object): url += ':' + password url += '@' url += fields["hostname"] + + # If the transport equals 'mongodb' the + # hostname contains a full mongodb connection + # URI. Let pymongo retreive the port from there. if port and transport != "mongodb": url += ':' + str(port) + url += '/' + fields["virtual_host"] return url From 0154ed39ca2f584fcdcb5f756cdf6cd317676317 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 22 Nov 2011 16:29:27 +0000 Subject: [PATCH 04/20] Redis: Must map back to string before joining version. Closes #63. Thanks to martijnm --- kombu/tests/__init__.py | 4 +++- kombu/transport/pyredis.py | 3 ++- kombu/utils/encoding.py | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py index d1bb075f..1e1e1e85 100644 --- a/kombu/tests/__init__.py +++ b/kombu/tests/__init__.py @@ -1,3 +1,5 @@ +from kombu.exceptions import VersionMismatch + moduleindex = ("kombu.abstract", "kombu.compat", "kombu.compression", @@ -30,5 +32,5 @@ def setup(): for module in moduleindex: try: __import__(module) - except ImportError: + except (ImportError, VersionMismatch): pass diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py index eef0e447..da6f8e17 100644 --- a/kombu/transport/pyredis.py +++ b/kombu/transport/pyredis.py @@ -17,6 +17,7 @@ from kombu.exceptions import VersionMismatch from kombu.transport import virtual from kombu.utils import eventio from kombu.utils import cached_property +from kombu.utils.encoding import str_t DEFAULT_PORT = 6379 DEFAULT_DB = 0 @@ -326,7 +327,7 @@ class Channel(virtual.Channel): if version < (2, 4, 4): raise VersionMismatch( "Redis transport requires redis-py versions 2.4.4 or later. " - "You have %r" % (".".join(version), )) + "You have %r" % (".".join(map(str_t, version)), )) # KombuRedis maintains a connection attribute on it's instance and # uses that when executing commands diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py index a51cbe31..1f02dfb3 100644 --- a/kombu/utils/encoding.py +++ b/kombu/utils/encoding.py @@ -29,6 +29,7 @@ if sys.version_info >= (3, 0): return bytes_to_str(s) return s + str_t = str bytes_t = bytes else: @@ -41,6 +42,7 @@ else: def bytes_to_str(s): # noqa return s + str_t = unicode bytes_t = str ensure_bytes = str_to_bytes From e4f3811c8af38b4cb043fbf5544da4699d626f79 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 23 Nov 2011 15:00:57 +0000 Subject: [PATCH 05/20] Fix up New BSD LICENSE layout --- LICENSE | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/LICENSE b/LICENSE index 81d19cef..f4ab4e29 100644 --- a/LICENSE +++ b/LICENSE @@ -1,23 +1,21 @@ -Copyright (c) 2009, Ask Solem +Copyright (c) 2009-2011, Ask Solem & contributors. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: - - * Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. - -Neither the name of Ask Solem nor the names of its contributors may be used -to endorse or promote products derived from this software without specific -prior written permission. + * Neither the name of Ask Solem nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS +PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS @@ -25,4 +23,3 @@ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - From 3edba1812356f3b8cdaf75cea4859606e8d0189a Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 11:15:50 +0000 Subject: [PATCH 06/20] parse url: Fixes port bug --- kombu/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kombu/connection.py b/kombu/connection.py index 7020ceab..dc70a90b 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -56,7 +56,7 @@ def parse_url(url): path = parts.path or "" if path and path[0] == '/': path = path[1:] - port = int(port) + port = port and int(port) or port else: # strip the scheme since it is appended automatically hostname = url[len('mongodb://'):] From 4b7c002d46b71a77590b29bcbd10818b5835128b Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Tue, 22 Nov 2011 16:24:28 +0000 Subject: [PATCH 07/20] Fixes --- kombu/compat.py | 25 ++++++++++--------------- kombu/connection.py | 25 +++++++++++++------------ kombu/pools.py | 4 ++-- kombu/utils/__init__.py | 15 +++++++++++---- 4 files changed, 36 insertions(+), 33 deletions(-) diff --git a/kombu/compat.py b/kombu/compat.py index 5d490475..ac570573 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -32,16 +32,13 @@ class Publisher(messaging.Producer): durable = True auto_delete = False _closed = False - _provided_channel = None + _provided_channel = False def __init__(self, connection, exchange=None, routing_key=None, - exchange_type=None, durable=None, auto_delete=None, channel=None, - **kwargs): - self.connection = connection + exchange_type=None, durable=None, auto_delete=None, + channel=None, **kwargs): if channel: - self._provided_channel = self.backend = channel - else: - self.backend = connection.channel() + connection, self._provided_channel = channel, True self.exchange = exchange or self.exchange self.exchange_type = exchange_type or self.exchange_type @@ -58,20 +55,14 @@ class Publisher(messaging.Producer): routing_key=self.routing_key, auto_delete=self.auto_delete, durable=self.durable) - - super(Publisher, self).__init__(self.backend, self.exchange, - **kwargs) + super(Publisher, self).__init__(connection, self.exchange, **kwargs) def send(self, *args, **kwargs): return self.publish(*args, **kwargs) - def revive(self, channel): - self.backend = channel - super(Publisher, self).revive(channel) - def close(self): if not self._provided_channel: - self.backend.close() + self.channel.close() self._closed = True def __enter__(self): @@ -80,6 +71,10 @@ class Publisher(messaging.Producer): def __exit__(self, *exc_info): self.close() + @property + def backend(self): + return self.channel + class Consumer(messaging.Consumer): queue = "" diff --git a/kombu/connection.py b/kombu/connection.py index dc70a90b..a5d8881f 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -390,7 +390,7 @@ class BrokerConnection(object): info[key] = value return info - def __hash__(self): + def __eqhash__(self): return hash("|".join(map(str, self.info().itervalues()))) def as_uri(self, include_password=False): @@ -644,22 +644,22 @@ class Resource(object): if self.limit: while 1: try: - resource = self._resource.get(block=block, timeout=timeout) + R = self._resource.get(block=block, timeout=timeout) except Empty: self._add_when_empty() else: - resource = self.prepare(resource) - self._dirty.add(resource) + R = self.prepare(R) + self._dirty.add(R) break else: - resource = self.prepare(self.new()) + R = self.prepare(self.new()) @wraps(self.release) def _release(): - self.release(resource) - resource.release = _release + self.release(R) + R.release = _release - return resource + return R def prepare(self, resource): return resource @@ -675,9 +675,7 @@ class Resource(object): of defective resources.""" if self.limit: self._dirty.discard(resource) - self.close_resource(resource) - else: - self.close_resource(resource) + self.close_resource(resource) def release(self, resource): """Release resource so it can be used by another thread. @@ -725,18 +723,21 @@ class Resource(object): mutex.release() if os.environ.get("KOMBU_DEBUG_POOL"): - _orig_acquire = acquire _orig_release = release _next_resource_id = 0 def acquire(self, *args, **kwargs): # noqa + import traceback id = self._next_resource_id = self._next_resource_id + 1 print("+%s ACQUIRE %s" % (id, self.__class__.__name__, )) r = self._orig_acquire(*args, **kwargs) r._resource_id = id print("-%s ACQUIRE %s" % (id, self.__class__.__name__, )) + if not hasattr(r, "acquired_by"): + r.acquired_by = [] + r.acquired_by.append(traceback.format_stack()) return r def release(self, resource): # noqa diff --git a/kombu/pools.py b/kombu/pools.py index 20ea9e36..ddad5c33 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -14,7 +14,7 @@ from itertools import chain from kombu.connection import Resource from kombu.messaging import Producer -from kombu.utils import HashingDict +from kombu.utils import EqualityDict __all__ = ["ProducerPool", "PoolGroup", "register_group", "connections", "producers", "get_limit", "set_limit", "reset"] @@ -60,7 +60,7 @@ class ProducerPool(Resource): super(ProducerPool, self).release(resource) -class PoolGroup(HashingDict): +class PoolGroup(EqualityDict): def __init__(self, limit=None): self.limit = limit diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 3d64e71d..b4ac24bf 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -21,19 +21,26 @@ except: ctypes = None # noqa -class HashingDict(dict): +def eqhash(o): + try: + return o.__eqhash__() + except AttributeError: + return hash(o) + + +class EqualityDict(dict): def __getitem__(self, key): - h = hash(key) + h = eqhash(key) if h not in self: return self.__missing__(key) return dict.__getitem__(self, h) def __setitem__(self, key, value): - return dict.__setitem__(self, hash(key), value) + return dict.__setitem__(self, eqhash(key), value) def __delitem__(self, key): - return dict.__delitem__(self, hash(key)) + return dict.__delitem__(self, eqhash(key)) def say(m, *s): From 78dfb865c893108227f4afa7a75901fa4d2f663f Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 11:25:30 +0000 Subject: [PATCH 08/20] PEP8ify + pyflakes --- kombu/tests/test_transport_virtual.py | 6 +++++- kombu/transport/virtual/__init__.py | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/kombu/tests/test_transport_virtual.py b/kombu/tests/test_transport_virtual.py index 2984f07a..512ebbd2 100644 --- a/kombu/tests/test_transport_virtual.py +++ b/kombu/tests/test_transport_virtual.py @@ -326,11 +326,15 @@ class test_Channel(unittest.TestCase): def test_lookup__undeliverable(self, n="test_lookup__undeliverable"): warnings.resetwarnings() - with catch_warnings(record=True) as log: + context = catch_warnings(record=True) + log = context.__enter__() + try: self.assertListEqual(self.channel._lookup(n, n, "ae.undeliver"), ["ae.undeliver"]) self.assertTrue(log) self.assertIn("could not be delivered", log[0].message.args[0]) + finally: + context.__exit__() def test_context(self): x = self.channel.__enter__() diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index c40c10f8..d84f3b88 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -13,7 +13,6 @@ Emulates the AMQ API for non-AMQ transports. import base64 import socket import warnings -import os from itertools import count from time import sleep, time @@ -31,7 +30,7 @@ from kombu.transport.virtual.exchange import STANDARD_EXCHANGE_TYPES UNDELIVERABLE_FMT = """\ Message could not be delivered: No queues bound to exchange %(exchange)r -with binding key %(routing_key)r +using binding key %(routing_key)r """ From 16759ac7c1322224646f59df76c5e63fb6468300 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 16:32:19 +0000 Subject: [PATCH 09/20] kombu.compat: Don't close None channel --- kombu/compat.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kombu/compat.py b/kombu/compat.py index ac570573..ef73c3d7 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -61,8 +61,9 @@ class Publisher(messaging.Producer): return self.publish(*args, **kwargs) def close(self): - if not self._provided_channel: + if self.channel is not None and not self._provided_channel: self.channel.close() + super(Publisher, self).close() self._closed = True def __enter__(self): From c5b4d473a237520d18535b87fe84dfd3f68c2dd4 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Tue, 15 Nov 2011 23:28:50 +0900 Subject: [PATCH 10/20] unicode.translate cannot take table encoded by string.maketrans function. It uses a dict mapping instead, so this patch replaces CHARS_REPLACE_TABLE with a dict of Unicode ordinals. Closes #82 --- kombu/transport/SQS.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index a17b13cd..d2a60a4b 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -30,11 +30,9 @@ from kombu.utils.encoding import safe_str # dots are replaced by dash, all other punctuation # replaced by underscore. -CHARS_REPLACE = string.punctuation.replace('-', '') \ - .replace('_', '') \ - .replace('.', '') -CHARS_REPLACE_TABLE = string.maketrans(CHARS_REPLACE + '.', - "_" * len(CHARS_REPLACE) + '-') +CHARS_REPLACE_TABLE = dict((ord(c), 95) + for c in string.punctuation if c not in '-_.') +CHARS_REPLACE_TABLE[46] = 45 class Table(Domain): From e68693f1ab47fc85afa03a6f430a06afe90558c4 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 16 Nov 2011 00:00:42 +0900 Subject: [PATCH 11/20] SQS Transport: Must have lowercase transport alias. This is because urlparse normalizes url schemes into all lowercase, making it impossible to resolve the SQS transport when using broker URLs. --- kombu/transport/SQS.py | 4 ++-- kombu/transport/__init__.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index d2a60a4b..01c0545d 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -30,9 +30,9 @@ from kombu.utils.encoding import safe_str # dots are replaced by dash, all other punctuation # replaced by underscore. -CHARS_REPLACE_TABLE = dict((ord(c), 95) +CHARS_REPLACE_TABLE = dict((ord(c), 0x5f) for c in string.punctuation if c not in '-_.') -CHARS_REPLACE_TABLE[46] = 45 +CHARS_REPLACE_TABLE[0x2e] = 0x2d # '.' -> '-' class Table(Domain): diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 0acdce36..26e16918 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -76,6 +76,7 @@ TRANSPORT_ALIASES = { "memory": "kombu.transport.memory.Transport", "redis": "kombu.transport.pyredis.Transport", "SQS": "kombu.transport.SQS.Transport", + "sqs": "kombu.transport.SQS.Transport", "beanstalk": "kombu.transport.beanstalk.Transport", "mongodb": "kombu.transport.mongodb.Transport", "couchdb": "kombu.transport.pycouchdb.Transport", From 2ee489777a333db223fea2da9764fa825af31c6d Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 16:35:19 +0000 Subject: [PATCH 12/20] Adds Hong Minhee to AUTHORS --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 472bee9b..6b9bf355 100644 --- a/AUTHORS +++ b/AUTHORS @@ -19,6 +19,7 @@ David Strauss David Ziegler Flavio [FlaPer87] Percoco Premoli Gregory Haskins +Hong Minhee Ian Eure Ian Struble Ionel Maries Cristian From 196ba98d617a237b1cc41813f7f0b8f6a8de9cbf Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 17:03:55 +0000 Subject: [PATCH 13/20] syn.blocking is useless, so remove usage of it --- docs/reference/kombu.syn.rst | 4 +-- kombu/entity.py | 65 ++++++++++++++++++------------------ kombu/messaging.py | 9 +++-- kombu/syn.py | 56 ++----------------------------- 4 files changed, 40 insertions(+), 94 deletions(-) diff --git a/docs/reference/kombu.syn.rst b/docs/reference/kombu.syn.rst index d3e1d23c..f5c650be 100644 --- a/docs/reference/kombu.syn.rst +++ b/docs/reference/kombu.syn.rst @@ -7,5 +7,5 @@ .. currentmodule:: kombu.syn .. automodule:: kombu.syn - :members: - :undoc-members: + + .. autofunction:: detect_environment diff --git a/kombu/entity.py b/kombu/entity.py index 08a12b9e..b7ab322e 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -9,7 +9,6 @@ Exchange and Queue declarations. """ from kombu.abstract import MaybeChannelBound -from kombu.syn import blocking as _SYN TRANSIENT_DELIVERY_MODE = 1 PERSISTENT_DELIVERY_MODE = 2 @@ -146,12 +145,12 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ - return _SYN(self.channel.exchange_declare, exchange=self.name, - type=self.type, - durable=self.durable, - auto_delete=self.auto_delete, - arguments=self.arguments, - nowait=nowait) + return self.channel.exchange_declare(exchange=self.name, + type=self.type, + durable=self.durable, + auto_delete=self.auto_delete, + arguments=self.arguments, + nowait=nowait) def Message(self, body, delivery_mode=None, priority=None, content_type=None, content_encoding=None, properties=None, @@ -220,9 +219,9 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ - return _SYN(self.channel.exchange_delete, exchange=self.name, - if_unused=if_unused, - nowait=nowait) + return self.channel.exchange_delete(exchange=self.name, + if_unused=if_unused, + nowait=nowait) def __eq__(self, other): if isinstance(other, Exchange): @@ -392,13 +391,13 @@ class Queue(MaybeChannelBound): without modifying the server state. """ - ret = _SYN(self.channel.queue_declare, queue=self.name, - passive=passive, - durable=self.durable, - exclusive=self.exclusive, - auto_delete=self.auto_delete, - arguments=self.queue_arguments, - nowait=nowait) + ret = self.channel.queue_declare(queue=self.name, + passive=passive, + durable=self.durable, + exclusive=self.exclusive, + auto_delete=self.auto_delete, + arguments=self.queue_arguments, + nowait=nowait) if not self.name: self.name = ret[0] return ret @@ -409,11 +408,11 @@ class Queue(MaybeChannelBound): :keyword nowait: Do not wait for a reply. """ - return _SYN(self.channel.queue_bind, queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments, - nowait=nowait) + return self.channel.queue_bind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key, + arguments=self.binding_arguments, + nowait=nowait) def get(self, no_ack=None): """Poll the server for a new message. @@ -430,14 +429,14 @@ class Queue(MaybeChannelBound): is more important than performance. """ - message = _SYN(self.channel.basic_get, queue=self.name, no_ack=no_ack) + message = self.channel.basic_get(queue=self.name, no_ack=no_ack) if message is not None: return self.channel.message_to_python(message) def purge(self, nowait=False): """Remove all messages from the queue.""" - return _SYN(self.channel.queue_purge, queue=self.name, - nowait=nowait) or 0 + return self.channel.queue_purge(queue=self.name, + nowait=nowait) or 0 def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False): @@ -484,17 +483,17 @@ class Queue(MaybeChannelBound): :keyword nowait: Do not wait for a reply. """ - return _SYN(self.channel.queue_delete, queue=self.name, - if_unused=if_unused, - if_empty=if_empty, - nowait=nowait) + return self.channel.queue_delete(queue=self.name, + if_unused=if_unused, + if_empty=if_empty, + nowait=nowait) def unbind(self): """Delete the binding on the server.""" - return _SYN(self.channel.queue_unbind, queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments) + return self.channel.queue_unbind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key, + arguments=self.binding_arguments) def __eq__(self, other): if isinstance(other, Queue): diff --git a/kombu/messaging.py b/kombu/messaging.py index 0d81e7a5..e9207e4d 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -13,7 +13,6 @@ from itertools import count from kombu import entity from kombu.compression import compress from kombu.serialization import encode -from kombu.syn import blocking as _SYN from kombu.utils import maybe_list @@ -383,9 +382,9 @@ class Consumer(object): Currently not supported by RabbitMQ. """ - return _SYN(self.channel.basic_qos, prefetch_size, - prefetch_count, - apply_global) + return self.channel.basic_qos(prefetch_size, + prefetch_count, + apply_global) def recover(self, requeue=False): """Redeliver unacknowledged messages. @@ -399,7 +398,7 @@ class Consumer(object): delivering it to an alternative subscriber. """ - return _SYN(self.channel.basic_recover, requeue=requeue) + return self.channel.basic_recover(requeue=requeue) def receive(self, body, message): """Method called when a message is received. diff --git a/kombu/syn.py b/kombu/syn.py index a80801aa..f2da7b12 100644 --- a/kombu/syn.py +++ b/kombu/syn.py @@ -1,64 +1,12 @@ -""" -kombu.syn -========= - -Thread synchronization. - -:copyright: (c) 2009 - 2011 by Ask Solem. -:license: BSD, see LICENSE for more details. - -""" import sys -#: current blocking method -__sync_current = None - def blocking(fun, *args, **kwargs): - """Make sure function is called by blocking and waiting for the result, - even if we're currently in a monkey patched eventlet/gevent - environment.""" - if __sync_current is None: - select_blocking_method(detect_environment()) - return __sync_current(fun, *args, **kwargs) + return fun(*args, **kwargs) def select_blocking_method(type): - """Select blocking method, where `type` is one of default - gevent or eventlet.""" - global __sync_current - __sync_current = {"eventlet": _sync_eventlet, - "gevent": _sync_gevent, - "default": _sync_default}[type]() - - -def _sync_default(): - """Create blocking primitive.""" - - def __blocking__(fun, *args, **kwargs): - return fun(*args, **kwargs) - - return __blocking__ - - -def _sync_eventlet(): - """Create Eventlet blocking primitive.""" - from eventlet import spawn - - def __eblocking__(fun, *args, **kwargs): - return spawn(fun, *args, **kwargs).wait() - - return __eblocking__ - - -def _sync_gevent(): - """Create gevent blocking primitive.""" - from gevent import Greenlet - - def __gblocking__(fun, *args, **kwargs): - return Greenlet.spawn(fun, *args, **kwargs).get() - - return __gblocking__ + pass def detect_environment(): From d44e2a26f2d52c940582c2f7672bb0f31dfb83ce Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 17:42:13 +0000 Subject: [PATCH 14/20] paver bump now supports specifying version with -C arg --- pavement.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pavement.py b/pavement.py index 31e9532e..f83cff43 100644 --- a/pavement.py +++ b/pavement.py @@ -82,8 +82,14 @@ def readme(options): @task +@cmdopts([ + ("custom=", "C", "custom version"), +]) def bump(options): - sh("contrib/release/bump_version.py kombu/__init__.py README.rst") + s = "-- '%s'" % (options.custom, ) \ + if getattr(options, "custom") else "" + sh("contrib/release/bump_version.py \ + kombu/__init__.py README.rst %s" % (s, )) @task From 25993c9e0a8ae6aa2132af64a470d2fb6b9a3b5e Mon Sep 17 00:00:00 2001 From: Brian Bernstein Date: Wed, 26 Oct 2011 14:09:55 -0400 Subject: [PATCH 15/20] SQS Transport: Fix for KeyError on message acknowledgment-- (#73) Avoids duplicate keys on message delivery tags by using UUIds instead of a simple counter. Closes #73 --- kombu/transport/SQS.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 01c0545d..dd2da307 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -285,6 +285,9 @@ class Channel(virtual.Channel): aws_secret_access_key=conninfo.password, port=conninfo.port) + def _next_delivery_tag(self): + return uuid() # See #73 + @property def sqs(self): if self._sqs is None: From d0ca4c263b68ee8c29c18defe3954ca4849a0c45 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 17:38:37 +0000 Subject: [PATCH 16/20] Adds Brian Bernstein to AUTHORS --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 6b9bf355..206e66aa 100644 --- a/AUTHORS +++ b/AUTHORS @@ -10,6 +10,7 @@ Andrew Watts Andy McCurdy Anton Gyllenberg Ask Solem +Brian Bernstein Christophe Chauvet Christopher Grebs Dan McGee From 321c8baebbae83bb25e2d8c545c7060655fb2a85 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 17:51:06 +0000 Subject: [PATCH 17/20] Fixes 'function object has no attribute "close"'. Closes #78 --- kombu/connection.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index a5d8881f..f59c65ff 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -706,7 +706,10 @@ class Resource(object): dres = dirty.pop() except KeyError: break - self.close_resource(dres) + try: + self.close_resource(dres) + except AttributeError: # Issue #78 + pass mutex = getattr(resource, "mutex", None) if mutex: @@ -717,7 +720,10 @@ class Resource(object): res = resource.queue.pop() except IndexError: break - self.close_resource(res) + try: + self.close_resource(res) + except AttributeError: + pass # Issue #78 finally: if mutex: mutex.release() From 097dff9d32352581f8ac20a88553fd78dca3acdf Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sun, 27 Nov 2011 17:25:46 +0000 Subject: [PATCH 18/20] Bumps version to 1.5.0 and updates Changelog --- Changelog | 65 +++++++++++++++++++++++++++++++++++++++++++++++ README.rst | 2 +- kombu/__init__.py | 2 +- 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/Changelog b/Changelog index 34caa701..e17766fa 100644 --- a/Changelog +++ b/Changelog @@ -2,6 +2,71 @@ Change history ================ +.. _version-1.5.0: + +1.5.0 +===== +:release-date: 2011-11-27 06:00 P.M GMT +:by: Ask Solem + +* kombu.pools: Fixed a bug resulting in resources not being properly released. + + This was caused by the use of ``__hash__`` to distinguish them. + +* Virtual transports: Dead-letter queue is now disabled by default. + + The dead-letter queue was enabled by default to help application + authors, but now that Kombu is stable it should be removed. + There are after all many cases where messages should just be dropped + when there are no queues to buffer them, and keeping them without + supporting automatic cleanup is rather considered a resource leak + than a feature. + + If wanted the dead-letter queue can still be enabled, by using + the ``deadletter_queue`` transport option:: + + >>> x = BrokerConnection("redis://", + ... transport_options={"deadletter_queue": "ae.undeliver"}) + + In addition, an :class:`UndeliverableWarning` is now emitted when + the dead-letter queue is enabled and a message ends up there. + + Contributed by Ionel Maries Cristian. + +* MongoDB transport now supports Replicasets (Issue #81). + + Contributed by Ivan Metzlar. + +* The ``Connection.ensure`` methods now accepts a ``max_retries`` value + of 0. + + A value of 0 now means *do not retry*, which is distinct from :const:`None` + which means *retry indefinitely*. + + Contributed by Dan McGee. + +* SQS Transport: Now has a lowercase ``sqs`` alias, so that it can be + used with broker URLs (Issue #82). + + Fix contributed by Hong Minhee + +* SQS Transport: Fixes KeyError on message acknowledgements (Issue #73). + + The SQS transport now uses UUID's for delivery tags, rather than + a counter. + + Fix contributed by Brian Bernstein. + +* SQS Transport: Unicode related fixes (Issue #82). + + Fix contributed by Hong Minhee. + +* Redis version check could crash because of improper handling of types + (Issue #63). + +* Fixed error with `Resource.force_close_all`, when resources + not yet properly initialized. (Issue #78). + .. _version-1.4.3: 1.4.3 diff --git a/README.rst b/README.rst index c8a6f76f..34ab8883 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,7 @@ kombu - AMQP Messaging Framework for Python ############################################# -:Version: 1.4.3 +:Version: 1.5.0 Synopsis ======== diff --git a/kombu/__init__.py b/kombu/__init__.py index 41ffa619..55769481 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,5 +1,5 @@ """AMQP Messaging Framework for Python""" -VERSION = (1, 4, 3) +VERSION = (1, 5, 0) __version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:]) __author__ = "Ask Solem" __contact__ = "ask@celeryproject.org" From 9fa04affa8dafcdb9bcc52be55f0c50519e92fe1 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 28 Nov 2011 13:08:46 +0000 Subject: [PATCH 19/20] Fixes typos --- Changelog | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Changelog b/Changelog index e17766fa..3e9eb696 100644 --- a/Changelog +++ b/Changelog @@ -64,8 +64,8 @@ * Redis version check could crash because of improper handling of types (Issue #63). -* Fixed error with `Resource.force_close_all`, when resources - not yet properly initialized. (Issue #78). +* Fixed error with `Resource.force_close_all` when resources + were not yet properly initialized (Issue #78). .. _version-1.4.3: From 2c459e4c8c63995a56e429ae767166147237fd76 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 28 Nov 2011 13:09:23 +0000 Subject: [PATCH 20/20] Adds ability to disable content_types in the serializer registry --- kombu/serialization.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/kombu/serialization.py b/kombu/serialization.py index 378cf5d7..916954c3 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -65,6 +65,7 @@ class SerializerRegistry(object): self._default_encode = None self._default_content_type = None self._default_content_encoding = None + self._disabled_content_types = set() self.type_to_name = {} def register(self, name, encoder, decoder, content_type, @@ -75,6 +76,11 @@ class SerializerRegistry(object): self._decoders[content_type] = decoder self.type_to_name[content_type] = name + def disable(self, name): + if '/' not in name: + name = self.type_to_name[name] + self._disabled_content_types.add(name) + def unregister(self, name): try: content_type = self._encoders[name][0] @@ -134,7 +140,10 @@ class SerializerRegistry(object): payload = encoder(data) return content_type, content_encoding, payload - def decode(self, data, content_type, content_encoding): + def decode(self, data, content_type, content_encoding, force=False): + if content_type in self._disabled_content_types: + raise SerializerNotInstalled( + "Content-type %r has been disabled." % (content_type, )) content_type = content_type or 'application/data' content_encoding = (content_encoding or 'utf-8').lower()