Adds examples to userguide

This commit is contained in:
Ask Solem 2011-09-12 10:25:42 +01:00
parent e845c01404
commit 09146d0b93
10 changed files with 102 additions and 4 deletions

View File

@ -17,6 +17,7 @@
/* -- page layout ----------------------------------------------------------- */
body {
align: left;
font-family: {{ body_font_stack }};
font-size: 17px;
background-color: white;

View File

@ -111,7 +111,7 @@ keyword arguments, these are:
:transport: Default transport if not provided in the URL.
Can be a string specifying the path to the class. (e.g.
``kombu.transport.pyamqplib.Transport``), or one of the aliases:
``amqplib``, ``pika``, ``redis``, ``memory``, and so on.
``amqplib``, ``pika``, ``redis``, ``memory``, and so on.
:ssl: Use ssl to connect to the server. Default is ``False``.
Only supported by the amqp transport.

View File

@ -0,0 +1,27 @@
Task Queue Example
==================
Very simple task queue using pickle, with primitive support
for priorities using different queues.
:file:`queues.py`:
.. literalinclude:: ../../examples/simple_task_queue/queues.py
:language: python
:file:`worker.py`:
.. literalinclude:: ../../examples/simple_task_queue/worker.py
:language: python
:file:`tasks.py`:
.. literalinclude:: ../../examples/simple_task_queue/tasks.py
:language: python
.. code-block:: python
:file:`client.py`:
.. literalinclude:: ../../examples/simple_task_queue/client.py

View File

@ -10,4 +10,6 @@
connections
simple
examples
pools
serialization

View File

View File

@ -0,0 +1,28 @@
from __future__ import with_statement
from kombu.common import maybe_declare
from kombu.pools import producers
from .queues import task_exchange
priority_to_routing_key = {"high": "hipri",
"mid": "midpri",
"low": "lopri"}
def send_as_task(connection, fun, args, kwargs, priority="mid"):
payload = {"fun": fun, "args": args, "kwargs": kwargs}
routing_key = priority_to_routing_key[priority]
with producers[connection].acquire(block=True) as producer:
maybe_declare(task_exchange, producer.channel)
producer.publish(payload, serializer="pickle",
compression="zlib",
routing_key=routing_key)
if __name__ == "__main__":
from kombu import BrokerConnection
from .tasks import hello_task
conection = BrokerConnection("amqp://guest:guest@localhost:5672//")
send_as_task(connection, fun=hello_task, args=("Kombu", ),
priority="high")

View File

@ -0,0 +1,7 @@
from kombu import Exchange, Queue
task_exchange = Exchange("tasks", type="direct")
task_queues = [Queue("hipri", task_exchange, routing_key="hipri"),
Queue("midpri", task_exchange, routing_key="midpri"),
Queue("lopri", task_exchange, routing_key="lopri")]

View File

@ -0,0 +1,4 @@
def hello_task(who="world"):
print("Hello %s" % (who, ))

View File

@ -0,0 +1,30 @@
from __future__ import with_statement
from kombu import Exchange, Queue
from kombu.mixins import ConsumerMixin
from kombu.utils import kwdict
from queues import task_queues
class Worker(ConsumerMixin):
def get_consumers(self, Consumer, channel):
return Consumer(queues=task_queues,
callbacks=[self.process_task])
def process_task(body, message):
fun = body["fun"]
args = body["args"]
kwargs = body["kwargs"]
fun(*args, **kwdict(kwargs))
message.ack()
if __name__ == "__main__":
from kombu import BrokerConnection
from kombu.log import setup_logging
setup_logging(loglevel="INFO")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
Worker(conn).run()

View File

@ -16,7 +16,7 @@ class LamportClock(object):
From Wikipedia:
"A Lamport logical clock is a monotonically incrementing software counter
A Lamport logical clock is a monotonically incrementing software counter
maintained in each process. It follows some simple rules:
* A process increments its counter before each event in that process;
@ -34,8 +34,7 @@ class LamportClock(object):
.. seealso::
http://en.wikipedia.org/wiki/Lamport_timestamps
http://en.wikipedia.org/wiki/Lamport's_Distributed_
Mutual_Exclusion_Algorithm
http://en.wikipedia.org/wiki/Lamport's_Distributed_Mutual_Exclusion_Algorithm
*Usage*