From 09146d0b933b96aa5d5675a54cba6402f0970210 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Mon, 12 Sep 2011 10:25:42 +0100 Subject: [PATCH] Adds examples to userguide --- docs/_theme/celery/static/celery.css_t | 1 + docs/userguide/connections.rst | 2 +- docs/userguide/examples.rst | 27 +++++++++++++++++++++++ docs/userguide/index.rst | 2 ++ examples/simple_task_queue/__init__.py | 0 examples/simple_task_queue/client.py | 28 ++++++++++++++++++++++++ examples/simple_task_queue/queues.py | 7 ++++++ examples/simple_task_queue/tasks.py | 4 ++++ examples/simple_task_queue/worker.py | 30 ++++++++++++++++++++++++++ kombu/clocks.py | 5 ++--- 10 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 docs/userguide/examples.rst create mode 100644 examples/simple_task_queue/__init__.py create mode 100644 examples/simple_task_queue/client.py create mode 100644 examples/simple_task_queue/queues.py create mode 100644 examples/simple_task_queue/tasks.py create mode 100644 examples/simple_task_queue/worker.py diff --git a/docs/_theme/celery/static/celery.css_t b/docs/_theme/celery/static/celery.css_t index 18e4c1ee..4274f31f 100644 --- a/docs/_theme/celery/static/celery.css_t +++ b/docs/_theme/celery/static/celery.css_t @@ -17,6 +17,7 @@ /* -- page layout ----------------------------------------------------------- */ body { + align: left; font-family: {{ body_font_stack }}; font-size: 17px; background-color: white; diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index c4393e2f..daf3fc07 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -111,7 +111,7 @@ keyword arguments, these are: :transport: Default transport if not provided in the URL. Can be a string specifying the path to the class. (e.g. ``kombu.transport.pyamqplib.Transport``), or one of the aliases: - ``amqplib``, ``pika``, ``redis``, ``memory``, and so on. + ``amqplib``, ``pika``, ``redis``, ``memory``, and so on. :ssl: Use ssl to connect to the server. Default is ``False``. Only supported by the amqp transport. diff --git a/docs/userguide/examples.rst b/docs/userguide/examples.rst new file mode 100644 index 00000000..ed807780 --- /dev/null +++ b/docs/userguide/examples.rst @@ -0,0 +1,27 @@ +Task Queue Example +================== + +Very simple task queue using pickle, with primitive support +for priorities using different queues. + + +:file:`queues.py`: + +.. literalinclude:: ../../examples/simple_task_queue/queues.py + :language: python + +:file:`worker.py`: + +.. literalinclude:: ../../examples/simple_task_queue/worker.py + :language: python + +:file:`tasks.py`: + +.. literalinclude:: ../../examples/simple_task_queue/tasks.py + :language: python + +.. code-block:: python + +:file:`client.py`: + +.. literalinclude:: ../../examples/simple_task_queue/client.py diff --git a/docs/userguide/index.rst b/docs/userguide/index.rst index 3010fd10..b400e289 100644 --- a/docs/userguide/index.rst +++ b/docs/userguide/index.rst @@ -10,4 +10,6 @@ connections simple + examples + pools serialization diff --git a/examples/simple_task_queue/__init__.py b/examples/simple_task_queue/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/simple_task_queue/client.py b/examples/simple_task_queue/client.py new file mode 100644 index 00000000..489aa2e6 --- /dev/null +++ b/examples/simple_task_queue/client.py @@ -0,0 +1,28 @@ +from __future__ import with_statement + +from kombu.common import maybe_declare +from kombu.pools import producers + +from .queues import task_exchange + +priority_to_routing_key = {"high": "hipri", + "mid": "midpri", + "low": "lopri"} + +def send_as_task(connection, fun, args, kwargs, priority="mid"): + payload = {"fun": fun, "args": args, "kwargs": kwargs} + routing_key = priority_to_routing_key[priority] + + with producers[connection].acquire(block=True) as producer: + maybe_declare(task_exchange, producer.channel) + producer.publish(payload, serializer="pickle", + compression="zlib", + routing_key=routing_key) + +if __name__ == "__main__": + from kombu import BrokerConnection + from .tasks import hello_task + + conection = BrokerConnection("amqp://guest:guest@localhost:5672//") + send_as_task(connection, fun=hello_task, args=("Kombu", ), + priority="high") diff --git a/examples/simple_task_queue/queues.py b/examples/simple_task_queue/queues.py new file mode 100644 index 00000000..5f1feef2 --- /dev/null +++ b/examples/simple_task_queue/queues.py @@ -0,0 +1,7 @@ +from kombu import Exchange, Queue + +task_exchange = Exchange("tasks", type="direct") +task_queues = [Queue("hipri", task_exchange, routing_key="hipri"), + Queue("midpri", task_exchange, routing_key="midpri"), + Queue("lopri", task_exchange, routing_key="lopri")] + diff --git a/examples/simple_task_queue/tasks.py b/examples/simple_task_queue/tasks.py new file mode 100644 index 00000000..47146e45 --- /dev/null +++ b/examples/simple_task_queue/tasks.py @@ -0,0 +1,4 @@ +def hello_task(who="world"): + print("Hello %s" % (who, )) + + diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py new file mode 100644 index 00000000..68e7d6b6 --- /dev/null +++ b/examples/simple_task_queue/worker.py @@ -0,0 +1,30 @@ +from __future__ import with_statement + +from kombu import Exchange, Queue +from kombu.mixins import ConsumerMixin +from kombu.utils import kwdict + +from queues import task_queues + +class Worker(ConsumerMixin): + + def get_consumers(self, Consumer, channel): + return Consumer(queues=task_queues, + callbacks=[self.process_task]) + + def process_task(body, message): + fun = body["fun"] + args = body["args"] + kwargs = body["kwargs"] + fun(*args, **kwdict(kwargs)) + message.ack() + +if __name__ == "__main__": + from kombu import BrokerConnection + from kombu.log import setup_logging + setup_logging(loglevel="INFO") + + with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: + Worker(conn).run() + + diff --git a/kombu/clocks.py b/kombu/clocks.py index 1e698e92..2e6a7b9b 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -16,7 +16,7 @@ class LamportClock(object): From Wikipedia: - "A Lamport logical clock is a monotonically incrementing software counter + A Lamport logical clock is a monotonically incrementing software counter maintained in each process. It follows some simple rules: * A process increments its counter before each event in that process; @@ -34,8 +34,7 @@ class LamportClock(object): .. seealso:: http://en.wikipedia.org/wiki/Lamport_timestamps - http://en.wikipedia.org/wiki/Lamport's_Distributed_ - Mutual_Exclusion_Algorithm + http://en.wikipedia.org/wiki/Lamport's_Distributed_Mutual_Exclusion_Algorithm *Usage*