Merge branch '2.5'

Conflicts:
	Changelog
	README.rst
	kombu/__init__.py
	kombu/messaging.py
	kombu/serialization.py
	kombu/transport/redis.py
	requirements/default.txt
This commit is contained in:
Ask Solem 2013-04-15 12:45:25 +01:00
commit 970e27db8e
10 changed files with 172 additions and 23 deletions

View File

@ -7,6 +7,7 @@
.. _version-3.0.0:
3.0.0
=====
:release-date: TBA
- No longer supports Python 2.5
@ -18,6 +19,51 @@
>>> from kombu import enable_insecure_serializers
>>> enable_insecure_serializers()
.. _version-2.5.10:
2.5.10
======
:release-date: 2013-04-11 18:10 P.M BST
Note about upcoming changes for Kombu 3.0
-----------------------------------------
Kombu 3 consumers will no longer accept pickle/yaml or msgpack
by default, and you will have to explicitly enable untrusted deserializers
either globally using :func:`kombu.enable_insecure_serializers`, or
using the ``accept`` argument to :class:`~kombu.Consumer`.
Changes
-------
- New utility function to disable/enable untrusted serializers.
- :func:`kombu.disable_insecure_serializers`
- :func:`kombu.enable_insecure_serializers`.
- Consumer: ``accept`` can now be used to specify a whitelist
of content types to accept.
If the accept whitelist is set and a message is received
with a content type that is not in the whitelist then a
:exc:`~kombu.exceptions.ContentDisallowed` exception
is raised. Note that this error can be handled by the already
existing `on_decode_error` callback
Examples::
Consumer(accept=['application/json'])
Consumer(accept=['pickle', 'json'])
- Now depends on amqp 1.0.11
- pidbox: Mailbox now supports the ``accept`` argument.
- Redis: More friendly error for when keys are missing.
- Connection URLs: The parser did not work well when there were
multiple '+' tokens.
.. _version-2.5.9:
2.5.9

View File

@ -1,3 +1,5 @@
.. _kombu-index:
========================================
kombu - Messaging Framework for Python
========================================
@ -75,6 +77,8 @@ and the `Wikipedia article about AMQP`_.
.. _`SoftLayer Message Queue`: http://www.softlayer.com/services/additional/message-queue
.. _transport-comparison:
Transport Comparison
====================

View File

@ -5,6 +5,10 @@
.. automodule:: kombu
.. autofunction:: enable_insecure_serializers
.. autofunction:: disable_insecure_serializers
Connection
----------

View File

@ -77,6 +77,33 @@ Note that a `Consumer` do not need the serialization method specified.
They can auto-detect the serialization method as the
content-type is sent as a message header.
.. _disable-untrusted-serializers:
Disabling Insecure Serializers
------------------------------
.. versionadded:: 2.5.10
Deserializing pickle and yaml from untrusted sources is not safe,
as both pickle and yaml have the ability to execute arbitrary code.
If you are not using these formats you should disable them
by calling :func:`kombu.disable_insecure_serializers`::
>>> import kombu
>>> kombu.disable_insecure_serializers()
Or you can specify the content types your consumers should
accept by using the ``accept`` argument::
>>> Consumer(accept=['json', 'pickle'])
>>> Consumer(accept=['application/json'])
.. note::
Insecure serializers will be disabled by default
in the next major version (Kombu 3.0)
.. _sending-raw-data:
Sending raw data without Serialization

View File

@ -66,6 +66,11 @@ class SerializerNotInstalled(KombuError):
pass
class ContentDisallowed(SerializerNotInstalled):
"""Consumer does not allow this content-type."""
pass
class InconsistencyError(StdConnectionError):
"""Data or environment has been found to be inconsistent,
depending on the cause it may be possible to retry the operation."""

View File

