diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst index c2e9d03e..69065aab 100644 --- a/docs/userguide/consumers.rst +++ b/docs/userguide/consumers.rst @@ -24,7 +24,7 @@ drain events from all channels on that connection. >>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml']) -You can create a consumer using a Connection. Consumer is consuming from single queue with name `'queue'`: +You can create a consumer using a Connection. This consumer is consuming from a single queue with name `'queue'`: .. code-block:: python @@ -41,10 +41,10 @@ consumes from single queue with name `'queue'`: ... with conn.channel() as channel: ... consumer = Consumer(channel, queue) -Consumer needs to specify handler of received data. This handler specified in form of callback. Callback function is called -by kombu library every time a new message is received. Callback is called with two parameters ``body`` containing deserialized -data sent by producer and :class:`~kombu.message.Message` instance ``message``. User is also responsible for acknowledging of message when manual -acknowledge is set. +A consumer needs to specify a handler for received data. This handler is specified in the form of a callback. The callback function is called +by kombu every time a new message is received. The callback is called with two parameters: ``body``, containing deserialized +data sent by a producer, and a :class:`~kombu.message.Message` instance ``message``. The user is responsible for acknowledging messages when manual +acknowledgement is set. .. code-block:: python @@ -54,21 +54,29 @@ acknowledge is set. >>> consumer.register_callback(callback) -Draining events from a single consumer. Method ``drain_events`` by default blocks indefinitely. This example sets timeout to 1 second: +Draining events from a single consumer +-------------------------------------- +The method ``drain_events`` blocks indefinitely by default. This example sets the timeout to 1 second: + .. code-block:: python >>> with consumer: ... connection.drain_events(timeout=1) -Draining events from several consumers. Each consumer has its own list of queues. Each consumer accepts `'json'` format of data: +Draining events from several consumers +-------------------------------------- + +Each consumer has its own list of queues. Each consumer accepts data in `'json'` format: .. code-block:: python >>> from kombu.utils.compat import nested - >>> queues1 = [Queue('queue11', routing_key='queue12')] - >>> queues2 = [Queue('queue21', routing_key='queue22')] + >>> queues1 = [Queue('queue11', routing_key='queue11'), + Queue('queue12', routing_key='queue12')] + >>> queues2 = [Queue('queue21', routing_key='queue21'), + Queue('queue22', routing_key='queue22')] >>> with connection.channel(), connection.channel() as (channel1, channel2): ... with nested(Consumer(channel1, queues1, accept=['json']), ... Consumer(channel2, queues2, accept=['json'])): @@ -113,7 +121,7 @@ mixin class and overriding some of the methods: def get_consumers(self, Consumer, channel): return [ - Consumer(queues, callbacks=[self.on_message], accept=['json']), + Consumer(channel, callbacks=[self.on_message], accept=['json']), ] def on_message(self, body, message): @@ -145,7 +153,7 @@ and with multiple channels again: callbacks=[self.on_special_message], accept=['json'])] - def on_consumer_end(self, connection, default_channel): + def on_consume_end(self, connection, default_channel): if self.channel2: self.channel2.close()