Merge branch 'master' into 2.0-devel

Conflicts:
	README.rst
	kombu/__init__.py
	kombu/compat.py
	kombu/connection.py
	kombu/entity.py
	kombu/messaging.py
	kombu/pools.py
	kombu/syn.py
	kombu/tests/__init__.py
	kombu/tests/test_transport_virtual.py
	kombu/transport/redis.py
	kombu/utils/encoding.py
This commit is contained in:
Ask Solem 2011-11-28 15:47:26 +00:00
commit 339607c2f9
16 changed files with 163 additions and 49 deletions

View File

@ -10,6 +10,7 @@ Andrew Watts
Andy McCurdy <andy@andymccurdy.com> Andy McCurdy <andy@andymccurdy.com>
Anton Gyllenberg <anton@iki.fi> Anton Gyllenberg <anton@iki.fi>
Ask Solem <ask@celeryproject.org> Ask Solem <ask@celeryproject.org>
Brian Bernstein
Christophe Chauvet <christophe.chauvet@gmail.com> Christophe Chauvet <christophe.chauvet@gmail.com>
Christopher Grebs <cg@webshox.org> Christopher Grebs <cg@webshox.org>
Clay Gerrard <clay.gerrard@gmail.com> Clay Gerrard <clay.gerrard@gmail.com>
@ -20,6 +21,7 @@ David Strauss <david@davidstrauss.net>
David Ziegler <david.ziegler@gmail.com> David Ziegler <david.ziegler@gmail.com>
Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org> Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>
Gregory Haskins <greg@greghaskins.com> Gregory Haskins <greg@greghaskins.com>
Hong Minhee <minhee@dahlia.kr>
Ian Eure <ian.eure@gmail.com> Ian Eure <ian.eure@gmail.com>
Ian Struble <istruble@gmail.com> Ian Struble <istruble@gmail.com>
Ionel Maries Cristian <contact@ionelmc.ro> Ionel Maries Cristian <contact@ionelmc.ro>

View File

