mirror of https://github.com/celery/kombu.git
Consumer can now work with raw messages using a new on_message callback
This commit is contained in:
parent
00b795b980
commit
9cd9ed7274
|
@ -233,6 +233,7 @@ class Consumer(object):
|
|||
:keyword no_ack: see :attr:`no_ack`.
|
||||
:keyword auto_declare: see :attr:`auto_declare`
|
||||
:keyword callbacks: see :attr:`callbacks`.
|
||||
:keyword on_message: See :attr:`on_message`
|
||||
:keyword on_decode_error: see :attr:`on_decode_error`.
|
||||
|
||||
"""
|
||||
|
@ -259,6 +260,25 @@ class Consumer(object):
|
|||
#: :class:`~kombu.transport.base.Message`).
|
||||
callbacks = None
|
||||
|
||||
#: Optional function called whenever a message is received.
|
||||
#:
|
||||
#: When defined this function will be called instead of the
|
||||
#: :meth:`receive` method, and :attr:`callbacks` will be disabled.
|
||||
#:
|
||||
#: So this can be used as an alternative to :attr:`callbacks` when
|
||||
#: you don't want the body to be automatically decoded.
|
||||
#: Note that the message will still be decompressed if the message
|
||||
#: has the ``compression`` header set.
|
||||
#:
|
||||
#: The signature of the callback must take a single argument,
|
||||
#: which is the raw message object (a subclass of
|
||||
#: :class:`~kombu.transport.base.Message`).
|
||||
#:
|
||||
#: Also note that the ``message.body`` attribute, which is the raw
|
||||
#: contents of the message body, may in some cases be a read-only
|
||||
#: :class:`buffer` object.
|
||||
on_message = None
|
||||
|
||||
#: Callback called when a message can't be decoded.
|
||||
#:
|
||||
#: The signature of the callback must take two arguments: `(message,
|
||||
|
@ -269,12 +289,13 @@ class Consumer(object):
|
|||
_next_tag = count(1).next # global
|
||||
|
||||
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
|
||||
callbacks=None, on_decode_error=None):
|
||||
callbacks=None, on_decode_error=None, on_message=None):
|
||||
self.channel = channel
|
||||
self.queues = self.queues or [] if queues is None else queues
|
||||
self.no_ack = self.no_ack if no_ack is None else no_ack
|
||||
self.callbacks = (self.callbacks or [] if callbacks is None
|
||||
else callbacks)
|
||||
self.on_message = on_message
|
||||
self._active_tags = {}
|
||||
if auto_declare is not None:
|
||||
self.auto_declare = auto_declare
|
||||
|
@ -472,18 +493,18 @@ class Consumer(object):
|
|||
return tag
|
||||
|
||||
def _receive_callback(self, message):
|
||||
channel = self.channel
|
||||
on_m, channel, decoded = self.on_message, self.channel, None
|
||||
try:
|
||||
m2p = getattr(channel, 'message_to_python', None)
|
||||
if m2p:
|
||||
message = m2p(message)
|
||||
decoded = message.decode()
|
||||
decoded = None if on_m else message.decode()
|
||||
except Exception, exc:
|
||||
if not self.on_decode_error:
|
||||
raise
|
||||
self.on_decode_error(message, exc)
|
||||
else:
|
||||
self.receive(decoded, message)
|
||||
|
||||
return on_m(message) if on_m else self.receive(decoded, message)
|
||||
|
||||
def __repr__(self):
|
||||
return '<Consumer: %s>' % (self.queues, )
|
||||
|
|
Loading…
Reference in New Issue