mirror of https://github.com/celery/kombu.git
Documentation: Started writing the User Guide
This commit is contained in:
parent
fd0296af7b
commit
d82bc237a7
|
@ -7,6 +7,7 @@ Contents:
|
|||
:maxdepth: 3
|
||||
|
||||
introduction
|
||||
userguide/index
|
||||
faq
|
||||
reference/index
|
||||
changelog
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
============
|
||||
User Guide
|
||||
============
|
||||
|
||||
:Release: |version|
|
||||
:Date: |today|
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
|
||||
simple
|
|
@ -0,0 +1,153 @@
|
|||
==================
|
||||
Simple Interface
|
||||
==================
|
||||
|
||||
.. contents::
|
||||
:local:
|
||||
|
||||
|
||||
:mod:`kombu.simple` is a simple interface to AMQP queueing.
|
||||
It is only slightly different from the :class:`~Queue.Queue` class in the
|
||||
Python Standard Library, which makes it excellent for users with basic
|
||||
messaging needs.
|
||||
|
||||
Instead of defining exchanges and queues, the simple classes only require
|
||||
a two arguments, a connection channel and a a name. The name is used as the
|
||||
queue, exchange and routing key, but if the need arises, you can also specify
|
||||
a :class:`~kombu.entity.Queue` as the name argument, in that way this have
|
||||
almost all of the functionality of the regular interface.
|
||||
|
||||
In addition, the :class:`~kombu.connection.BrokerConnection` comes with
|
||||
shortcuts to create simple queues using the current connection::
|
||||
|
||||
>>> queue = connection.SimpleQueue("myqueue")
|
||||
>>> # ... do something with queue
|
||||
>>> queue.close()
|
||||
|
||||
|
||||
This is equivalent to::
|
||||
|
||||
>>> from kombu import SimpleQueue, SimpleBuffer
|
||||
|
||||
>>> channel = connection.channel()
|
||||
>>> queue = SimpleBuffer(channel)
|
||||
>>> # ... do something with queue
|
||||
>>> channel.close()
|
||||
|
||||
Connections and transports
|
||||
==========================
|
||||
|
||||
To send and receive messages you need a transport and a connection.
|
||||
There are several transports to choose from (amqplib, pika, redis, in-memory),
|
||||
and you can even create your own. The default transport is amqplib.
|
||||
|
||||
Create a connection using the default transport::
|
||||
|
||||
>>> from kombu import BrokerConnection
|
||||
>>> connection = BrokerConnection()
|
||||
|
||||
The connection will not be established yet, as the connection is established
|
||||
as soon as its needed. If you want to explicitly establish the connection
|
||||
you have to call the :meth:`~kombu.connection.BrokerConnection.connect`
|
||||
method::
|
||||
|
||||
>>> connection.connect()
|
||||
|
||||
This connection will use the default connection settings, which is using
|
||||
the localhost host, default port, username ``guest``,
|
||||
passowrd ``guest`` and virtual host "/". A connection without arguments
|
||||
is the same as::
|
||||
|
||||
>>> BrokerConnection(hostname="localhost",
|
||||
... userid="guest",
|
||||
... password="guest",
|
||||
... virtual_host="/",
|
||||
... port=6379)
|
||||
|
||||
The default port is transport specific, for AMQP transports this 6379, but
|
||||
for others it might be something different.
|
||||
|
||||
Other fields may also have different meanings depending on the transport
|
||||
used. For example, the ``virtual_host`` argument is used as the database
|
||||
number in the Redis transport.
|
||||
|
||||
See the reference documentation for
|
||||
:class:`~kombu.connection.BrokerConnection` for a full list of arguments
|
||||
supported.
|
||||
|
||||
|
||||
Sending and receiving messages
|
||||
==============================
|
||||
|
||||
The simple interface defines two classes; :class:`~kombu.simple.SimpleQueue`,
|
||||
and :class:`~kombu.simple.SimpleBuffer`. The former is used for persistent
|
||||
messages, and the latter is used for transient, buffer-like queues.
|
||||
They both have the same interface, so you can use them interchangeably.
|
||||
|
||||
Here is an example using the :class:`~kombu.simple.SimpleQueue` class
|
||||
to produce and consume logging messages:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from socket import gethostname
|
||||
from time import time
|
||||
|
||||
from kombu import BrokerConnection
|
||||
|
||||
|
||||
class Logger(object):
|
||||
|
||||
def __init__(self, connection, queue_name="log_queue",
|
||||
serializer="json", compression=None):
|
||||
self.queue = connection.SimpleQueue(self.queue_name)
|
||||
self.serializer = serializer
|
||||
self.compression = compression
|
||||
|
||||
def log(self, message, level="INFO", context={}):
|
||||
self.queue.put({"message": message,
|
||||
"level": level,
|
||||
"context": context,
|
||||
"hostname": socket.gethostname(),
|
||||
"timestamp": time()},
|
||||
serializer=self.serializer,
|
||||
compression=self.compression)
|
||||
|
||||
def process(self, callback, n=1, timeout=1):
|
||||
for i in xrange(n):
|
||||
log_message = self.queue.get(block=True, timeout=1)
|
||||
entry = log_message.payload # deserialized data.
|
||||
callback(entry)
|
||||
log_message.ack() # remove message from queue
|
||||
|
||||
def close(self):
|
||||
self.queue.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
connection = BrokerConnection(hostname="localhost",
|
||||
userid="guest",
|
||||
password="guest",
|
||||
virtual_host="/")
|
||||
logger = Logger(connection)
|
||||
|
||||
# Send message
|
||||
logger.log("Error happened while encoding video",
|
||||
level="ERROR",
|
||||
context={"filename": "cutekitten.mpg"})
|
||||
|
||||
# Consume and process message
|
||||
|
||||
# This is the callback called when a log message is
|
||||
# received.
|
||||
def dump_entry(entry):
|
||||
date = datetime.fromtimestamp(entry["timestamp"])
|
||||
print("[%s %s %s] %s %r" % (date,
|
||||
entry["hostname"],
|
||||
entry["level"],
|
||||
entry["message"],
|
||||
entry["context"]))
|
||||
|
||||
# Process a single message using the callback above.
|
||||
logger.process(dump_entry, n=1)
|
||||
|
||||
logger.close()
|
|
@ -9,6 +9,7 @@ from kombu import messaging
|
|||
|
||||
|
||||
class SimpleBase(object):
|
||||
_consuming = False
|
||||
|
||||
def __init__(self, channel, producer, consumer, no_ack=False,
|
||||
channel_autoclose=False):
|
||||
|
@ -19,13 +20,16 @@ class SimpleBase(object):
|
|||
self.channel_autoclose = channel_autoclose
|
||||
self.queue = self.consumer.queues[0]
|
||||
self.buffer = deque()
|
||||
|
||||
self.consumer.consume(no_ack=self.no_ack)
|
||||
self.consumer.register_callback(self._receive)
|
||||
|
||||
def _receive(self, message_data, message):
|
||||
self.buffer.append(message)
|
||||
|
||||
def _consume(self):
|
||||
if not self._consuming:
|
||||
self.consumer.consume(no_ack=self.no_ack)
|
||||
self._consuming = True
|
||||
|
||||
def get_nowait(self):
|
||||
m = self.queue.get(no_ack=self.no_ack)
|
||||
if not m:
|
||||
|
@ -35,6 +39,7 @@ class SimpleBase(object):
|
|||
def get(self, block=True, timeout=None, sync=False):
|
||||
if block:
|
||||
return self.get_nowait()
|
||||
self._consume()
|
||||
elapsed = 0.0
|
||||
remaining = timeout
|
||||
while True:
|
||||
|
@ -68,6 +73,7 @@ class SimpleBase(object):
|
|||
def close(self):
|
||||
if self.channel_autoclose:
|
||||
self.channel.close()
|
||||
self.consumer.cancel()
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
|
Loading…
Reference in New Issue