From d3a06bc5670354c81fde8a20c981b82b395cb70d Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 12:27:18 +0100 Subject: [PATCH 01/11] Remove accidental print statement --- kombu/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kombu/common.py b/kombu/common.py index 9e1bcc78..27b3f322 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -368,7 +368,7 @@ class QoS(object): logger.warn('QoS: Disabled: prefetch_count exceeds %r', PREFETCH_COUNT_MAX) new_value = 0 - print('basic.qos: prefetch_count->%s', new_value) + logger.debug('basic.qos: prefetch_count->%s', new_value) self.callback(prefetch_count=new_value) self.prev = pcount return pcount From 6966e922206e4c10958f385422a54ce3c2f5aac8 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 12:48:04 +0100 Subject: [PATCH 02/11] Improve inconsistency error message. celery/celery#704 --- kombu/transport/redis.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 501f056c..bf9e9f49 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -27,6 +27,11 @@ from kombu.log import get_logger from kombu.utils import cached_property, uuid from kombu.utils.eventio import poll, READ, ERR +NO_ROUTE_ERROR = """ +Cannot route message for exchange %r: Table empty or key no longer exists. +Probably the key (%r) has been removed from the Redis database. +""" + try: from billiard.util import register_after_fork except ImportError: @@ -572,9 +577,7 @@ class Channel(virtual.Channel): with self.conn_or_acquire() as client: values = client.smembers(key) if not values: - raise InconsistencyError( - 'Queue list empty or key does not exist: %r' % ( - self.keyprefix_queue % exchange)) + raise InconsistencyError(NO_ROUTE_ERROR % (exchange, key)) return [tuple(val.split(self.sep)) for val in values] def _purge(self, queue): From b3aa4b886b749dce27d087a471ba3d4b1c6ca2e7 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 13:45:21 +0100 Subject: [PATCH 03/11] Adds ref --- README.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.rst b/README.rst index aac6c428..ecc4e392 100644 --- a/README.rst +++ b/README.rst @@ -73,6 +73,8 @@ and the `Wikipedia article about AMQP`_. .. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq +.. _transport-comparison: + Transport Comparison ==================== From b66c6135e4035ea6b906b00903365d5154bb1fb4 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 13:47:42 +0100 Subject: [PATCH 04/11] Adds ref kombu-index --- README.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.rst b/README.rst index ecc4e392..88b2cf54 100644 --- a/README.rst +++ b/README.rst @@ -1,3 +1,5 @@ +.. _kombu-index: + ======================================== kombu - Messaging Framework for Python ======================================== From b1a8d2f8683ccfe8d645ba1e31d987297d12553b Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 14:11:16 +0100 Subject: [PATCH 05/11] Updates Changelog --- Changelog | 29 +++++++++++++++++++++++++++++ docs/reference/kombu.rst | 4 ++++ kombu/serialization.py | 19 +++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/Changelog b/Changelog index 49892edf..44653ed4 100644 --- a/Changelog +++ b/Changelog @@ -4,6 +4,35 @@ Change history ================ +.. _version-2.5.10: + +2.5.10 +====== +:release-date: 2013-04-11 XX:XX P.M BST + +Note about upcoming changes for Kombu 3.0 +----------------------------------------- + +Kombu 3 consumers will no longer accept pickle/yaml or msgpack +by default, and you will have to explicitly enable untrusted deserializers +either globally using :func:`kombu.enable_insecure_serializers`, or +using the ``accept`` argument to :class:`~kombu.Consumer`. + +Changes +------- + +- New utility to disable untrusted serializers. + + - :func:`kombu.disable_insecure_serializers` + - :func:`kombu.enable_insecure_serializers`. + +- Redis: More friendly error for when keys are missing. + +- Connection URLs: The parser did not work well when there were + multiple '+' tokens. + + + .. _version-2.5.9: 2.5.9 diff --git a/docs/reference/kombu.rst b/docs/reference/kombu.rst index e0bfe037..a56467ea 100644 --- a/docs/reference/kombu.rst +++ b/docs/reference/kombu.rst @@ -5,6 +5,10 @@ .. automodule:: kombu + .. autofunction:: enable_insecure_serializers + + .. autofunction:: disable_insecure_serializers + Connection ---------- diff --git a/kombu/serialization.py b/kombu/serialization.py index e7b71c5a..3393f1fb 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -394,6 +394,13 @@ _setupfuns = { def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']): + """Enable serializers that are considered to be unsafe. + + Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, + but you can also specify a list of serializers (by name or content type) + to enable. + + """ for choice in choices: try: registry.enable(choice) @@ -402,6 +409,18 @@ def enable_insecure_serializers(choices=['pickle', 'yaml', 'msgpack']): def disable_insecure_serializers(allowed=['json']): + """Disable untrusted serializers. + + Will disable all serializers except ``json`` + or you can specify a list of deserializers to allow. + + .. note:: + + Producers will still be able to serialize data + in these formats, but consumers will not accept + incoming data using the untrusted content types. + + """ for name in registry._decoders: registry.disable(name) if allowed is not None: From 24e441c79c07fe104fa2505e956cc7d6a0858de4 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 14:59:07 +0100 Subject: [PATCH 06/11] Adds Consumer.accept: A whitelist of content_types/serializer names to accept --- kombu/exceptions.py | 5 +++++ kombu/messaging.py | 16 +++++++++++++++- kombu/serialization.py | 31 +++++++++++++++++++++---------- kombu/transport/base.py | 8 +++++--- 4 files changed, 46 insertions(+), 14 deletions(-) diff --git a/kombu/exceptions.py b/kombu/exceptions.py index 914dca5b..32261506 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -63,6 +63,11 @@ class SerializerNotInstalled(KombuError): pass +class ContentDisallowed(SerializerNotInstalled): + """Consumer does not allow this content-type.""" + pass + + class InconsistencyError(StdConnectionError): """Data or environment has been found to be inconsistent, depending on the cause it may be possible to retry the operation.""" diff --git a/kombu/messaging.py b/kombu/messaging.py index cb13f2c7..20069c37 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -318,16 +318,27 @@ class Consumer(object): #: that occurred while trying to decode it. on_decode_error = None + #: List of accepted content-types. + #: + #: An exception will be raised if the consumer receives + #: a message with an untrusted content type. + #: By default all content-types are accepted, but not if + #: :func:`kombu.disable_untrusted_serializers` was called, + #: in which case only json is allowed. + accept = None + _next_tag = count(1).next # global def __init__(self, channel, queues=None, no_ack=None, auto_declare=None, - callbacks=None, on_decode_error=None, on_message=None): + callbacks=None, on_decode_error=None, on_message=None, + accept=None): self.channel = channel self.queues = self.queues or [] if queues is None else queues self.no_ack = self.no_ack if no_ack is None else no_ack self.callbacks = (self.callbacks or [] if callbacks is None else callbacks) self.on_message = on_message + self.accept = accept self._active_tags = {} if auto_declare is not None: self.auto_declare = auto_declare @@ -528,6 +539,9 @@ class Consumer(object): return tag def _receive_callback(self, message): + accept = self.accept + if accept is not None: + message.accept = accept on_m, channel, decoded = self.on_message, self.channel, None try: m2p = getattr(channel, 'message_to_python', None) diff --git a/kombu/serialization.py b/kombu/serialization.py index 3393f1fb..0e190d6e 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -18,7 +18,7 @@ try: except ImportError: # pragma: no cover cpickle = None # noqa -from .exceptions import SerializerNotInstalled +from .exceptions import SerializerNotInstalled, ContentDisallowed from .utils import entrypoints from .utils.encoding import str_to_bytes, bytes_t @@ -74,6 +74,10 @@ def pickle_loads(s, load=pickle_load): return load(BytesIO(s)) +def parenthesize_alias(first, second): + return '%s (%s)' % (first, second) if first else second + + class SerializerRegistry(object): """The registry keeps track of serialization methods.""" @@ -163,10 +167,14 @@ class SerializerRegistry(object): payload = encoder(data) return content_type, content_encoding, payload - def decode(self, data, content_type, content_encoding, force=False): - if content_type in self._disabled_content_types and not force: - raise SerializerNotInstalled( - 'Content-type %r has been disabled.' % (content_type, )) + def decode(self, data, content_type, content_encoding, + accept=None, force=False): + if accept is not None: + if content_type not in accept: + raise self._for_untrusted_content(content_type, 'untrusted') + else: + if content_type in self._disabled_content_types and not force: + raise self._for_untrusted_content(content_type, 'disabled') content_type = content_type or 'application/data' content_encoding = (content_encoding or 'utf-8').lower() @@ -179,13 +187,16 @@ class SerializerRegistry(object): return _decode(data, content_encoding) return data + def _for_untrusted_content(self, ctype, why): + return ContentDisallowed( + 'Refusing to decode %(why)s content of type %(type)s' % { + 'why': why, + 'type': parenthesize_alias(self.type_to_name[ctype], ctype), + }, + ) -""" -.. data:: registry -Global registry of serializers/deserializers. - -""" +#: Global registry of serializers/deserializers. registry = SerializerRegistry() diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 917c6198..7a2a6e91 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -47,12 +47,13 @@ class Message(object): __slots__ = ('_state', 'channel', 'delivery_tag', 'content_type', 'content_encoding', 'delivery_info', 'headers', 'properties', - 'body', '_decoded_cache', '__dict__') + 'body', '_decoded_cache', 'accept', '__dict__') MessageStateError = MessageStateError def __init__(self, channel, body=None, delivery_tag=None, content_type=None, content_encoding=None, delivery_info={}, - properties=None, headers=None, postencode=None, **kwargs): + properties=None, headers=None, postencode=None, + accept=None, **kwargs): self.channel = channel self.delivery_tag = delivery_tag self.content_type = content_type @@ -62,6 +63,7 @@ class Message(object): self.properties = properties or {} self._decoded_cache = None self._state = 'RECEIVED' + self.accept = accept try: body = decompress(body, self.headers['compression']) @@ -142,7 +144,7 @@ class Message(object): """Deserialize the message body, returning the original python structure sent by the publisher.""" return decode(self.body, self.content_type, - self.content_encoding) + self.content_encoding, accept=self.accept) @property def acknowledged(self): From 0da2ae8b4cf1cbece5eca5cfbd4f550fbb685800 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 15:15:04 +0100 Subject: [PATCH 07/11] Consumer.accept: A whitelist of content types/serializers to allow --- Changelog | 17 +++++++++++++++-- kombu/messaging.py | 10 ++++++++-- kombu/pidbox.py | 12 +++++++++--- kombu/serialization.py | 9 ++++++--- 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/Changelog b/Changelog index 44653ed4..2817a97c 100644 --- a/Changelog +++ b/Changelog @@ -26,13 +26,26 @@ Changes - :func:`kombu.disable_insecure_serializers` - :func:`kombu.enable_insecure_serializers`. +- Consumer: ``accept`` can now be used to specify a whitelist + of content types/serializers to accept. + + If the accept whitelist is set and a message is received + with a content type that is not in the whitelist then a + :exc:`~kombu.exceptions.ContentDisallowed` exception + will be raised. + + Examples:: + + Consumer(accept=['application/json']) + Consumer(accept=['pickle', 'json']) + +- pidbox: Mailbox now supports the ``accept`` argument. + - Redis: More friendly error for when keys are missing. - Connection URLs: The parser did not work well when there were multiple '+' tokens. - - .. _version-2.5.9: 2.5.9 diff --git a/kombu/messaging.py b/kombu/messaging.py index 20069c37..3067fc7b 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -12,7 +12,7 @@ from itertools import count from .connection import maybe_channel, is_connection from .entity import Exchange, Queue, DELIVERY_MODES from .compression import compress -from .serialization import encode +from .serialization import encode, registry from .utils import ChannelPromise, maybe_list __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] @@ -338,12 +338,18 @@ class Consumer(object): self.callbacks = (self.callbacks or [] if callbacks is None else callbacks) self.on_message = on_message - self.accept = accept self._active_tags = {} if auto_declare is not None: self.auto_declare = auto_declare if on_decode_error is not None: self.on_decode_error = on_decode_error + self.accept = accept + + if self.accept is not None: + self.accept = set( + n if '/' in n else registry.name_to_type[n] + for n in self.accept + ) if self.channel: self.revive(self.channel) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index 74a34fd3..cc29ce1a 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -65,6 +65,7 @@ class Node(object): def Consumer(self, channel=None, **options): options.setdefault('no_ack', True) + options.setdefault('accept', self.mailbox.accept) queue = self.mailbox.get_queue(self.hostname) def verify_exclusive(name, messages, consumers): @@ -145,7 +146,8 @@ class Mailbox(object): #: exchange to send replies to. reply_exchange = None - def __init__(self, namespace, type='direct', connection=None, clock=None): + def __init__(self, namespace, + type='direct', connection=None, clock=None, accept=None): self.namespace = namespace self.connection = connection self.type = type @@ -154,6 +156,7 @@ class Mailbox(object): self.reply_exchange = self._get_reply_exchange(self.namespace) self._tls = local() self.unclaimed = defaultdict(deque) + self.accept = accept def __call__(self, connection): bound = copy(self) @@ -264,10 +267,13 @@ class Mailbox(object): channel=chan) def _collect(self, ticket, - limit=None, timeout=1, callback=None, channel=None): + limit=None, timeout=1, callback=None, + channel=None, accept=None): + if accept is None: + accept = self.accept chan = channel or self.connection.default_channel queue = self.reply_queue - consumer = Consumer(channel, [queue], no_ack=True) + consumer = Consumer(channel, [queue], accept=accept, no_ack=True) responses = [] unclaimed = self.unclaimed adjust_clock = self.clock.adjust diff --git a/kombu/serialization.py b/kombu/serialization.py index 0e190d6e..aeac0477 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -89,6 +89,7 @@ class SerializerRegistry(object): self._default_content_encoding = None self._disabled_content_types = set() self.type_to_name = {} + self.name_to_type = {} def register(self, name, encoder, decoder, content_type, content_encoding='utf-8'): @@ -97,23 +98,25 @@ class SerializerRegistry(object): if decoder: self._decoders[content_type] = decoder self.type_to_name[content_type] = name + self.name_to_type[name] = content_type def enable(self, name): if '/' not in name: - name, _, _ = self._encoders[name] + name = self.name_to_type[name] self._disabled_content_types.remove(name) def disable(self, name): if '/' not in name: - name, _, _ = self._encoders[name] + name = self.name_to_type[name] self._disabled_content_types.add(name) def unregister(self, name): try: - content_type = self._encoders[name][0] + content_type = self.name_to_type[name] self._decoders.pop(content_type, None) self._encoders.pop(name, None) self.type_to_name.pop(content_type, None) + self.name_to_type.pop(name, None) except KeyError: raise SerializerNotInstalled( 'No encoder/decoder installed for %s' % name) From d51ba1f94ea08643ca06f9ba0db6c7700819c2f7 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 15:28:49 +0100 Subject: [PATCH 08/11] Updates serialization guide --- docs/userguide/serialization.rst | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst index 3f236861..7a8ec0e4 100644 --- a/docs/userguide/serialization.rst +++ b/docs/userguide/serialization.rst @@ -77,6 +77,33 @@ Note that a `Consumer` do not need the serialization method specified. They can auto-detect the serialization method as the content-type is sent as a message header. +.. _disable-untrusted-serializers: + +Disabling Insecure Serializers +------------------------------ + +.. versionadded:: 2.5.10 + +Deserializing pickle and yaml from untrusted sources is not safe, +as both pickle and yaml have the ability to execute arbitrary code. + +If you are not using these formats you should disable them +by calling :func:`kombu.disable_insecure_serializers`:: + + >>> import kombu + >>> kombu.disable_insecure_serializers() + +Or you can specify the content types your consumers should +accept by using the ``accept`` argument:: + + >>> Consumer(accept=['json', 'pickle']) + >>> Consumer(accept=['application/json']) + +.. note:: + + Insecure serializers will be disabled by default + in the next major version (Kombu 3.0) + .. _sending-raw-data: Sending raw data without Serialization From 9958ee9021c58d740ddeac108dd145b4c07f7519 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 17:50:02 +0100 Subject: [PATCH 09/11] Now depends on amqp 1.0.11 --- Changelog | 2 ++ requirements/default.txt | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Changelog b/Changelog index 2817a97c..0fc26379 100644 --- a/Changelog +++ b/Changelog @@ -39,6 +39,8 @@ Changes Consumer(accept=['application/json']) Consumer(accept=['pickle', 'json']) +- Now depends on amqp 1.0.11 + - pidbox: Mailbox now supports the ``accept`` argument. - Redis: More friendly error for when keys are missing. diff --git a/requirements/default.txt b/requirements/default.txt index a33ee3c2..0d80250e 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -1,2 +1,2 @@ anyjson>=0.3.3 -amqp>=1.0.10,<1.1.0 +amqp>=1.0.11,<1.1.0 From ed46db76bdacda047a30d33f76add66df564792b Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 18:12:13 +0100 Subject: [PATCH 10/11] Updates Changelog --- Changelog | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Changelog b/Changelog index 0fc26379..a7ab104c 100644 --- a/Changelog +++ b/Changelog @@ -8,7 +8,7 @@ 2.5.10 ====== -:release-date: 2013-04-11 XX:XX P.M BST +:release-date: 2013-04-11 18:10 P.M BST Note about upcoming changes for Kombu 3.0 ----------------------------------------- @@ -21,18 +21,19 @@ using the ``accept`` argument to :class:`~kombu.Consumer`. Changes ------- -- New utility to disable untrusted serializers. +- New utility function to disable/enable untrusted serializers. - :func:`kombu.disable_insecure_serializers` - :func:`kombu.enable_insecure_serializers`. - Consumer: ``accept`` can now be used to specify a whitelist - of content types/serializers to accept. + of content types to accept. If the accept whitelist is set and a message is received with a content type that is not in the whitelist then a :exc:`~kombu.exceptions.ContentDisallowed` exception - will be raised. + is raised. Note that this error can be handled by the already + existing `on_decode_error` callback Examples:: From 4962da4a19c477bca05bea1eb73d8eb6655f4c5a Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 11 Apr 2013 18:12:15 +0100 Subject: [PATCH 11/11] Bumps version to 2.5.10 --- README.rst | 2 +- kombu/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 88b2cf54..602629e1 100644 --- a/README.rst +++ b/README.rst @@ -4,7 +4,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.5.9 +:Version: 2.5.10 `Kombu` is a messaging framework for Python. diff --git a/kombu/__init__.py b/kombu/__init__.py index 5c831405..962eae85 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -1,7 +1,7 @@ """Messaging Framework for Python""" from __future__ import absolute_import -VERSION = (2, 5, 9) +VERSION = (2, 5, 10) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Ask Solem' __contact__ = 'ask@celeryproject.org'