@ -2,6 +2,71 @@
Change history Change history
================ ================
.. _version-1.5.0:
1.5.0
=====
:release-date: 2011-11-27 06:00 P.M GMT
:by: Ask Solem
* kombu.pools: Fixed a bug resulting in resources not being properly released.
This was caused by the use of ``__hash__`` to distinguish them.
* Virtual transports: Dead-letter queue is now disabled by default.
The dead-letter queue was enabled by default to help application
authors, but now that Kombu is stable it should be removed.
There are after all many cases where messages should just be dropped
when there are no queues to buffer them, and keeping them without
supporting automatic cleanup is rather considered a resource leak
than a feature.
If wanted the dead-letter queue can still be enabled, by using
the ``deadletter_queue`` transport option::
>>> x = BrokerConnection("redis://",
... transport_options={"deadletter_queue": "ae.undeliver"})
In addition, an :class:`UndeliverableWarning` is now emitted when
the dead-letter queue is enabled and a message ends up there.
Contributed by Ionel Maries Cristian.
* MongoDB transport now supports Replicasets (Issue #81).
Contributed by Ivan Metzlar.
* The ``Connection.ensure`` methods now accepts a ``max_retries`` value
of 0.
A value of 0 now means *do not retry*, which is distinct from :const:`None`
which means *retry indefinitely*.
Contributed by Dan McGee.
* SQS Transport: Now has a lowercase ``sqs`` alias, so that it can be
used with broker URLs (Issue #82).
Fix contributed by Hong Minhee
* SQS Transport: Fixes KeyError on message acknowledgements (Issue #73).
The SQS transport now uses UUID's for delivery tags, rather than
a counter.
Fix contributed by Brian Bernstein.
* SQS Transport: Unicode related fixes (Issue #82).
Fix contributed by Hong Minhee.
* Redis version check could crash because of improper handling of types
(Issue #63).
* Fixed error with `Resource.force_close_all` when resources
were not yet properly initialized (Issue #78).
.. _version-1.4.3: .. _version-1.4.3:
1.4.3 1.4.3

17
LICENSE
View File

@ -1,23 +1,21 @@
Copyright (c) 2009, Ask Solem Copyright (c) 2009-2011, Ask Solem & contributors.
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met: modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
* Redistributions of source code must retain the above copyright notice, notice, this list of conditions and the following disclaimer.
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright * Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution. documentation and/or other materials provided with the distribution.
* Neither the name of Ask Solem nor the
Neither the name of Ask Solem nor the names of its contributors may be used names of its contributors may be used to endorse or promote products
to endorse or promote products derived from this software without specific derived from this software without specific prior written permission.
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS
BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
@ -25,4 +23,3 @@ INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE. POSSIBILITY OF SUCH DAMAGE.

View File

@ -7,5 +7,5 @@
.. currentmodule:: kombu.syn .. currentmodule:: kombu.syn
.. automodule:: kombu.syn .. automodule:: kombu.syn
:members:
:undoc-members: .. autofunction:: detect_environment

View File

@ -65,8 +65,9 @@ class Publisher(messaging.Producer):
return self.publish(*args, **kwargs) return self.publish(*args, **kwargs)
def close(self): def close(self):
if not self._provided_channel: if self.channel is not None and not self._provided_channel:
self.channel.close() self.channel.close()
super(Publisher, self).close()
self._closed = True self._closed = True
def __enter__(self): def __enter__(self):

View File

@ -45,19 +45,31 @@ __all__ = ["parse_url", "BrokerConnection", "Resource",
def parse_url(url): def parse_url(url):
auth = userid = password = None port = path = auth = userid = password = None
scheme = urlparse(url).scheme scheme = urlparse(url).scheme
parts = urlparse(url.replace("%s://" % (scheme, ), "http://")) parts = urlparse(url.replace("%s://" % (scheme, ), "http://"))
netloc = parts.netloc
if '@' in netloc: # The first pymongo.Connection() argument (host) can be
auth, _, netloc = parts.netloc.partition('@') # a mongodb connection URI. If this is the case, don't
userid, _, password = auth.partition(':') # use port but let pymongo get the port(s) from the URI instead.
hostname, _, port = netloc.partition(':') # This enables the use of replica sets and sharding.
path = parts.path or "" # See pymongo.Connection() for more info.
if path and path[0] == '/': if scheme == 'mongodb':
path = path[1:] # strip the scheme since it is appended automatically.
hostname = url[len('mongodb://'):]
else:
netloc = parts.netloc
if '@' in netloc:
auth, _, netloc = parts.netloc.partition('@')
userid, _, password = auth.partition(':')
hostname, _, port = netloc.partition(':')
path = parts.path or ""
if path and path[0] == '/':
path = path[1:]
port = port and int(port) or port
return dict({"hostname": hostname, return dict({"hostname": hostname,
"port": port and int(port) or None, "port": port or None,
"userid": userid or None, "userid": userid or None,
"password": password or None, "password": password or None,
"transport": scheme, "transport": scheme,
@ -393,15 +405,21 @@ class BrokerConnection(object):
port = fields["port"] port = fields["port"]
userid = fields["userid"] userid = fields["userid"]
password = fields["password"] password = fields["password"]
url = "%s://" % fields["transport"] transport = fields["transport"]
url = "%s://" % transport
if userid: if userid:
url += userid url += userid
if include_password and password: if include_password and password:
url += ':' + password url += ':' + password
url += '@' url += '@'
url += fields["hostname"] url += fields["hostname"]
if port:
# If the transport equals 'mongodb' the
# hostname contains a full mongodb connection
# URI. Let pymongo retreive the port from there.
if port and transport != "mongodb":
url += ':' + str(port) url += ':' + str(port)
url += '/' + fields["virtual_host"] url += '/' + fields["virtual_host"]
return url return url
@ -696,7 +714,10 @@ class Resource(object):
dres = dirty.pop() dres = dirty.pop()
except KeyError: except KeyError:
break break
self.close_resource(dres) try:
self.close_resource(dres)
except AttributeError: # Issue #78
pass
mutex = getattr(resource, "mutex", None) mutex = getattr(resource, "mutex", None)
if mutex: if mutex:
@ -707,7 +728,10 @@ class Resource(object):
res = resource.queue.pop() res = resource.queue.pop()
except IndexError: except IndexError:
break break
self.close_resource(res) try:
self.close_resource(res)
except AttributeError:
pass # Issue #78
finally: finally:
if mutex: if mutex:
mutex.release() mutex.release()

View File

@ -12,7 +12,7 @@ from __future__ import absolute_import
from itertools import count from itertools import count
from . import entity from .entity import Exchange, Queue
from .compression import compress from .compression import compress
from .serialization import encode from .serialization import encode
from .utils import maybe_list from .utils import maybe_list
@ -20,10 +20,6 @@ from .utils import maybe_list
__all__ = ["Exchange", "Queue", "Producer", "Consumer"] __all__ = ["Exchange", "Queue", "Producer", "Consumer"]
Exchange = entity.Exchange
Queue = entity.Queue
class Producer(object): class Producer(object):
"""Message Producer. """Message Producer.
@ -471,7 +467,6 @@ class Consumer(object):
message = self.channel.message_to_python(raw_message) message = self.channel.message_to_python(raw_message)
decoded = message.payload decoded = message.payload
except Exception, exc: except Exception, exc:
raise
if not self.on_decode_error: if not self.on_decode_error:
raise raise
self.on_decode_error(message, exc) self.on_decode_error(message, exc)

View File

@ -65,6 +65,7 @@ class SerializerRegistry(object):
self._default_encode = None self._default_encode = None
self._default_content_type = None self._default_content_type = None
self._default_content_encoding = None self._default_content_encoding = None
self._disabled_content_types = set()
self.type_to_name = {} self.type_to_name = {}
def register(self, name, encoder, decoder, content_type, def register(self, name, encoder, decoder, content_type,
@ -75,6 +76,11 @@ class SerializerRegistry(object):
self._decoders[content_type] = decoder self._decoders[content_type] = decoder
self.type_to_name[content_type] = name self.type_to_name[content_type] = name
def disable(self, name):
if '/' not in name:
name = self.type_to_name[name]
self._disabled_content_types.add(name)
def unregister(self, name): def unregister(self, name):
try: try:
content_type = self._encoders[name][0] content_type = self._encoders[name][0]
@ -134,7 +140,10 @@ class SerializerRegistry(object):
payload = encoder(data) payload = encoder(data)
return content_type, content_encoding, payload return content_type, content_encoding, payload
def decode(self, data, content_type, content_encoding): def decode(self, data, content_type, content_encoding, force=False):
if content_type in self._disabled_content_types:
raise SerializerNotInstalled(
"Content-type %r has been disabled." % (content_type, ))
content_type = content_type or 'application/data' content_type = content_type or 'application/data'
content_encoding = (content_encoding or 'utf-8').lower() content_encoding = (content_encoding or 'utf-8').lower()

View File

@ -10,13 +10,17 @@ from __future__ import absolute_import
import sys import sys
__all__ = ["blocking", "detect_environment"] __all__ = ["detect_environment"]
def blocking(fun, *args, **kwargs): def blocking(fun, *args, **kwargs):
return fun(*args, **kwargs) return fun(*args, **kwargs)
def select_blocking_method(type):
pass
def detect_environment(): def detect_environment():
## -eventlet- ## -eventlet-
if "eventlet" in sys.modules: if "eventlet" in sys.modules:

View File

@ -1,5 +1,7 @@
from __future__ import absolute_import from __future__ import absolute_import
from ..exceptions import VersionMismatch
moduleindex = ("kombu.abstract", moduleindex = ("kombu.abstract",
"kombu.compat", "kombu.compat",
"kombu.common", "kombu.common",
@ -36,5 +38,5 @@ def setup():
print("preimporting %r for coverage..." % (module, )) print("preimporting %r for coverage..." % (module, ))
try: try:
__import__(module) __import__(module)
except ImportError: except (ImportError, VersionMismatch):
pass pass

View File

@ -32,11 +32,9 @@ from . import virtual
# dots are replaced by dash, all other punctuation # dots are replaced by dash, all other punctuation
# replaced by underscore. # replaced by underscore.
CHARS_REPLACE = string.punctuation.replace('-', '') \ CHARS_REPLACE_TABLE = dict((ord(c), 0x5f)
.replace('_', '') \ for c in string.punctuation if c not in '-_.')
.replace('.', '') CHARS_REPLACE_TABLE[0x2e] = 0x2d # '.' -> '-'
CHARS_REPLACE_TABLE = string.maketrans(CHARS_REPLACE + '.',
"_" * len(CHARS_REPLACE) + '-')
class Table(Domain): class Table(Domain):
@ -289,6 +287,9 @@ class Channel(virtual.Channel):
aws_secret_access_key=conninfo.password, aws_secret_access_key=conninfo.password,
port=conninfo.port) port=conninfo.port)
def _next_delivery_tag(self):
return uuid() # See #73
@property @property
def sqs(self): def sqs(self):
if self._sqs is None: if self._sqs is None:

View File

@ -44,6 +44,7 @@ TRANSPORT_ALIASES = {
"memory": "kombu.transport.memory.Transport", "memory": "kombu.transport.memory.Transport",
"redis": "kombu.transport.redis.Transport", "redis": "kombu.transport.redis.Transport",
"SQS": "kombu.transport.SQS.Transport", "SQS": "kombu.transport.SQS.Transport",
"sqs": "kombu.transport.SQS.Transport",
"beanstalk": "kombu.transport.beanstalk.Transport", "beanstalk": "kombu.transport.beanstalk.Transport",
"mongodb": "kombu.transport.mongodb.Transport", "mongodb": "kombu.transport.mongodb.Transport",
"couchdb": "kombu.transport.couchdb.Transport", "couchdb": "kombu.transport.couchdb.Transport",

View File

@ -16,6 +16,7 @@ from anyjson import serialize, deserialize
from ..exceptions import VersionMismatch from ..exceptions import VersionMismatch
from ..utils import eventio, cached_property from ..utils import eventio, cached_property
from ..utils.encoding import str_t
from . import virtual from . import virtual
@ -327,7 +328,7 @@ class Channel(virtual.Channel):
if version < (2, 4, 4): if version < (2, 4, 4):
raise VersionMismatch( raise VersionMismatch(
"Redis transport requires redis-py versions 2.4.4 or later. " "Redis transport requires redis-py versions 2.4.4 or later. "
"You have %r" % (".".join(version), )) "You have %r" % (".".join(map(str_t, version)), ))
# KombuRedis maintains a connection attribute on it's instance and # KombuRedis maintains a connection attribute on it's instance and
# uses that when executing commands # uses that when executing commands

View File

@ -15,7 +15,6 @@ from __future__ import absolute_import
import base64 import base64
import socket import socket
import warnings import warnings
import os
from itertools import count from itertools import count
from time import sleep, time from time import sleep, time
@ -34,7 +33,7 @@ from .exchange import STANDARD_EXCHANGE_TYPES
UNDELIVERABLE_FMT = """\ UNDELIVERABLE_FMT = """\
Message could not be delivered: No queues bound to exchange %(exchange)r Message could not be delivered: No queues bound to exchange %(exchange)r
with binding key %(routing_key)r using binding key %(routing_key)r
""" """

View File

@ -51,19 +51,26 @@ class EqualityDict(dict):
return dict.__delitem__(self, eqhash(key)) return dict.__delitem__(self, eqhash(key))
class HashingDict(dict): def eqhash(o):
try:
return o.__eqhash__()
except AttributeError:
return hash(o)
class EqualityDict(dict):
def __getitem__(self, key): def __getitem__(self, key):
h = hash(key) h = eqhash(key)
if h not in self: if h not in self:
return self.__missing__(key) return self.__missing__(key)
return dict.__getitem__(self, h) return dict.__getitem__(self, h)
def __setitem__(self, key, value): def __setitem__(self, key, value):
return dict.__setitem__(self, hash(key), value) return dict.__setitem__(self, eqhash(key), value)
def __delitem__(self, key): def __delitem__(self, key):
return dict.__delitem__(self, hash(key)) return dict.__delitem__(self, eqhash(key))
def say(m, *s): def say(m, *s):

View File

@ -82,8 +82,14 @@ def readme(options):
@task @task
@cmdopts([
("custom=", "C", "custom version"),
])
def bump(options): def bump(options):
sh("contrib/release/bump_version.py kombu/__init__.py README.rst") s = "-- '%s'" % (options.custom, ) \
if getattr(options, "custom") else ""
sh("contrib/release/bump_version.py \
kombu/__init__.py README.rst %s" % (s, ))
@task @task