mirror of https://github.com/celery/kombu.git
Consumer.accept: A whitelist of content types/serializers to allow
This commit is contained in:
parent
24e441c79c
commit
0da2ae8b4c
17
Changelog
17
Changelog
|
@ -26,13 +26,26 @@ Changes
|
|||
- :func:`kombu.disable_insecure_serializers`
|
||||
- :func:`kombu.enable_insecure_serializers`.
|
||||
|
||||
- Consumer: ``accept`` can now be used to specify a whitelist
|
||||
of content types/serializers to accept.
|
||||
|
||||
If the accept whitelist is set and a message is received
|
||||
with a content type that is not in the whitelist then a
|
||||
:exc:`~kombu.exceptions.ContentDisallowed` exception
|
||||
will be raised.
|
||||
|
||||
Examples::
|
||||
|
||||
Consumer(accept=['application/json'])
|
||||
Consumer(accept=['pickle', 'json'])
|
||||
|
||||
- pidbox: Mailbox now supports the ``accept`` argument.
|
||||
|
||||
- Redis: More friendly error for when keys are missing.
|
||||
|
||||
- Connection URLs: The parser did not work well when there were
|
||||
multiple '+' tokens.
|
||||
|
||||
|
||||
|
||||
.. _version-2.5.9:
|
||||
|
||||
2.5.9
|
||||
|
|
|
@ -12,7 +12,7 @@ from itertools import count
|
|||
from .connection import maybe_channel, is_connection
|
||||
from .entity import Exchange, Queue, DELIVERY_MODES
|
||||
from .compression import compress
|
||||
from .serialization import encode
|
||||
from .serialization import encode, registry
|
||||
from .utils import ChannelPromise, maybe_list
|
||||
|
||||
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
|
||||
|
@ -338,12 +338,18 @@ class Consumer(object):
|
|||
self.callbacks = (self.callbacks or [] if callbacks is None
|
||||
else callbacks)
|
||||
self.on_message = on_message
|
||||
self.accept = accept
|
||||
self._active_tags = {}
|
||||
if auto_declare is not None:
|
||||
self.auto_declare = auto_declare
|
||||
if on_decode_error is not None:
|
||||
self.on_decode_error = on_decode_error
|
||||
self.accept = accept
|
||||
|
||||
if self.accept is not None:
|
||||
self.accept = set(
|
||||
n if '/' in n else registry.name_to_type[n]
|
||||
for n in self.accept
|
||||
)
|
||||
|
||||
if self.channel:
|
||||
self.revive(self.channel)
|
||||
|
|
|
@ -65,6 +65,7 @@ class Node(object):
|
|||
|
||||
def Consumer(self, channel=None, **options):
|
||||
options.setdefault('no_ack', True)
|
||||
options.setdefault('accept', self.mailbox.accept)
|
||||
queue = self.mailbox.get_queue(self.hostname)
|
||||
|
||||
def verify_exclusive(name, messages, consumers):
|
||||
|
@ -145,7 +146,8 @@ class Mailbox(object):
|
|||
#: exchange to send replies to.
|
||||
reply_exchange = None
|
||||
|
||||
def __init__(self, namespace, type='direct', connection=None, clock=None):
|
||||
def __init__(self, namespace,
|
||||
type='direct', connection=None, clock=None, accept=None):
|
||||
self.namespace = namespace
|
||||
self.connection = connection
|
||||
self.type = type
|
||||
|
@ -154,6 +156,7 @@ class Mailbox(object):
|
|||
self.reply_exchange = self._get_reply_exchange(self.namespace)
|
||||
self._tls = local()
|
||||
self.unclaimed = defaultdict(deque)
|
||||
self.accept = accept
|
||||
|
||||
def __call__(self, connection):
|
||||
bound = copy(self)
|
||||
|
@ -264,10 +267,13 @@ class Mailbox(object):
|
|||
channel=chan)
|
||||
|
||||
def _collect(self, ticket,
|
||||
limit=None, timeout=1, callback=None, channel=None):
|
||||
limit=None, timeout=1, callback=None,
|
||||
channel=None, accept=None):
|
||||
if accept is None:
|
||||
accept = self.accept
|
||||
chan = channel or self.connection.default_channel
|
||||
queue = self.reply_queue
|
||||
consumer = Consumer(channel, [queue], no_ack=True)
|
||||
consumer = Consumer(channel, [queue], accept=accept, no_ack=True)
|
||||
responses = []
|
||||
unclaimed = self.unclaimed
|
||||
adjust_clock = self.clock.adjust
|
||||
|
|
|
@ -89,6 +89,7 @@ class SerializerRegistry(object):
|
|||
self._default_content_encoding = None
|
||||
self._disabled_content_types = set()
|
||||
self.type_to_name = {}
|
||||
self.name_to_type = {}
|
||||
|
||||
def register(self, name, encoder, decoder, content_type,
|
||||
content_encoding='utf-8'):
|
||||
|
@ -97,23 +98,25 @@ class SerializerRegistry(object):
|
|||
if decoder:
|
||||
self._decoders[content_type] = decoder
|
||||
self.type_to_name[content_type] = name
|
||||
self.name_to_type[name] = content_type
|
||||
|
||||
def enable(self, name):
|
||||
if '/' not in name:
|
||||
name, _, _ = self._encoders[name]
|
||||
name = self.name_to_type[name]
|
||||
self._disabled_content_types.remove(name)
|
||||
|
||||
def disable(self, name):
|
||||
if '/' not in name:
|
||||
name, _, _ = self._encoders[name]
|
||||
name = self.name_to_type[name]
|
||||
self._disabled_content_types.add(name)
|
||||
|
||||
def unregister(self, name):
|
||||
try:
|
||||
content_type = self._encoders[name][0]
|
||||
content_type = self.name_to_type[name]
|
||||
self._decoders.pop(content_type, None)
|
||||
self._encoders.pop(name, None)
|
||||
self.type_to_name.pop(content_type, None)
|
||||
self.name_to_type.pop(name, None)
|
||||
except KeyError:
|
||||
raise SerializerNotInstalled(
|
||||
'No encoder/decoder installed for %s' % name)
|
||||
|
|
Loading…
Reference in New Issue