mirror of https://github.com/celery/kombu.git
237 lines
7.1 KiB
ReStructuredText
.. _guide-consumers:

===========
 Consumers
===========

.. _consumer-basics:

Basics
======

The :class:`~kombu.messaging.Consumer` takes a connection (or channel) and a list of queues to
consume from. Several consumers can be mixed to consume from different
channels, as they all bind to the same connection, and ``drain_events`` will
drain events from all channels on that connection.

.. note::

    Since version 3.0, Kombu accepts only JSON, binary, and text messages by
    default. To allow deserialization of other formats, list them
    in the ``accept`` argument (in addition to setting the right content type for your messages):

    .. code-block:: python

        >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])

You can create a consumer using a Connection. This consumer consumes from a single queue named ``'queue'``:

.. code-block:: python

    >>> queue = Queue('queue', routing_key='queue')
    >>> consumer = connection.Consumer(queue)

You can also instantiate :class:`~kombu.messaging.Consumer` directly; it takes a channel or a connection as an argument. This consumer also
consumes from a single queue named ``'queue'``:

.. code-block:: python

    >>> queue = Queue('queue', routing_key='queue')
    >>> with Connection('amqp://') as conn:
    ...     with conn.channel() as channel:
    ...         consumer = Consumer(channel, queue)

A consumer needs a handler for received data, specified as a callback. Kombu calls the callback
every time a new message is received, with two arguments: ``body``, containing the deserialized
data sent by a producer, and ``message``, a :class:`~kombu.message.Message` instance. The user is responsible for acknowledging messages when manual
acknowledgement is set.

.. code-block:: python

    >>> def callback(body, message):
    ...     print(body)
    ...     message.ack()

    >>> consumer.register_callback(callback)

Draining events from a single consumer
--------------------------------------

The method ``drain_events`` blocks indefinitely by default. This example sets the timeout to 1 second:

.. code-block:: python

    >>> with consumer:
    ...     connection.drain_events(timeout=1)

Draining events from several consumers
--------------------------------------

Each consumer has its own list of queues. Each consumer accepts data in ``'json'`` format:

.. code-block:: python

    >>> from kombu.utils.compat import nested

    >>> queues1 = [Queue('queue11', routing_key='queue11'),
    ...            Queue('queue12', routing_key='queue12')]
    >>> queues2 = [Queue('queue21', routing_key='queue21'),
    ...            Queue('queue22', routing_key='queue22')]
    >>> # Bind each channel with its own ``as`` clause.
    >>> with connection.channel() as channel1, connection.channel() as channel2:
    ...     with nested(Consumer(channel1, queues1, accept=['json']),
    ...                 Consumer(channel2, queues2, accept=['json'])):
    ...         connection.drain_events(timeout=1)

The full example looks as follows:

.. code-block:: python

    from kombu import Connection, Consumer, Queue

    def callback(body, message):
        print('RECEIVED MESSAGE: {0!r}'.format(body))
        message.ack()

    queue1 = Queue('queue1', routing_key='queue1')
    queue2 = Queue('queue2', routing_key='queue2')

    with Connection('amqp://') as conn:
        with conn.channel() as channel:
            # Consume on the channel opened above.
            consumer = Consumer(channel, [queue1, queue2], accept=['json'])
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Consumer mixin classes
======================

Kombu provides predefined mixin classes in the module :py:mod:`~kombu.mixins`. It contains two classes:
:class:`~kombu.mixins.ConsumerMixin` for creating consumers and :class:`~kombu.mixins.ConsumerProducerMixin`
for creating consumers that also publish messages. To create a consumer, subclass
a mixin class and override some of its methods:

.. code-block:: python

    from kombu import Queue
    from kombu.mixins import ConsumerMixin

    queue = Queue('queue', routing_key='queue')

    class C(ConsumerMixin):

        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, Consumer, channel):
            # ``Consumer`` is already bound to ``channel``, so only keyword
            # arguments are passed here.
            return [
                Consumer(queues=[queue], callbacks=[self.on_message], accept=['json']),
            ]

        def on_message(self, body, message):
            print('RECEIVED MESSAGE: {0!r}'.format(body))
            message.ack()

    C(connection).run()


The same pattern works with multiple channels:

.. code-block:: python

    from kombu import Consumer
    from kombu.mixins import ConsumerMixin

    class C(ConsumerMixin):
        channel2 = None

        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, _, default_channel):
            # ``queues1`` and ``queues2`` are the queue lists defined earlier.
            self.channel2 = default_channel.connection.channel()
            return [Consumer(default_channel, queues1,
                             callbacks=[self.on_message],
                             accept=['json']),
                    Consumer(self.channel2, queues2,
                             callbacks=[self.on_special_message],
                             accept=['json'])]

        def on_message(self, body, message):
            print('RECEIVED MESSAGE: {0!r}'.format(body))
            message.ack()

        def on_special_message(self, body, message):
            print('RECEIVED SPECIAL MESSAGE: {0!r}'.format(body))
            message.ack()

        def on_consume_end(self, connection, default_channel):
            if self.channel2:
                self.channel2.close()

    C(connection).run()


The main use of :class:`~kombu.mixins.ConsumerProducerMixin` is to create consumers
that also need to publish messages on a separate connection (e.g. sending RPC
replies, streaming results):

.. code-block:: python

    from kombu import Queue
    from kombu.mixins import ConsumerProducerMixin

    rpc_queue = Queue('rpc_queue')

    def fib(n):
        # Naive recursive Fibonacci, standing in for the real work.
        return n if n < 2 else fib(n - 1) + fib(n - 2)

    class Worker(ConsumerProducerMixin):

        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, Consumer, channel):
            return [Consumer(
                queues=[rpc_queue],
                on_message=self.on_request,
                accept={'application/json'},
                prefetch_count=1,
            )]

        def on_request(self, message):
            n = message.payload['n']
            print(' [.] fib({0})'.format(n))
            result = fib(n)

            # Reply on the default exchange, routed to the caller's queue.
            self.producer.publish(
                {'result': result},
                exchange='', routing_key=message.properties['reply_to'],
                correlation_id=message.properties['correlation_id'],
                serializer='json',
                retry=True,
            )
            message.ack()

.. seealso::

    :file:`examples/rpc-tut6/` in the GitHub repository.


Advanced Topics
===============

RabbitMQ
--------

Consumer Priorities
~~~~~~~~~~~~~~~~~~~

RabbitMQ defines a consumer priority extension to the AMQP protocol,
which can be enabled by setting the ``x-priority`` argument to
``basic.consume``.

In kombu you can specify this argument on the :class:`~kombu.Queue`, like
this:

.. code-block:: python

    queue = Queue('name', Exchange('exchange_name', type='direct'),
                  consumer_arguments={'x-priority': 10})

Read more about consumer priorities here:
https://www.rabbitmq.com/consumer-priority.html


Reference
=========

.. autoclass:: kombu.Consumer
    :noindex:
    :members: