2011-11-02 02:39:48 +00:00
|
|
|
.. _guide-consumers:
|
|
|
|
|
|
|
|
===========
|
|
|
|
Consumers
|
|
|
|
===========
|
|
|
|
|
|
|
|
.. _consumer-basics:
|
|
|
|
|
|
|
|
Basics
|
|
|
|
======
|
|
|
|
|
2011-11-09 14:21:16 +00:00
|
|
|
The :class:`Consumer` takes a connection (or channel) and a list of queues to
|
|
|
|
consume from. Several consumers can be mixed to consume from different
|
|
|
|
channels, as they all bind to the same connection, and ``drain_events`` will
|
|
|
|
drain events from all channels on that connection.
|
|
|
|
|
2013-10-04 15:26:17 +00:00
|
|
|
.. note::
|
|
|
|
|
|
|
|
Kombu since 3.0 will only accept json/binary or text messages by default,
|
|
|
|
to allow deserialization of other formats you have to specify them
|
|
|
|
in the ``accept`` argument::
|
|
|
|
|
|
|
|
Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])
|
|
|
|
|
2011-11-09 14:21:16 +00:00
|
|
|
|
|
|
|
Draining events from a single consumer:
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
2013-10-04 15:26:17 +00:00
|
|
|
with Consumer(connection, queues, accept=['json']):
|
2011-11-09 14:21:16 +00:00
|
|
|
connection.drain_events(timeout=1)
|
|
|
|
|
|
|
|
|
|
|
|
Draining events from several consumers:
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
from kombu.utils import nested
|
|
|
|
|
|
|
|
with connection.channel(), connection.channel() as (channel1, channel2):
|
2013-10-04 15:26:17 +00:00
|
|
|
consumers = [Consumer(channel1, queues1, accept=['json']),
|
|
|
|
Consumer(channel2, queues2, accept=['json'])]
|
2011-11-09 14:21:16 +00:00
|
|
|
with nested(\*consumers):
|
|
|
|
connection.drain_events(timeout=1)
|
|
|
|
|
|
|
|
|
|
|
|
Or using :class:`~kombu.mixins.ConsumerMixin`:
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
|
|
from kombu.mixins import ConsumerMixin
|
|
|
|
|
|
|
|
class C(ConsumerMixin):
|
|
|
|
|
|
|
|
def __init__(self, connection):
|
|
|
|
self.connection = connection
|
|
|
|
|
|
|
|
def get_consumers(self, Consumer, channel):
|
2013-10-04 15:26:17 +00:00
|
|
|
return [
|
|
|
|
Consumer(queues, callbacks=[self.on_message], accept=['json']),
|
|
|
|
]
|
2011-11-09 14:21:16 +00:00
|
|
|
|
|
|
|
def on_message(self, body, message):
|
|
|
|
print("RECEIVED MESSAGE: %r" % (body, ))
|
|
|
|
message.ack()
|
|
|
|
|
|
|
|
C(connection).run()
|
|
|
|
|
|
|
|
|
|
|
|
and with multiple channels again:
|
|
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
2012-11-21 16:24:49 +00:00
|
|
|
from kombu import Consumer
|
2011-11-09 14:21:16 +00:00
|
|
|
from kombu.mixins import ConsumerMixin
|
|
|
|
|
|
|
|
class C(ConsumerMixin):
|
|
|
|
channel2 = None
|
|
|
|
|
|
|
|
def __init__(self, connection):
|
|
|
|
self.connection = connection
|
|
|
|
|
|
|
|
def get_consumers(self, _, default_channel):
|
|
|
|
self.channel2 = default_channel.connection.channel()
|
|
|
|
return [Consumer(default_channel, queues1,
|
2013-10-04 15:26:17 +00:00
|
|
|
callbacks=[self.on_message],
|
|
|
|
accept=['json']),
|
2011-11-09 14:21:16 +00:00
|
|
|
Consumer(self.channel2, queues2,
|
2013-10-04 15:26:17 +00:00
|
|
|
callbacks=[self.on_special_message],
|
|
|
|
accept=['json'])]
|
2011-11-09 14:21:16 +00:00
|
|
|
|
|
|
|
def on_consumer_end(self, connection, default_channel):
|
|
|
|
if self.channel2:
|
|
|
|
self.channel2.close()
|
|
|
|
|
|
|
|
C(connection).run()
|
|
|
|
|
2011-11-02 02:39:48 +00:00
|
|
|
|
|
|
|
Reference
|
|
|
|
=========
|
|
|
|
|
2012-11-21 16:24:49 +00:00
|
|
|
.. autoclass:: kombu.Consumer
|
2011-11-02 02:39:48 +00:00
|
|
|
:noindex:
|
|
|
|
:members:
|