@ -13,7 +13,7 @@ from .compression import compress
from .connection import maybe_channel, is_connection
from .entity import Exchange, Queue, DELIVERY_MODES
from .five import int_types, text_t, values
from .serialization import encode
from .serialization import encode, registry
from .utils import ChannelPromise, maybe_list
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
@ -322,10 +322,20 @@ class Consumer(object):
#: that occurred while trying to decode it.
on_decode_error = None
#: List of accepted content-types.
#:
#: An exception will be raised if the consumer receives
#: a message with an untrusted content type.
#: By default all content-types are accepted, but not if
#: :func:`kombu.disable_untrusted_serializers` was called,
#: in which case only json is allowed.
accept = None
_tags = count(1) # global
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
callbacks=None, on_decode_error=None, on_message=None):
callbacks=None, on_decode_error=None, on_message=None,
accept=None):
self.channel = channel
self.queues = self.queues or [] if queues is None else queues
self.no_ack = self.no_ack if no_ack is None else no_ack
@ -337,6 +347,13 @@ class Consumer(object):
self.auto_declare = auto_declare
if on_decode_error is not None:
self.on_decode_error = on_decode_error
self.accept = accept
if self.accept is not None:
self.accept = set(
n if '/' in n else registry.name_to_type[n]
for n in self.accept
)
if self.channel:
self.revive(self.channel)
@ -532,6 +549,9 @@ class Consumer(object):
return tag
def _receive_callback(self, message):
accept = self.accept
if accept is not None:
message.accept = accept
on_m, channel, decoded = self.on_message, self.channel, None
try:
m2p = getattr(channel, 'message_to_python', None)

View File

@ -65,6 +65,7 @@ class Node(object):
def Consumer(self, channel=None, **options):
options.setdefault('no_ack', True)
options.setdefault('accept', self.mailbox.accept)
queue = self.mailbox.get_queue(self.hostname)
def verify_exclusive(name, messages, consumers):
@ -145,7 +146,8 @@ class Mailbox(object):
#: exchange to send replies to.
reply_exchange = None
def __init__(self, namespace, type='direct', connection=None, clock=None):
def __init__(self, namespace,
type='direct', connection=None, clock=None, accept=None):
self.namespace = namespace
self.connection = connection
self.type = type
@ -154,6 +156,7 @@ class Mailbox(object):
self.reply_exchange = self._get_reply_exchange(self.namespace)
self._tls = local()
self.unclaimed = defaultdict(deque)
self.accept = accept
def __call__(self, connection):
bound = copy(self)
@ -264,10 +267,13 @@ class Mailbox(object):
channel=chan)
def _collect(self, ticket,
limit=None, timeout=1, callback=None, channel=None):
limit=None, timeout=1, callback=None,
channel=None, accept=None):
if accept is None:
accept = self.accept
chan = channel or self.connection.default_channel
queue = self.reply_queue
consumer = Consumer(channel, [queue], no_ack=True)
consumer = Consumer(channel, [queue], accept=accept, no_ack=True)
responses = []
unclaimed = self.unclaimed
adjust_clock = self.clock.adjust

View File

