diff --git a/Changelog b/Changelog index 44653ed4..2817a97c 100644 --- a/Changelog +++ b/Changelog @@ -26,13 +26,26 @@ Changes - :func:`kombu.disable_insecure_serializers` - :func:`kombu.enable_insecure_serializers`. +- Consumer: ``accept`` can now be used to specify a whitelist + of content types/serializers to accept. + + If the accept whitelist is set and a message is received + with a content type that is not in the whitelist then a + :exc:`~kombu.exceptions.ContentDisallowed` exception + will be raised. + + Examples:: + + Consumer(accept=['application/json']) + Consumer(accept=['pickle', 'json']) + +- pidbox: Mailbox now supports the ``accept`` argument. + - Redis: More friendly error for when keys are missing. - Connection URLs: The parser did not work well when there were multiple '+' tokens. - - .. _version-2.5.9: 2.5.9 diff --git a/kombu/messaging.py b/kombu/messaging.py index 20069c37..3067fc7b 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -12,7 +12,7 @@ from itertools import count from .connection import maybe_channel, is_connection from .entity import Exchange, Queue, DELIVERY_MODES from .compression import compress -from .serialization import encode +from .serialization import encode, registry from .utils import ChannelPromise, maybe_list __all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] @@ -338,12 +338,18 @@ class Consumer(object): self.callbacks = (self.callbacks or [] if callbacks is None else callbacks) self.on_message = on_message - self.accept = accept self._active_tags = {} if auto_declare is not None: self.auto_declare = auto_declare if on_decode_error is not None: self.on_decode_error = on_decode_error + self.accept = accept + + if self.accept is not None: + self.accept = set( + n if '/' in n else registry.name_to_type[n] + for n in self.accept + ) if self.channel: self.revive(self.channel) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index 74a34fd3..cc29ce1a 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -65,6 +65,7 @@ class Node(object): def Consumer(self, channel=None, **options): options.setdefault('no_ack', True) + options.setdefault('accept', self.mailbox.accept) queue = self.mailbox.get_queue(self.hostname) def verify_exclusive(name, messages, consumers): @@ -145,7 +146,8 @@ class Mailbox(object): #: exchange to send replies to. reply_exchange = None - def __init__(self, namespace, type='direct', connection=None, clock=None): + def __init__(self, namespace, + type='direct', connection=None, clock=None, accept=None): self.namespace = namespace self.connection = connection self.type = type @@ -154,6 +156,7 @@ class Mailbox(object): self.reply_exchange = self._get_reply_exchange(self.namespace) self._tls = local() self.unclaimed = defaultdict(deque) + self.accept = accept def __call__(self, connection): bound = copy(self) @@ -264,10 +267,13 @@ class Mailbox(object): channel=chan) def _collect(self, ticket, - limit=None, timeout=1, callback=None, channel=None): + limit=None, timeout=1, callback=None, + channel=None, accept=None): + if accept is None: + accept = self.accept chan = channel or self.connection.default_channel queue = self.reply_queue - consumer = Consumer(channel, [queue], no_ack=True) + consumer = Consumer(channel, [queue], accept=accept, no_ack=True) responses = [] unclaimed = self.unclaimed adjust_clock = self.clock.adjust diff --git a/kombu/serialization.py b/kombu/serialization.py index 0e190d6e..aeac0477 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -89,6 +89,7 @@ class SerializerRegistry(object): self._default_content_encoding = None self._disabled_content_types = set() self.type_to_name = {} + self.name_to_type = {} def register(self, name, encoder, decoder, content_type, content_encoding='utf-8'): @@ -97,23 +98,25 @@ class SerializerRegistry(object): if decoder: self._decoders[content_type] = decoder self.type_to_name[content_type] = name + self.name_to_type[name] = content_type def enable(self, name): if '/' not in name: - name, _, _ = self._encoders[name] + name = self.name_to_type[name] self._disabled_content_types.remove(name) def disable(self, name): if '/' not in name: - name, _, _ = self._encoders[name] + name = self.name_to_type[name] self._disabled_content_types.add(name) def unregister(self, name): try: - content_type = self._encoders[name][0] + content_type = self.name_to_type[name] self._decoders.pop(content_type, None) self._encoders.pop(name, None) self.type_to_name.pop(content_type, None) + self.name_to_type.pop(name, None) except KeyError: raise SerializerNotInstalled( 'No encoder/decoder installed for %s' % name)