From d82bc237a71f4b5e636641a5702704aac1e10511 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 4 Aug 2010 15:20:12 +0200 Subject: [PATCH] Documentation: Started writing the User Guide --- docs/index.rst | 1 + docs/userguide/index.rst | 11 +++ docs/userguide/simple.rst | 153 ++++++++++++++++++++++++++++++++++++++ kombu/simple.py | 10 ++- 4 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 docs/userguide/index.rst create mode 100644 docs/userguide/simple.rst diff --git a/docs/index.rst b/docs/index.rst index 228c5730..c26249f6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -7,6 +7,7 @@ Contents: :maxdepth: 3 introduction + userguide/index faq reference/index changelog diff --git a/docs/userguide/index.rst b/docs/userguide/index.rst new file mode 100644 index 00000000..2b1d4dbb --- /dev/null +++ b/docs/userguide/index.rst @@ -0,0 +1,11 @@ +============ + User Guide +============ + +:Release: |version| +:Date: |today| + +.. toctree:: + :maxdepth: 2 + + simple diff --git a/docs/userguide/simple.rst b/docs/userguide/simple.rst new file mode 100644 index 00000000..e091d31a --- /dev/null +++ b/docs/userguide/simple.rst @@ -0,0 +1,153 @@ +================== + Simple Interface +================== + +.. contents:: + :local: + + +:mod:`kombu.simple` is a simple interface to AMQP queueing. +It is only slightly different from the :class:`~Queue.Queue` class in the +Python Standard Library, which makes it excellent for users with basic +messaging needs. + +Instead of defining exchanges and queues, the simple classes only require +a two arguments, a connection channel and a a name. The name is used as the +queue, exchange and routing key, but if the need arises, you can also specify +a :class:`~kombu.entity.Queue` as the name argument, in that way this have +almost all of the functionality of the regular interface. + +In addition, the :class:`~kombu.connection.BrokerConnection` comes with +shortcuts to create simple queues using the current connection:: + + >>> queue = connection.SimpleQueue("myqueue") + >>> # ... do something with queue + >>> queue.close() + + +This is equivalent to:: + + >>> from kombu import SimpleQueue, SimpleBuffer + + >>> channel = connection.channel() + >>> queue = SimpleBuffer(channel) + >>> # ... do something with queue + >>> channel.close() + +Connections and transports +========================== + +To send and receive messages you need a transport and a connection. +There are several transports to choose from (amqplib, pika, redis, in-memory), +and you can even create your own. The default transport is amqplib. + +Create a connection using the default transport:: + + >>> from kombu import BrokerConnection + >>> connection = BrokerConnection() + +The connection will not be established yet, as the connection is established +as soon as its needed. If you want to explicitly establish the connection +you have to call the :meth:`~kombu.connection.BrokerConnection.connect` +method:: + + >>> connection.connect() + +This connection will use the default connection settings, which is using +the localhost host, default port, username ``guest``, +passowrd ``guest`` and virtual host "/". A connection without arguments +is the same as:: + + >>> BrokerConnection(hostname="localhost", + ... userid="guest", + ... password="guest", + ... virtual_host="/", + ... port=6379) + +The default port is transport specific, for AMQP transports this 6379, but +for others it might be something different. + +Other fields may also have different meanings depending on the transport +used. For example, the ``virtual_host`` argument is used as the database +number in the Redis transport. + +See the reference documentation for +:class:`~kombu.connection.BrokerConnection` for a full list of arguments +supported. + + +Sending and receiving messages +============================== + +The simple interface defines two classes; :class:`~kombu.simple.SimpleQueue`, +and :class:`~kombu.simple.SimpleBuffer`. The former is used for persistent +messages, and the latter is used for transient, buffer-like queues. +They both have the same interface, so you can use them interchangeably. + +Here is an example using the :class:`~kombu.simple.SimpleQueue` class +to produce and consume logging messages: + +.. code-block:: python + + from socket import gethostname + from time import time + + from kombu import BrokerConnection + + + class Logger(object): + + def __init__(self, connection, queue_name="log_queue", + serializer="json", compression=None): + self.queue = connection.SimpleQueue(self.queue_name) + self.serializer = serializer + self.compression = compression + + def log(self, message, level="INFO", context={}): + self.queue.put({"message": message, + "level": level, + "context": context, + "hostname": socket.gethostname(), + "timestamp": time()}, + serializer=self.serializer, + compression=self.compression) + + def process(self, callback, n=1, timeout=1): + for i in xrange(n): + log_message = self.queue.get(block=True, timeout=1) + entry = log_message.payload # deserialized data. + callback(entry) + log_message.ack() # remove message from queue + + def close(self): + self.queue.close() + + + if __name__ == "__main__": + connection = BrokerConnection(hostname="localhost", + userid="guest", + password="guest", + virtual_host="/") + logger = Logger(connection) + + # Send message + logger.log("Error happened while encoding video", + level="ERROR", + context={"filename": "cutekitten.mpg"}) + + # Consume and process message + + # This is the callback called when a log message is + # received. + def dump_entry(entry): + date = datetime.fromtimestamp(entry["timestamp"]) + print("[%s %s %s] %s %r" % (date, + entry["hostname"], + entry["level"], + entry["message"], + entry["context"])) + + # Process a single message using the callback above. + logger.process(dump_entry, n=1) + + logger.close() diff --git a/kombu/simple.py b/kombu/simple.py index fcc5fbd2..e3968c50 100644 --- a/kombu/simple.py +++ b/kombu/simple.py @@ -9,6 +9,7 @@ from kombu import messaging class SimpleBase(object): + _consuming = False def __init__(self, channel, producer, consumer, no_ack=False, channel_autoclose=False): @@ -19,13 +20,16 @@ class SimpleBase(object): self.channel_autoclose = channel_autoclose self.queue = self.consumer.queues[0] self.buffer = deque() - - self.consumer.consume(no_ack=self.no_ack) self.consumer.register_callback(self._receive) def _receive(self, message_data, message): self.buffer.append(message) + def _consume(self): + if not self._consuming: + self.consumer.consume(no_ack=self.no_ack) + self._consuming = True + def get_nowait(self): m = self.queue.get(no_ack=self.no_ack) if not m: @@ -35,6 +39,7 @@ class SimpleBase(object): def get(self, block=True, timeout=None, sync=False): if block: return self.get_nowait() + self._consume() elapsed = 0.0 remaining = timeout while True: @@ -68,6 +73,7 @@ class SimpleBase(object): def close(self): if self.channel_autoclose: self.channel.close() + self.consumer.cancel() def __del__(self): self.close()