From 9cd9ed7274c33627c7cc479edd532ffc2545c593 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 3 Oct 2012 11:26:54 +0100 Subject: [PATCH] Consumer can now work with raw messages using a new on_message callback --- kombu/messaging.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/kombu/messaging.py b/kombu/messaging.py index a3af6a41..03ac2dd8 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -233,6 +233,7 @@ class Consumer(object): :keyword no_ack: see :attr:`no_ack`. :keyword auto_declare: see :attr:`auto_declare` :keyword callbacks: see :attr:`callbacks`. + :keyword on_message: See :attr:`on_message` :keyword on_decode_error: see :attr:`on_decode_error`. """ @@ -259,6 +260,25 @@ class Consumer(object): #: :class:`~kombu.transport.base.Message`). callbacks = None + #: Optional function called whenever a message is received. + #: + #: When defined this function will be called instead of the + #: :meth:`receive` method, and :attr:`callbacks` will be disabled. + #: + #: So this can be used as an alternative to :attr:`callbacks` when + #: you don't want the body to be automatically decoded. + #: Note that the message will still be decompressed if the message + #: has the ``compression`` header set. + #: + #: The signature of the callback must take a single argument, + #: which is the raw message object (a subclass of + #: :class:`~kombu.transport.base.Message`). + #: + #: Also note that the ``message.body`` attribute, which is the raw + #: contents of the message body, may in some cases be a read-only + #: :class:`buffer` object. + on_message = None + #: Callback called when a message can't be decoded. #: #: The signature of the callback must take two arguments: `(message, @@ -269,12 +289,13 @@ class Consumer(object): _next_tag = count(1).next # global def __init__(self, channel, queues=None, no_ack=None, auto_declare=None, - callbacks=None, on_decode_error=None): + callbacks=None, on_decode_error=None, on_message=None): self.channel = channel self.queues = self.queues or [] if queues is None else queues self.no_ack = self.no_ack if no_ack is None else no_ack self.callbacks = (self.callbacks or [] if callbacks is None else callbacks) + self.on_message = on_message self._active_tags = {} if auto_declare is not None: self.auto_declare = auto_declare @@ -472,18 +493,18 @@ class Consumer(object): return tag def _receive_callback(self, message): - channel = self.channel + on_m, channel, decoded = self.on_message, self.channel, None try: m2p = getattr(channel, 'message_to_python', None) if m2p: message = m2p(message) - decoded = message.decode() + decoded = None if on_m else message.decode() except Exception, exc: if not self.on_decode_error: raise self.on_decode_error(message, exc) - else: - self.receive(decoded, message) + + return on_m(message) if on_m else self.receive(decoded, message) def __repr__(self): return '' % (self.queues, )