@ -18,7 +18,7 @@ try:
except ImportError: # pragma: no cover
cpickle = None # noqa
from .exceptions import SerializerNotInstalled
from .exceptions import SerializerNotInstalled, ContentDisallowed
from .five import BytesIO, text_t
from .utils import entrypoints
from .utils.encoding import str_to_bytes, bytes_t
@ -64,6 +64,10 @@ def pickle_loads(s, load=pickle_load):
return load(BytesIO(s))
def parenthesize_alias(first, second):
return '%s (%s)' % (first, second) if first else second
class SerializerRegistry(object):
"""The registry keeps track of serialization methods."""
@ -75,6 +79,7 @@ class SerializerRegistry(object):
self._default_content_encoding = None
self._disabled_content_types = set()
self.type_to_name = {}
self.name_to_type = {}
def register(self, name, encoder, decoder, content_type,
content_encoding='utf-8'):
@ -83,23 +88,25 @@ class SerializerRegistry(object):
if decoder:
self._decoders[content_type] = decoder
self.type_to_name[content_type] = name
self.name_to_type[name] = content_type
def enable(self, name):
if '/' not in name:
name, _, _ = self._encoders[name]
name = self.name_to_type[name]
self._disabled_content_types.remove(name)
def disable(self, name):
if '/' not in name:
name, _, _ = self._encoders[name]
name = self.name_to_type[name]
self._disabled_content_types.add(name)
def unregister(self, name):
try:
content_type = self._encoders[name][0]
content_type = self.name_to_type[name]
self._decoders.pop(content_type, None)
self._encoders.pop(name, None)
self.type_to_name.pop(content_type, None)
self.name_to_type.pop(name, None)
except KeyError:
raise SerializerNotInstalled(
'No encoder/decoder installed for {0}'.format(name))
@ -153,10 +160,14 @@ class SerializerRegistry(object):
payload = encoder(data)
return content_type, content_encoding, payload
def decode(self, data, content_type, content_encoding, force=False):
if content_type in self._disabled_content_types and not force:
raise SerializerNotInstalled(
'Content-type {0!r} has been disabled.'.format(content_type))
def decode(self, data, content_type, content_encoding,
accept=None, force=False):
if accept is not None:
if content_type not in accept:
raise self._for_untrusted_content(content_type, 'untrusted')
else:
if content_type in self._disabled_content_types and not force:
raise self._for_untrusted_content(content_type, 'disabled')
content_type = content_type or 'application/data'
content_encoding = (content_encoding or 'utf-8').lower()
@ -169,13 +180,14 @@ class SerializerRegistry(object):
return _decode(data, content_encoding)
return data
def _for_untrusted_content(self, ctype, why):
return ContentDisallowed(
'Refusing to decode {0} content of type {1}'.format(
why, parenthesize_alias(self.type_to_name[ctype], ctype)),
)
"""
.. data:: registry
Global registry of serializers/deserializers.
"""
#: Global registry of serializers/deserializers.
registry = SerializerRegistry()
@ -384,6 +396,13 @@ _setupfuns = {
def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']):
"""Enable serializers that are considered to be unsafe.
Will enable ``pickle``, ``yaml`` and ``msgpack`` by default,
but you can also specify a list of serializers (by name or content type)
to enable.
"""
for choice in choices:
try:
registry.enable(choice)
@ -392,6 +411,18 @@ def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']):
def disable_insecure_serializers(allowed=['json']):
"""Disable untrusted serializers.
Will disable all serializers except ``json``
or you can specify a list of deserializers to allow.
.. note::
Producers will still be able to serialize data
in these formats, but consumers will not accept
incoming data using the untrusted content types.
"""
for name in registry._decoders:
registry.disable(name)
if allowed is not None:

View File

@ -53,12 +53,13 @@ class Message(object):
__slots__ = ('_state', 'channel', 'delivery_tag',
'content_type', 'content_encoding',
'delivery_info', 'headers', 'properties',
'body', '_decoded_cache', '__dict__')
'body', '_decoded_cache', 'accept', '__dict__')
MessageStateError = MessageStateError
def __init__(self, channel, body=None, delivery_tag=None,
content_type=None, content_encoding=None, delivery_info={},
properties=None, headers=None, postencode=None, **kwargs):
properties=None, headers=None, postencode=None,
accept=None, **kwargs):
self.channel = channel
self.delivery_tag = delivery_tag
self.content_type = content_type
@ -68,6 +69,7 @@ class Message(object):
self.properties = properties or {}
self._decoded_cache = None
self._state = 'RECEIVED'
self.accept = accept
try:
body = decompress(body, self.headers['compression'])
@ -151,7 +153,7 @@ class Message(object):
"""Deserialize the message body, returning the original
python structure sent by the publisher."""
return decode(self.body, self.content_type,
self.content_encoding)
self.content_encoding, accept=self.accept)
@property
def acknowledged(self):

View File

@ -26,6 +26,11 @@ from kombu.log import get_logger
from kombu.utils import cached_property, uuid
from kombu.utils.eventio import poll, READ, ERR
NO_ROUTE_ERROR = """
Cannot route message for exchange {0!r}: Table empty or key no longer exists.
Probably the key ({1!r}) has been removed from the Redis database.
"""
try:
from billiard.util import register_after_fork
except ImportError:
@ -571,8 +576,7 @@ class Channel(virtual.Channel):
with self.conn_or_acquire() as client:
values = client.smembers(key)
if not values:
raise InconsistencyError(
'Queue list empty or does not exist: {0!r}'.format(key))
raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
return [tuple(val.split(self.sep)) for val in values]
def _purge(self, queue):