diff --git a/Changelog b/Changelog index c0beb7e6..3c89d029 100644 --- a/Changelog +++ b/Changelog @@ -7,6 +7,7 @@ .. _version-3.0.0: 3.0.0 +===== :release-date: TBA - No longer supports Python 2.5 @@ -18,6 +19,51 @@ >>> from kombu import enable_insecure_serializers >>> enable_insecure_serializers() +.. _version-2.5.10: + +2.5.10 +====== +:release-date: 2013-04-11 18:10 P.M BST + +Note about upcoming changes for Kombu 3.0 +----------------------------------------- + +Kombu 3 consumers will no longer accept pickle/yaml or msgpack +by default, and you will have to explicitly enable untrusted deserializers +either globally using :func:`kombu.enable_insecure_serializers`, or +using the ``accept`` argument to :class:`~kombu.Consumer`. + +Changes +------- + +- New utility function to disable/enable untrusted serializers. + + - :func:`kombu.disable_insecure_serializers` + - :func:`kombu.enable_insecure_serializers`. + +- Consumer: ``accept`` can now be used to specify a whitelist + of content types to accept. + + If the accept whitelist is set and a message is received + with a content type that is not in the whitelist then a + :exc:`~kombu.exceptions.ContentDisallowed` exception + is raised. Note that this error can be handled by the already + existing `on_decode_error` callback + + Examples:: + + Consumer(accept=['application/json']) + Consumer(accept=['pickle', 'json']) + +- Now depends on amqp 1.0.11 + +- pidbox: Mailbox now supports the ``accept`` argument. + +- Redis: More friendly error for when keys are missing. + +- Connection URLs: The parser did not work well when there were + multiple '+' tokens. + .. _version-2.5.9: 2.5.9 diff --git a/README.rst b/README.rst index d05c51c3..8f8f492c 100644 --- a/README.rst +++ b/README.rst @@ -1,3 +1,5 @@ +.. _kombu-index: + ======================================== kombu - Messaging Framework for Python ======================================== @@ -75,6 +77,8 @@ and the `Wikipedia article about AMQP`_. .. 
_`SoftLayer Message Queue`: http://www.softlayer.com/services/additional/message-queue +.. _transport-comparison: + Transport Comparison ==================== diff --git a/docs/reference/kombu.rst b/docs/reference/kombu.rst index 46fceeaf..596c4b75 100644 --- a/docs/reference/kombu.rst +++ b/docs/reference/kombu.rst @@ -5,6 +5,10 @@ .. automodule:: kombu + .. autofunction:: enable_insecure_serializers + + .. autofunction:: disable_insecure_serializers + Connection ---------- diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst index 3f236861..7a8ec0e4 100644 --- a/docs/userguide/serialization.rst +++ b/docs/userguide/serialization.rst @@ -77,6 +77,33 @@ Note that a `Consumer` do not need the serialization method specified. They can auto-detect the serialization method as the content-type is sent as a message header. +.. _disable-untrusted-serializers: + +Disabling Insecure Serializers +------------------------------ + +.. versionadded:: 2.5.10 + +Deserializing pickle and yaml from untrusted sources is not safe, +as both pickle and yaml have the ability to execute arbitrary code. + +If you are not using these formats you should disable them +by calling :func:`kombu.disable_insecure_serializers`:: + + >>> import kombu + >>> kombu.disable_insecure_serializers() + +Or you can specify the content types your consumers should +accept by using the ``accept`` argument:: + + >>> Consumer(accept=['json', 'pickle']) + >>> Consumer(accept=['application/json']) + +.. note:: + + Insecure serializers will be disabled by default + in the next major version (Kombu 3.0) + .. 
_sending-raw-data: Sending raw data without Serialization diff --git a/kombu/exceptions.py b/kombu/exceptions.py index 2bf57f37..7f01321f 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -66,6 +66,11 @@ class SerializerNotInstalled(KombuError): pass +class ContentDisallowed(SerializerNotInstalled): + """Consumer does not allow this content-type.""" + pass + + class InconsistencyError(StdConnectionError): """Data or environment has been found to be inconsistent, depending on the cause it may be possible to retry the operation.""" diff --git a/kombu/messaging.py b/kombu/messaging.py index c1e0f7de..a235da2e 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -13,7 +13,7 @@ from .compression import compress from .connection import maybe_channel, is_connection from .entity import Exchange, Queue, DELIVERY_MODES from .five import int_types, text_t, values -from .serialization import encode +from .serialization import encode, registry from .utils import ChannelPromise, maybe_list __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] @@ -322,10 +322,20 @@ class Consumer(object): #: that occurred while trying to decode it. on_decode_error = None + #: List of accepted content-types. + #: + #: An exception will be raised if the consumer receives + #: a message with an untrusted content type. + #: By default all content-types are accepted, but not if + #: :func:`kombu.disable_insecure_serializers` was called, + #: in which case only json is allowed. 
+ accept = None + _tags = count(1) # global def __init__(self, channel, queues=None, no_ack=None, auto_declare=None, - callbacks=None, on_decode_error=None, on_message=None): + callbacks=None, on_decode_error=None, on_message=None, + accept=None): self.channel = channel self.queues = self.queues or [] if queues is None else queues self.no_ack = self.no_ack if no_ack is None else no_ack @@ -337,6 +347,13 @@ class Consumer(object): self.auto_declare = auto_declare if on_decode_error is not None: self.on_decode_error = on_decode_error + self.accept = accept + + if self.accept is not None: + self.accept = set( + n if '/' in n else registry.name_to_type[n] + for n in self.accept + ) if self.channel: self.revive(self.channel) @@ -532,6 +549,9 @@ class Consumer(object): return tag def _receive_callback(self, message): + accept = self.accept + if accept is not None: + message.accept = accept on_m, channel, decoded = self.on_message, self.channel, None try: m2p = getattr(channel, 'message_to_python', None) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index e4e07b0d..ada35116 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -65,6 +65,7 @@ class Node(object): def Consumer(self, channel=None, **options): options.setdefault('no_ack', True) + options.setdefault('accept', self.mailbox.accept) queue = self.mailbox.get_queue(self.hostname) def verify_exclusive(name, messages, consumers): @@ -145,7 +146,8 @@ class Mailbox(object): #: exchange to send replies to. 
reply_exchange = None - def __init__(self, namespace, type='direct', connection=None, clock=None): + def __init__(self, namespace, + type='direct', connection=None, clock=None, accept=None): self.namespace = namespace self.connection = connection self.type = type @@ -154,6 +156,7 @@ class Mailbox(object): self.reply_exchange = self._get_reply_exchange(self.namespace) self._tls = local() self.unclaimed = defaultdict(deque) + self.accept = accept def __call__(self, connection): bound = copy(self) @@ -264,10 +267,13 @@ class Mailbox(object): channel=chan) def _collect(self, ticket, - limit=None, timeout=1, callback=None, channel=None): + limit=None, timeout=1, callback=None, + channel=None, accept=None): + if accept is None: + accept = self.accept chan = channel or self.connection.default_channel queue = self.reply_queue - consumer = Consumer(channel, [queue], no_ack=True) + consumer = Consumer(channel, [queue], accept=accept, no_ack=True) responses = [] unclaimed = self.unclaimed adjust_clock = self.clock.adjust diff --git a/kombu/serialization.py b/kombu/serialization.py index e4a30456..0107ac6d 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -18,7 +18,7 @@ try: except ImportError: # pragma: no cover cpickle = None # noqa -from .exceptions import SerializerNotInstalled +from .exceptions import SerializerNotInstalled, ContentDisallowed from .five import BytesIO, text_t from .utils import entrypoints from .utils.encoding import str_to_bytes, bytes_t @@ -64,6 +64,10 @@ def pickle_loads(s, load=pickle_load): return load(BytesIO(s)) +def parenthesize_alias(first, second): + return '%s (%s)' % (first, second) if first else second + + class SerializerRegistry(object): """The registry keeps track of serialization methods.""" @@ -75,6 +79,7 @@ class SerializerRegistry(object): self._default_content_encoding = None self._disabled_content_types = set() self.type_to_name = {} + self.name_to_type = {} def register(self, name, encoder, decoder, content_type, 
content_encoding='utf-8'): @@ -83,23 +88,25 @@ class SerializerRegistry(object): if decoder: self._decoders[content_type] = decoder self.type_to_name[content_type] = name + self.name_to_type[name] = content_type def enable(self, name): if '/' not in name: - name, _, _ = self._encoders[name] + name = self.name_to_type[name] self._disabled_content_types.remove(name) def disable(self, name): if '/' not in name: - name, _, _ = self._encoders[name] + name = self.name_to_type[name] self._disabled_content_types.add(name) def unregister(self, name): try: - content_type = self._encoders[name][0] + content_type = self.name_to_type[name] self._decoders.pop(content_type, None) self._encoders.pop(name, None) self.type_to_name.pop(content_type, None) + self.name_to_type.pop(name, None) except KeyError: raise SerializerNotInstalled( 'No encoder/decoder installed for {0}'.format(name)) @@ -153,10 +160,14 @@ class SerializerRegistry(object): payload = encoder(data) return content_type, content_encoding, payload - def decode(self, data, content_type, content_encoding, force=False): - if content_type in self._disabled_content_types and not force: - raise SerializerNotInstalled( - 'Content-type {0!r} has been disabled.'.format(content_type)) + def decode(self, data, content_type, content_encoding, + accept=None, force=False): + if accept is not None: + if content_type not in accept: + raise self._for_untrusted_content(content_type, 'untrusted') + else: + if content_type in self._disabled_content_types and not force: + raise self._for_untrusted_content(content_type, 'disabled') content_type = content_type or 'application/data' content_encoding = (content_encoding or 'utf-8').lower() @@ -169,13 +180,14 @@ class SerializerRegistry(object): return _decode(data, content_encoding) return data + def _for_untrusted_content(self, ctype, why): + return ContentDisallowed( + 'Refusing to decode {0} content of type {1}'.format( + why, parenthesize_alias(self.type_to_name[ctype], ctype)), + ) -""" 
-.. data:: registry -Global registry of serializers/deserializers. - -""" +#: Global registry of serializers/deserializers. registry = SerializerRegistry() @@ -384,6 +396,13 @@ _setupfuns = { def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']): + """Enable serializers that are considered to be unsafe. + + Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, + but you can also specify a list of serializers (by name or content type) + to enable. + + """ for choice in choices: try: registry.enable(choice) @@ -392,6 +411,18 @@ def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']): def disable_insecure_serializers(allowed=['json']): + """Disable untrusted serializers. + + Will disable all serializers except ``json`` + or you can specify a list of deserializers to allow. + + .. note:: + + Producers will still be able to serialize data + in these formats, but consumers will not accept + incoming data using the untrusted content types. + + """ for name in registry._decoders: registry.disable(name) if allowed is not None: diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 40fcd64c..7fb2874f 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -53,12 +53,13 @@ class Message(object): __slots__ = ('_state', 'channel', 'delivery_tag', 'content_type', 'content_encoding', 'delivery_info', 'headers', 'properties', - 'body', '_decoded_cache', '__dict__') + 'body', '_decoded_cache', 'accept', '__dict__') MessageStateError = MessageStateError def __init__(self, channel, body=None, delivery_tag=None, content_type=None, content_encoding=None, delivery_info={}, - properties=None, headers=None, postencode=None, **kwargs): + properties=None, headers=None, postencode=None, + accept=None, **kwargs): self.channel = channel self.delivery_tag = delivery_tag self.content_type = content_type @@ -68,6 +69,7 @@ class Message(object): self.properties = properties or {} self._decoded_cache = None self._state = 'RECEIVED' 
+ self.accept = accept try: body = decompress(body, self.headers['compression']) @@ -151,7 +153,7 @@ class Message(object): """Deserialize the message body, returning the original python structure sent by the publisher.""" return decode(self.body, self.content_type, - self.content_encoding) + self.content_encoding, accept=self.accept) @property def acknowledged(self): diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 7996fd84..a6fed4d9 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -26,6 +26,11 @@ from kombu.log import get_logger from kombu.utils import cached_property, uuid from kombu.utils.eventio import poll, READ, ERR +NO_ROUTE_ERROR = """ +Cannot route message for exchange {0!r}: Table empty or key no longer exists. +Probably the key ({1!r}) has been removed from the Redis database. +""" + try: from billiard.util import register_after_fork except ImportError: @@ -571,8 +576,7 @@ class Channel(virtual.Channel): with self.conn_or_acquire() as client: values = client.smembers(key) if not values: - raise InconsistencyError( - 'Queue list empty or does not exist: {0!r}'.format(key)) + raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key)) return [tuple(val.split(self.sep)) for val in values] def _purge(self, queue):