commit da3e75459489e2069b4de487eb578c99bc6a19ed Author: Ask Solem Date: Wed Jun 23 12:08:39 2010 +0200 Initial import diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 00000000..284d232b --- /dev/null +++ b/AUTHORS @@ -0,0 +1,19 @@ +================================== + AUTHORS (in chronological order) +================================== + +Ask Solem +Travis Cline +Rune Halvorsen +Sean Creeley +Jason Cater +Ian Struble +Patrick Schneider +Travis Swicegood +Stephen Day +Andrew Watts +Paul McLanahan +Ralf Nyren +Jeff Balogh +Adam Wentz +Vincent Driessen diff --git a/Changelog b/Changelog new file mode 100644 index 00000000..66f0f036 --- /dev/null +++ b/Changelog @@ -0,0 +1,8 @@ +================ + Change history +================ + +0.1.0 +----- + +* Rewrite of carrot diff --git a/FAQ b/FAQ new file mode 100644 index 00000000..d8f138c9 --- /dev/null +++ b/FAQ @@ -0,0 +1,19 @@ +============================ + Frequently Asked Questions +============================ + +Questions +========= + +Q: Message.reject doesn't work? +-------------------------------------- +**Answer**: RabbitMQ (as of v1.5.5) has not implemented reject yet. +There was a brief discussion about it on their mailing list, and the reason +why it's not implemented yet is revealed: + +http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2009-January/003183.html + +Q: Message.requeue doesn't work? +-------------------------------------- + +**Answer**: See _`Message.reject doesn't work?` diff --git a/INSTALL b/INSTALL new file mode 100644 index 00000000..3d86448c --- /dev/null +++ b/INSTALL @@ -0,0 +1,21 @@ +Installation +============ + +You can install ``carrot`` either via the Python Package Index (PyPI) +or from source. + +To install using ``pip``,:: + + $ pip install carrot + + +To install using ``easy_install``,:: + + $ easy_install carrot + + +If you have downloaded a source tarball you can install it +by doing the following,:: + + $ python setup.py build + # python setup.py install # as root diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..81d19cef --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +Copyright (c) 2009, Ask Solem +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + +Neither the name of Ask Solem nor the names of its contributors may be used +to endorse or promote products derived from this software without specific +prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS +BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..08498736 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,12 @@ +include AUTHORS +include Changelog +include FAQ +include INSTALL +include LICENSE +include MANIFEST.in +include README.rst +include README +include THANKS +include TODO +recursive-include docs * +recursive-include kombu *.py diff --git a/README b/README new file mode 120000 index 00000000..92cacd28 --- /dev/null +++ b/README @@ -0,0 +1 @@ +README.rst \ No newline at end of file diff --git a/README.rst b/README.rst new file mode 100644 index 00000000..1678a814 --- /dev/null +++ b/README.rst @@ -0,0 +1,391 @@ +############################################## + kombu - AMQP Messaging Framework for Python +############################################## + +:Version: 0.10.5 + +**THIS IS A REWRITE OF CARROT** + + +Introduction +------------ + +`carrot` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message +Queuing Protocol, an open standard protocol for message orientation, queuing, +routing, reliability and security. + +The aim of `carrot` is to make messaging in Python as easy as possible by +providing a high-level interface for producing and consuming messages. At the +same time it is a goal to re-use what is already available as much as possible. + +`carrot` has pluggable messaging back-ends, so it is possible to support +several messaging systems. Currently, there is support for `AMQP`_ +(`py-amqplib`_, `pika`_), `STOMP`_ (`python-stomp`_). There's also an +in-memory backend for testing purposes, using the `Python queue module`_. + +Several AMQP message broker implementations exists, including `RabbitMQ`_, +`ZeroMQ`_ and `Apache ActiveMQ`_. You'll need to have one of these installed, +personally we've been using `RabbitMQ`_. + +Before you start playing with ``carrot``, you should probably read up on +AMQP, and you could start with the excellent article about using RabbitMQ +under Python, `Rabbits and warrens`_. For more detailed information, you can +refer to the `Wikipedia article about AMQP`_. + +.. _`RabbitMQ`: http://www.rabbitmq.com/ +.. _`ZeroMQ`: http://www.zeromq.org/ +.. _`AMQP`: http://amqp.org +.. _`STOMP`: http://stomp.codehaus.org +.. _`python-stomp`: http://bitbucket.org/asksol/python-stomp +.. _`Python Queue module`: http://docs.python.org/library/queue.html +.. _`Apache ActiveMQ`: http://activemq.apache.org/ +.. _`Django`: http://www.djangoproject.com/ +.. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/ +.. _`py-amqplib`: http://barryp.org/software/py-amqplib/ +.. _`pika`: http://github.com/tonyg/pika +.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP + +Documentation +------------- + +Carrot is using Sphinx, and the latest documentation is available at GitHub: + + http://github.com/ask/carrot/ + +Installation +============ + +You can install ``carrot`` either via the Python Package Index (PyPI) +or from source. + +To install using ``pip``,:: + + $ pip install carrot + + +To install using ``easy_install``,:: + + $ easy_install carrot + + +If you have downloaded a source tarball you can install it +by doing the following,:: + + $ python setup.py build + # python setup.py install # as root + + +Terminology +=========== + +There are some concepts you should be familiar with before starting: + + * Publishers + + Publishers sends messages to an exchange. + + * Exchanges + + Messages are sent to exchanges. Exchanges are named and can be + configured to use one of several routing algorithms. The exchange + routes the messages to consumers by matching the routing key in the + message with the routing key the consumer provides when binding to + the exchange. + + * Consumers + + Consumers declares a queue, binds it to a exchange and receives + messages from it. + + * Queues + + Queues receive messages sent to exchanges. The queues are declared + by consumers. + + * Routing keys + + Every message has a routing key. The interpretation of the routing + key depends on the exchange type. There are four default exchange + types defined by the AMQP standard, and vendors can define custom + types (so see your vendors manual for details). + + These are the default exchange types defined by AMQP/0.8: + + * Direct exchange + + Matches if the routing key property of the message and + the ``routing_key`` attribute of the consumer are identical. + + * Fan-out exchange + + Always matches, even if the binding does not have a routing + key. + + * Topic exchange + + Matches the routing key property of the message by a primitive + pattern matching scheme. The message routing key then consists + of words separated by dots (``"."``, like domain names), and + two special characters are available; star (``"*"``) and hash + (``"#"``). The star matches any word, and the hash matches + zero or more words. For example ``"*.stock.#"`` matches the + routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not + ``"stock.nasdaq"``. + + +Examples +======== + +Creating a connection +--------------------- + + You can set up a connection by creating an instance of + ``carrot.messaging.BrokerConnection``, with the appropriate options for + your broker: + + >>> from carrot.connection import BrokerConnection + >>> conn = BrokerConnection(hostname="localhost", port=5672, + ... userid="test", password="test", + ... virtual_host="test") + + + If you're using Django you can use the + ``carrot.connection.DjangoBrokerConnection`` class instead, which loads + the connection settings from your ``settings.py``:: + + BROKER_HOST = "localhost" + BROKER_PORT = 5672 + BROKER_USER = "test" + BROKER_PASSWORD = "secret" + BROKER_VHOST = "/test" + + Then create a connection by doing: + + >>> from carrot.connection import DjangoBrokerConnection + >>> conn = DjangoBrokerConnection() + + + +Receiving messages using a Consumer +----------------------------------- + +First we open up a Python shell and start a message consumer. + +This consumer declares a queue named ``"feed"``, receiving messages with +the routing key ``"importer"`` from the ``"feed"`` exchange. + +The example then uses the consumers ``wait()`` method to go into consume +mode, where it continuously polls the queue for new messages, and when a +message is received it passes the message to all registered callbacks. + + >>> from carrot.messaging import Consumer + >>> consumer = Consumer(connection=conn, queue="feed", + ... exchange="feed", routing_key="importer") + >>> def import_feed_callback(message_data, message) + ... feed_url = message_data["import_feed"] + ... print("Got feed import message for: %s" % feed_url) + ... # something importing this feed url + ... # import_feed(feed_url) + ... message.ack() + >>> consumer.register_callback(import_feed_callback) + >>> consumer.wait() # Go into the consumer loop. + +Sending messages using a Publisher +---------------------------------- + +Then we open up another Python shell to send some messages to the consumer +defined in the last section. + + >>> from carrot.messaging import Publisher + >>> publisher = Publisher(connection=conn, + ... exchange="feed", routing_key="importer") + >>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"}) + >>> publisher.close() + + +Look in the first Python shell again (where ``consumer.wait()`` is running), +where the following text has been printed to the screen:: + + Got feed import message for: http://cnn.com/rss/edition.rss + + +Serialization of Data +----------------------- + +By default every message is encoded using `JSON`_, so sending +Python data structures like dictionaries and lists works. +`YAML`_, `msgpack`_ and Python's built-in ``pickle`` module is also supported, +and if needed you can register any custom serialization scheme you +want to use. + +.. _`JSON`: http://www.json.org/ +.. _`YAML`: http://yaml.org/ +.. _`msgpack`: http://msgpack.sourceforge.net/ + +Each option has its advantages and disadvantages. + +``json`` -- JSON is supported in many programming languages, is now + a standard part of Python (since 2.6), and is fairly fast to + decode using the modern Python libraries such as ``cjson or + ``simplejson``. + + The primary disadvantage to ``JSON`` is that it limits you to + the following data types: strings, unicode, floats, boolean, + dictionaries, and lists. Decimals and dates are notably missing. + + Also, binary data will be transferred using base64 encoding, which + will cause the transferred data to be around 34% larger than an + encoding which supports native binary types. + + However, if your data fits inside the above constraints and + you need cross-language support, the default setting of ``JSON`` + is probably your best choice. + +``pickle`` -- If you have no desire to support any language other than + Python, then using the ``pickle`` encoding will gain you + the support of all built-in Python data types (except class instances), + smaller messages when sending binary files, and a slight speedup + over ``JSON`` processing. + +``yaml`` -- YAML has many of the same characteristics as ``json``, + except that it natively supports more data types (including dates, + recursive references, etc.) + + However, the Python libraries for YAML are a good bit slower + than the libraries for JSON. + + If you need a more expressive set of data types and need to maintain + cross-language compatibility, then ``YAML`` may be a better fit + than the above. + +To instruct carrot to use an alternate serialization method, +use one of the following options. + + 1. Set the serialization option on a per-Publisher basis: + + >>> from carrot.messaging import Publisher + >>> publisher = Publisher(connection=conn, + ... exchange="feed", routing_key="importer", + ... serializer="yaml") + + 2. Set the serialization option on a per-call basis + + >>> from carrot.messaging import Publisher + >>> publisher = Publisher(connection=conn, + ... exchange="feed", routing_key="importer") + >>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"}, + ... serializer="pickle") + >>> publisher.close() + +Note that ``Consumer``s do not need the serialization method specified in +their code. They can auto-detect the serialization method since we supply +the ``Content-type`` header as part of the AMQP message. + + +Sending raw data without Serialization +--------------------------------------- + +In some cases, you don't need your message data to be serialized. If you +pass in a plain string or unicode object as your message, then carrot will +not waste cycles serializing/deserializing the data. + +You can optionally specify a ``content_type`` and ``content_encoding`` +for the raw data: + + >>> from carrot.messaging import Publisher + >>> publisher = Publisher(connection=conn, + ... exchange="feed", + routing_key="import_pictures") + >>> publisher.send(open('~/my_picture.jpg','rb').read(), + content_type="image/jpeg", + content_encoding="binary") + >>> publisher.close() + +The ``message`` object returned by the ``Consumer`` class will have a +``content_type`` and ``content_encoding`` attribute. + + +Receiving messages without a callback +-------------------------------------- + +You can also poll the queue manually, by using the ``fetch`` method. +This method returns a ``Message`` object, from where you can get the +message body, de-serialize the body to get the data, acknowledge, reject or +re-queue the message. + + >>> consumer = Consumer(connection=conn, queue="feed", + ... exchange="feed", routing_key="importer") + >>> message = consumer.fetch() + >>> if message: + ... message_data = message.payload + ... message.ack() + ... else: + ... # No messages waiting on the queue. + >>> consumer.close() + +Sub-classing the messaging classes +---------------------------------- + +The ``Consumer``, and ``Publisher`` classes can also be sub classed. Thus you +can define the above publisher and consumer like so: + + >>> from carrot.messaging import Publisher, Consumer + + >>> class FeedPublisher(Publisher): + ... exchange = "feed" + ... routing_key = "importer" + ... + ... def import_feed(self, feed_url): + ... return self.send({"action": "import_feed", + ... "feed_url": feed_url}) + + >>> class FeedConsumer(Consumer): + ... queue = "feed" + ... exchange = "feed" + ... routing_key = "importer" + ... + ... def receive(self, message_data, message): + ... action = message_data["action"] + ... if action == "import_feed": + ... # something importing this feed + ... # import_feed(message_data["feed_url"]) + message.ack() + ... else: + ... raise Exception("Unknown action: %s" % action) + + >>> publisher = FeedPublisher(connection=conn) + >>> publisher.import_feed("http://cnn.com/rss/edition.rss") + >>> publisher.close() + + >>> consumer = FeedConsumer(connection=conn) + >>> consumer.wait() # Go into the consumer loop. + +Getting Help +============ + +Mailing list +------------ + +Join the `carrot-users`_ mailing list. + +.. _`carrot-users`: http://groups.google.com/group/carrot-users/ + +Bug tracker +=========== + +If you have any suggestions, bug reports or annoyances please report them +to our issue tracker at http://github.com/ask/carrot/issues/ + +Contributing +============ + +Development of ``carrot`` happens at Github: http://github.com/ask/carrot + +You are highly encouraged to participate in the development. If you don't +like Github (for some reason) you're welcome to send regular patches. + +License +======= + +This software is licensed under the ``New BSD License``. See the ``LICENSE`` +file in the top distribution directory for the full license text. diff --git a/THANKS b/THANKS new file mode 100644 index 00000000..6757ee47 --- /dev/null +++ b/THANKS @@ -0,0 +1,6 @@ +Thanks to Barry Pederson for the py-amqplib library. +Thanks to Grégoire Cachet for bug reports. +Thanks to Martin Mahner for the Sphinx theme. +Thanks to jcater for bug reports. +Thanks to sebest for bug reports. +Thanks to greut for bug reports diff --git a/TODO b/TODO new file mode 100644 index 00000000..992504b8 --- /dev/null +++ b/TODO @@ -0,0 +1,2 @@ +Please see our Issue Tracker at GitHub: + http://github.com/ask/kombu/issues diff --git a/contrib/doc2ghpages b/contrib/doc2ghpages new file mode 100755 index 00000000..5ebc7aaa --- /dev/null +++ b/contrib/doc2ghpages @@ -0,0 +1,13 @@ +#!/bin/bash + +git checkout master +(cd docs; + rm -rf .build; + make html; + (cd .build/html; + sphinx-to-github;)) +git checkout gh-pages +cp -r docs/.build/html/* . +git commit . -m "Autogenerated documentation for github." +git push origin gh-pages +git checkout master diff --git a/contrib/requirements/default.txt b/contrib/requirements/default.txt new file mode 100644 index 00000000..63c3428e --- /dev/null +++ b/contrib/requirements/default.txt @@ -0,0 +1,2 @@ +anyjson +amqplib>=0.6 diff --git a/contrib/requirements/test.txt b/contrib/requirements/test.txt new file mode 100644 index 00000000..bf222f70 --- /dev/null +++ b/contrib/requirements/test.txt @@ -0,0 +1,7 @@ +nose +nose-cover3 +coverage>=3.0 +simplejson +PyYAML +msgpack-python + diff --git a/docs/rewrite.rst b/docs/rewrite.rst new file mode 100644 index 00000000..c9bbfed6 --- /dev/null +++ b/docs/rewrite.rst @@ -0,0 +1,41 @@ + + +.. code-block:: python + + from carrot import Connection, Exchange, Binding + from carrot import Consumer, Producer + + media_exchange = Exchange("media", "direct", durable=True) + video_binding = Binding("video", exchange=media_exchange, key="video") + + # connections/channels + connection = Connection("localhost", "guest", "guest", "/") + channel = connection.channel() + + # produce + producer = Producer(channel, exchange=media_exchange, serializer="json") + producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013}) + + # consume + consumer = Consumer(channel, video_binding) + consumer.register_callback(process_media) + consumer.consume() + + while True: + connection.drain_events() + + + # consumerset: + video_binding = Binding("video", exchange=media_exchange, key="video") + image_binding = Binding("image", exchange=media_exchange, key="image") + + consumer = Consumer(channel, [video_binding, image_binding]) + + + + + + + + + diff --git a/kombu/__init__.py b/kombu/__init__.py new file mode 100644 index 00000000..d88bc9cc --- /dev/null +++ b/kombu/__init__.py @@ -0,0 +1,7 @@ +"""AMQP Messaging Framework for Python""" +VERSION = (0, 10, 5) +__version__ = ".".join(map(str, VERSION)) +__author__ = "Ask Solem" +__contact__ = "askh@opera.com" +__homepage__ = "http://github.com/ask/carrot/" +__docformat__ = "restructuredtext" diff --git a/kombu/backends/__init__.py b/kombu/backends/__init__.py new file mode 100644 index 00000000..e2263e60 --- /dev/null +++ b/kombu/backends/__init__.py @@ -0,0 +1,43 @@ +import sys + +from kombu.utils import rpartition + +DEFAULT_BACKEND = "kombu.backends.pyamqplib.Backend" + +BACKEND_ALIASES = { + "amqplib": "kombu.backends.pyamqplib.Backend", + "pika": "kombu.backends.pikachu.AsyncoreBackend", + "syncpika": "kombu.backends.pikachu.SyncBackend", +} + +_backend_cache = {} + + +def resolve_backend(backend=None): + backend = BACKEND_ALIASES.get(backend, backend) + backend_module_name, _, backend_cls_name = rpartition(backend, ".") + return backend_module_name, backend_cls_name + + +def _get_backend_cls(backend=None): + backend_module_name, backend_cls_name = resolve_backend(backend) + __import__(backend_module_name) + backend_module = sys.modules[backend_module_name] + return getattr(backend_module, backend_cls_name) + + +def get_backend_cls(backend=None): + """Get backend class by name. + + The backend string is the full path to a backend class, e.g.:: + + "kombu.backends.pyamqplib.Backend" + + If the name does not include "``.``" (is not fully qualified), + the alias table will be consulted. + + """ + backend = backend or DEFAULT_BACKEND + if backend not in _backend_cache: + _backend_cache[backend] = _get_backend_cls(backend) + return _backend_cache[backend] diff --git a/kombu/backends/base.py b/kombu/backends/base.py new file mode 100644 index 00000000..4413e119 --- /dev/null +++ b/kombu/backends/base.py @@ -0,0 +1,104 @@ +""" + +Backend base classes. + +""" +from kombu import serialization + +ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"]) + + +class MessageStateError(Exception): + """The message has already been acknowledged.""" + + +class BaseMessage(object): + """Base class for received messages.""" + _state = None + + MessageStateError = MessageStateError + + def __init__(self, channel, body=None, delivery_tag=None, + content_type=None, content_encoding=None, delivery_info={}, **kwargs): + self.channel = channel + self._decoded_cache = None + self._state = "RECEIVED" + + def decode(self): + """Deserialize the message body, returning the original + python structure sent by the publisher.""" + return serialization.decode(self.body, self.content_type, + self.content_encoding) + + @property + def payload(self): + """The decoded message.""" + if not self._decoded_cache: + self._decoded_cache = self.decode() + return self._decoded_cache + + def ack(self): + """Acknowledge this message as being processed., + This will remove the message from the queue. + + :raises MessageStateError: If the message has already been + acknowledged/requeued/rejected. + + """ + if self.acknowledged: + raise self.MessageStateError( + "Message already acknowledged with state: %s" % self._state) + self.channel.basic_ack(self.delivery_tag) + self._state = "ACK" + + def reject(self): + """Reject this message. + + The message will be discarded by the server. + + :raises MessageStateError: If the message has already been + acknowledged/requeued/rejected. + + """ + if self.acknowledged: + raise self.MessageStateError( + "Message already acknowledged with state: %s" % self._state) + self.channel.basic_reject(self.delivery_tag) + self._state = "REJECTED" + + def requeue(self): + """Reject this message and put it back on the queue. + + You must not use this method as a means of selecting messages + to process. + + :raises MessageStateError: If the message has already been + acknowledged/requeued/rejected. + + """ + if self.acknowledged: + raise self.MessageStateError( + "Message already acknowledged with state: %s" % self._state) + self.channel.basic_reject(self.delivery_tag, requeue=True) + self._state = "REQUEUED" + + @property + def acknowledged(self): + return self._state in ACKNOWLEDGED_STATES + + +class BaseBackend(object): + """Base class for backends.""" + default_port = None + + def __init__(self, connection, **kwargs): + self.connection = connection + + def get_channel(self): + raise NotImplementedError("Subclass responsibility") + + def establish_connection(self): + raise NotImplementedError("Subclass responsibility") + + def close_connection(self): + raise NotImplementedError("Subclass responsibility") diff --git a/kombu/backends/emulation.py b/kombu/backends/emulation.py new file mode 100644 index 00000000..600366fa --- /dev/null +++ b/kombu/backends/emulation.py @@ -0,0 +1,244 @@ +from carrot.backends.base import BaseBackend, BaseMessage +from anyjson import serialize, deserialize +from itertools import count +from django.utils.datastructures import SortedDict +from carrot.utils import gen_unique_id +import sys +import time +import atexit + +from Queue import Empty as QueueEmpty +from itertools import cycle + + +class QueueSet(object): + """A set of queues that operates as one.""" + + def __init__(self, backend, queues): + self.backend = backend + self.queues = queues + + # an infinite cycle through all the queues. + self.cycle = cycle(self.queues) + + # A set of all the queue names, so we can match when we've + # tried all of them. + self.all = frozenset(self.queues) + + def get(self): + """Get the next message avaiable in the queue. + + :returns: The message and the name of the queue it came from as + a tuple. + :raises Empty: If there are no more items in any of the queues. + + """ + + # A set of queues we've already tried. + tried = set() + + while True: + # Get the next queue in the cycle, and try to get an item off it. + queue = self.cycle.next() + try: + item = self.backend._get(queue) + except Empty: + # raises Empty when we've tried all of them. + tried.add(queue) + if tried == self.all: + raise + else: + return item, queue + + def __repr__(self): + return "" % repr(self.queue_names) + + +class QualityOfService(object): + + def __init__(self, resource, prefetch_count=None, interval=None): + self.resource = resource + self.prefetch_count = prefetch_count + self.interval = interval + self._delivered = SortedDict() + self._restored_once = False + atexit.register(self.restore_unacked_once) + + def can_consume(self): + return len(self._delivered) > self.prefetch_count + + def append(self, message, queue_name, delivery_tag): + self._delivered[delivery_tag] = message, queue_name + + def ack(self, delivery_tag): + self._delivered.pop(delivery_tag, None) + + def restore_unacked(self): + for message, queue_name in self._delivered.items(): + self.resource.put(queue_name, message) + self._delivered = SortedDict() + + def requeue(self, delivery_tag): + try: + message, queue_name = self._delivered.pop(delivery_tag) + except KeyError: + pass + self.resource.put(queue_name, message) + + def restore_unacked_once(self): + if not self._restored_once: + if self._delivered: + sys.stderr.write( + "Restoring unacknowledged messages: %s\n" % ( + self._delivered)) + self.restore_unacked() + if self._delivered: + sys.stderr.write("UNRESTORED MESSAGES: %s\n" % ( + self._delivered)) + + +class Message(BaseMessage): + + def __init__(self, backend, payload, **kwargs): + self.backend = backend + + payload = deserialize(payload) + kwargs["body"] = payload.get("body").encode("utf-8") + kwargs["delivery_tag"] = payload.get("delivery_tag") + kwargs["content_type"] = payload.get("content-type") + kwargs["content_encoding"] = payload.get("content-encoding") + kwargs["priority"] = payload.get("priority") + self.destination = payload.get("destination") + + super(Message, self).__init__(backend, **kwargs) + + def reject(self): + raise NotImplementedError( + "This backend does not implement basic.reject") + + +class EmulationBase(BaseBackend): + Message = Message + default_port = None + interval = 1 + _prefetch_count = None + + QueueSet = QueueSet + + def __init__(self, connection, **kwargs): + self.connection = connection + self._consumers = {} + self._callbacks = {} + self._qos_manager = None + + def establish_connection(self): + return self # for drain events + + def close_connection(self, connection): + pass + + def queue_exists(self, queue): + return True + + def queue_purge(self, queue, **kwargs): + return self._purge(queue, **kwargs) + + def _poll(self, resource): + while True: + if self.qos_manager.can_consume(): + try: + return resource.get() + except QueueEmpty: + pass + time.sleep(self.interval) + + def declare_consumer(self, queue, no_ack, callback, consumer_tag, + **kwargs): + self._consumers[consumer_tag] = queue + self._callbacks[queue] = callback + + def drain_events(self, timeout=None): + queueset = self.QueueSet(self._consumers.values()) + payload, queue = self._poll(queueset) + + if not queue or queue not in self._callbacks: + return + + self._callbacks[queue](payload) + + def consume(self, limit=None): + for total_message_count in count(): + if limit and total_message_count >= limit: + raise StopIteration + + self.drain_events() + + yield True + + def queue_declare(self, queue, *args, **kwargs): + pass + + def _get_many(self, queues): + raise NotImplementedError("Emulations must implement _get_many") + + def _get(self, queue): + raise NotImplementedError("Emulations must implement _get") + + def _put(self, queue, message): + raise NotImplementedError("Emulations must implement _put") + + def _purge(self, queue, message): + raise NotImplementedError("Emulations must implement _purge") + + def get(self, queue, **kwargs): + try: + payload = self._get(queue) + except QueueEmpty: + return None + else: + return self.message_to_python(payload) + + def ack(self, delivery_tag): + self.qos_manager.ack(delivery_tag) + + def requeue(self, delivery_tag): + self.qos_manager.requeue(delivery_tag) + + def message_to_python(self, raw_message): + message = self.Message(backend=self, payload=raw_message) + self.qos_manager.append(message, message.destination, + message.delivery_tag) + return message + + def prepare_message(self, message_data, delivery_mode, priority=0, + content_type=None, content_encoding=None): + return {"body": message_data, + "priority": priority or 0, + "content-encoding": content_encoding, + "content-type": content_type} + + def publish(self, message, exchange, routing_key, **kwargs): + message["destination"] = exchange + self._put(exchange, message) + + def cancel(self, consumer_tag): + queue = self._consumers.pop(consumer_tag, None) + self._callbacks.pop(queue, None) + + def close(self): + for consumer_tag in self._consumers.keys(): + self.cancel(consumer_tag) + + def basic_qos(self, prefetch_size, prefetch_count, apply_global=False): + self._prefetch_count = prefetch_count + + @property + def qos_manager(self): + if self._qos_manager is None: + self._qos_manager = QualityOfService(self) + + # Update prefetch count / interval + self._qos_manager.prefetch_count = self._prefetch_count + self._qos_manager.interval = self.interval + + return self._qos_manager diff --git a/kombu/backends/pikachu.py b/kombu/backends/pikachu.py new file mode 100644 index 00000000..507bbbfb --- /dev/null +++ b/kombu/backends/pikachu.py @@ -0,0 +1,212 @@ +import asyncore +import weakref +import functools +import itertools + +import pika + +from carrot.backends.base import BaseMessage, BaseBackend + +DEFAULT_PORT = 5672 + + +class Message(BaseMessage): + + def __init__(self, backend, amqp_message, **kwargs): + channel, method, header, body = amqp_message + self._channel = channel + self._method = method + self._header = header + self.backend = backend + + kwargs.update({"body": body, + "delivery_tag": method.delivery_tag, + "content_type": header.content_type, + "content_encoding": header.content_encoding, + "delivery_info": dict( + consumer_tag=method.consumer_tag, + routing_key=method.routing_key, + delivery_tag=method.delivery_tag, + exchange=method.exchange)}) + + super(Message, self).__init__(backend, **kwargs) + + +class SyncBackend(BaseBackend): + default_port = DEFAULT_PORT + _connection_cls = pika.BlockingConnection + + Message = Message + + def __init__(self, connection, **kwargs): + self.connection = connection + self.default_port = kwargs.get("default_port", self.default_port) + self._channel_ref = None + + @property + def _channel(self): + return callable(self._channel_ref) and self._channel_ref() + + @property + def channel(self): + """If no channel exists, a new one is requested.""" + if not self._channel: + self._channel_ref = weakref.ref(self.connection.get_channel()) + return self._channel + + def establish_connection(self): + """Establish connection to the AMQP broker.""" + conninfo = self.connection + if not conninfo.port: + conninfo.port = self.default_port + credentials = pika.PlainCredentials(conninfo.userid, + conninfo.password) + return self._connection_cls(pika.ConnectionParameters( + conninfo.hostname, + port=conninfo.port, + virtual_host=conninfo.virtual_host, + credentials=credentials)) + + def close_connection(self, connection): + """Close the AMQP broker connection.""" + connection.close() + + def queue_exists(self, queue): + return False # FIXME + + def queue_delete(self, queue, if_unused=False, if_empty=False): + """Delete queue by name.""" + return self.channel.queue_delete(queue=queue, if_unused=if_unused, + if_empty=if_empty) + + def queue_purge(self, queue, **kwargs): + """Discard all messages in the queue. This will delete the messages + and results in an empty queue.""" + return self.channel.queue_purge(queue=queue).message_count + + def queue_declare(self, queue, durable, exclusive, auto_delete, + warn_if_exists=False, arguments=None): + """Declare a named queue.""" + + return self.channel.queue_declare(queue=queue, + durable=durable, + exclusive=exclusive, + auto_delete=auto_delete, + arguments=arguments) + + def exchange_declare(self, exchange, type, durable, auto_delete): + """Declare an named exchange.""" + return self.channel.exchange_declare(exchange=exchange, + type=type, + durable=durable, + auto_delete=auto_delete) + + def queue_bind(self, queue, exchange, routing_key, arguments={}): + """Bind queue to an exchange using a routing key.""" + if not arguments: + arguments = {} + return self.channel.queue_bind(queue=queue, + exchange=exchange, + routing_key=routing_key, + arguments=arguments) + + def message_to_python(self, raw_message): + """Convert encoded message body back to a Python value.""" + return self.Message(backend=self, amqp_message=raw_message) + + def get(self, queue, no_ack=False): + """Receive a message from a declared queue by name. + + :returns: A :class:`Message` object if a message was received, + ``None`` otherwise. If ``None`` was returned, it probably means + there was no messages waiting on the queue. + + """ + raw_message = self.channel.basic_get(queue, no_ack=no_ack) + if not raw_message: + return None + return self.message_to_python(raw_message) + + def declare_consumer(self, queue, no_ack, callback, consumer_tag, + nowait=False): + """Declare a consumer.""" + + @functools.wraps(callback) + def _callback_decode(channel, method, header, body): + return callback((channel, method, header, body)) + + return self.channel.basic_consume(_callback_decode, + queue=queue, + no_ack=no_ack, + consumer_tag=consumer_tag) + + def consume(self, limit=None): + """Returns an iterator that waits for one message at a time.""" + for total_message_count in itertools.count(): + if limit and total_message_count >= limit: + raise StopIteration + self.connection.connection.drain_events() + yield True + + def cancel(self, consumer_tag): + """Cancel a channel by consumer tag.""" + if not self._channel: + return + self.channel.basic_cancel(consumer_tag) + + def close(self): + """Close the channel if open.""" + if self._channel and not self._channel.handler.channel_close: + self._channel.close() + self._channel_ref = None + + def ack(self, delivery_tag): + """Acknowledge a message by delivery tag.""" + return self.channel.basic_ack(delivery_tag) + + def reject(self, delivery_tag): + """Reject a message by deliver tag.""" + return self.channel.basic_reject(delivery_tag, requeue=False) + + def requeue(self, delivery_tag): + """Reject and requeue a message by delivery tag.""" + return self.channel.basic_reject(delivery_tag, requeue=True) + + def prepare_message(self, message_data, delivery_mode, priority=None, + content_type=None, content_encoding=None): + """Encapsulate data into a AMQP message.""" + properties = pika.BasicProperties(priority=priority, + content_type=content_type, + content_encoding=content_encoding, + delivery_mode=delivery_mode) + return message_data, properties + + def publish(self, message, exchange, routing_key, mandatory=None, + immediate=None, headers=None): + """Publish a message to a named exchange.""" + body, properties = message + + if headers: + properties.headers = headers + + ret = self.channel.basic_publish(body=body, + properties=properties, + exchange=exchange, + routing_key=routing_key, + mandatory=mandatory, + immediate=immediate) + if mandatory or immediate: + self.close() + + def qos(self, prefetch_size, prefetch_count, apply_global=False): + """Request specific Quality of Service.""" + self.channel.basic_qos(prefetch_size, prefetch_count, + apply_global) + + def flow(self, active): + """Enable/disable flow from peer.""" + self.channel.flow(active) + + +class AsyncoreBackend(SyncBackend): + _connection_cls = pika.AsyncoreConnection diff --git a/kombu/backends/pyamqplib.py b/kombu/backends/pyamqplib.py new file mode 100644 index 00000000..dbc6df69 --- /dev/null +++ b/kombu/backends/pyamqplib.py @@ -0,0 +1,202 @@ +""" + +`amqplib`_ backend for carrot. + +.. _`amqplib`: http://barryp.org/software/py-amqplib/ + +""" +import warnings +import weakref + +from itertools import count + +from amqplib import client_0_8 as amqp +from amqplib.client_0_8.exceptions import AMQPChannelException + +from kombu.backends.base import BaseMessage, BaseBackend + +DEFAULT_PORT = 5672 + +class Connection(amqp.Connection): + + def drain_events(self, allowed_methods=None, timeout=None): + """Wait for an event on any channel.""" + return self.wait_multi(self.channels.values(), timeout=timeout) + + def wait_multi(self, channels, allowed_methods=None, timeout=None): + """Wait for an event on a channel.""" + chanmap = dict((chan.channel_id, chan) for chan in channels) + chanid, method_sig, args, content = self._wait_multiple( + chanmap.keys(), allowed_methods, timeout=timeout) + + channel = chanmap[chanid] + + if content \ + and channel.auto_decode \ + and hasattr(content, 'content_encoding'): + try: + content.body = content.body.decode(content.content_encoding) + except Exception: + pass + + amqp_method = channel._METHOD_MAP.get(method_sig, None) + + if amqp_method is None: + raise Exception('Unknown AMQP method (%d, %d)' % method_sig) + + if content is None: + return amqp_method(channel, args) + else: + return amqp_method(channel, args, content) + + def read_timeout(self, timeout=None): + if timeout is None: + return self.method_reader.read_method() + sock = self.transport.sock + prev = sock.gettimeout() + sock.settimeout(timeout) + try: + return self.method_reader.read_method() + finally: + sock.settimeout(prev) + + def _wait_multiple(self, channel_ids, allowed_methods, timeout=None): + for channel_id in channel_ids: + method_queue = self.channels[channel_id].method_queue + for queued_method in method_queue: + method_sig = queued_method[0] + if (allowed_methods is None) \ + or (method_sig in allowed_methods) \ + or (method_sig == (20, 40)): + method_queue.remove(queued_method) + method_sig, args, content = queued_method + return channel_id, method_sig, args, content + + # Nothing queued, need to wait for a method from the peer + while True: + channel, method_sig, args, content = self.read_timeout(timeout) + + if (channel in channel_ids) \ + and ((allowed_methods is None) \ + or (method_sig in allowed_methods) \ + or (method_sig == (20, 40))): + return channel, method_sig, args, content + + # Not the channel and/or method we were looking for. Queue + # this method for later + self.channels[channel].method_queue.append((method_sig, + args, + content)) + + # + # If we just queued up a method for channel 0 (the Connection + # itself) it's probably a close method in reaction to some + # error, so deal with it right away. + # + if channel == 0: + self.wait() + + def channel(self, channel_id=None): + try: + return self.channels[channel_id] + except KeyError: + return Channel(self, channel_id) + + +class Message(BaseMessage): + """A message received by the broker. + + Usually you don't insantiate message objects yourself, but receive + them using a :class:`carrot.messaging.Consumer`. + + :param backend: see :attr:`backend`. + :param amqp_message: see :attr:`_amqp_message`. + + + .. attribute:: body + + The message body. + + .. attribute:: delivery_tag + + The message delivery tag, uniquely identifying this message. + + .. attribute:: backend + + The message backend used. + A subclass of :class:`carrot.backends.base.BaseBackend`. + + .. attribute:: _amqp_message + + A :class:`amqplib.client_0_8.basic_message.Message` instance. + This is a private attribute and should not be accessed by + production code. + + """ + + def __init__(self, channel, amqp_message, **kwargs): + self._amqp_message = amqp_message + self.channel = channel + + for attr_name in ("body", + "delivery_tag", + "content_type", + "content_encoding", + "delivery_info"): + kwargs[attr_name] = getattr(amqp_message, attr_name, None) + + super(Message, self).__init__(backend, **kwargs) + + +class Channel(amqp.Channel): + Message = Message + + def prepare_message(self, message_data, priority=None, + content_type=None, content_encoding=None, headers=None, + properties=None): + """Encapsulate data into a AMQP message.""" + return amqp.Message(message_data, priority=priority, + content_type=content_type, + content_encoding=content_encoding, + properties=properties) + + def message_to_python(self, raw_message): + """Convert encoded message body back to a Python value.""" + return self.Message(channel=self, amqp_message=raw_message) + + +class Backend(BaseBackend): + default_port = DEFAULT_PORT + + def __init__(self, connection, **kwargs): + self.connection = connection + self.default_port = kwargs.get("default_port") or self.default_port + + def get_channel(self, connection): + return connection.channel() + + def drain_events(self, connection, **kwargs): + return connection.drain_events(**kwargs) + + def establish_connection(self): + """Establish connection to the AMQP broker.""" + conninfo = self.connection + if not conninfo.hostname: + raise KeyError("Missing hostname for AMQP connection.") + if conninfo.userid is None: + raise KeyError("Missing user id for AMQP connection.") + if conninfo.password is None: + raise KeyError("Missing password for AMQP connection.") + if not conninfo.port: + conninfo.port = self.default_port + return Connection(host=conninfo.host, + userid=conninfo.userid, + password=conninfo.password, + virtual_host=conninfo.virtual_host, + insist=conninfo.insist, + ssl=conninfo.ssl, + connect_timeout=conninfo.connect_timeout) + + def close_connection(self, connection): + """Close the AMQP broker connection.""" + connection.close() diff --git a/kombu/connection.py b/kombu/connection.py new file mode 100644 index 00000000..64727dc1 --- /dev/null +++ b/kombu/connection.py @@ -0,0 +1,79 @@ +from kombu.backends import get_backend_cls + + +class BrokerConnection(object): + + def __init__(self, hostname="localhost", userid="guest", + password="guest", virtual_host="/", port=None, **kwargs): + self.hostname = hostname + self.userid = userid + self.password = password + self.virtual_host = virtual_host or self.virtual_host + self.port = port or self.port + self.insist = kwargs.get("insist", False) + self.connect_timeout = kwargs.get("connect_timeout", 5) + self.ssl = kwargs.get("ssl", False) + self.backend_cls = kwargs.get("backend_cls", None) + self._closed = None + self._connection = None + self._backend = None + + def __enter__(self): + return self + + def __exit__(self, e_type, e_value, e_trace): + self.close() + + def _establish_connection(self): + return self.backend.establish_connection() + + @property + def connection(self): + if self._closed: + return + if not self._connection: + self._connection = self._establish_connection() + self._closed = False + return self._connection + + @property + def host(self): + """The host as a hostname/port pair separated by colon.""" + return ":".join([self.hostname, str(self.port)]) + + def get_backend_cls(self): + """Get the currently used backend class.""" + backend_cls = self.backend_cls + if not backend_cls or isinstance(backend_cls, basestring): + backend_cls = get_backend_cls(backend_cls) + return backend_cls + + def connect(self): + """Establish a connection to the AMQP server.""" + self._closed = False + return self.connection + + def channel(self): + """Request a new AMQP channel.""" + return self.backend.create_channel(self.connection) + + def drain_events(self, **kwargs): + return self.backend.drain_events(self.connection, **kwargs) + + def close(self): + """Close the currently open connection.""" + try: + if self._connection: + self.backend.close_connection(self._connection) + except socket.error: + pass + self._closed = True + + def _create_backend(self): + return self.get_backend_cls()(connection=self) + + @property + def backend(self): + if self._backend is None: + self._backend = self._create_backend() + return self._backend diff --git a/kombu/entity.py b/kombu/entity.py new file mode 100644 index 00000000..c2269a23 --- /dev/null +++ b/kombu/entity.py @@ -0,0 +1,116 @@ +class Exchange(object): + TRANSIENT_DELIVERY_MODE = 1 + PERSISTENT_DELIVERY_MODE = 2 + DELIVERY_MODES = { + "transient": TRANSIENT_DELIVERY_MODE, + "persistent": PERSISTENT_DELIVERY_MODE, + } + name = "" + type = "direct" + routing_key = "" + delivery_mode = PERSISTENT_DELIVERY_MODE + durable = True + auto_delete = False + _init_opts = ("durable", "auto_delete", + "delivery_mode", "auto_declare") + + def __init__(self, name="", type="", routing_key=None, **kwargs): + self.name = name or self.name + self.type = type or self.type + self.routing_key = routing_key or self.routing_key + for opt_name in self._init_opts: + opt_value = kwargs.get(opt_name) + if opt_value is not None: + setattr(self, opt_name, opt_value) + self.delivery_mode = self.DELIVERY_MODES.get(self.delivery_mode, + self.delivery_mode) + + def declare(self, channel): + """Declare the exchange. + + Creates the exchange on the broker. + + """ + channel.exchange_declare(exchange=self.name, + type=self.type, + durable=self.durable, + auto_delete=self.auto_delete) + + def create_message(self, channel, message_data, delivery_mode=None, + priority=None, content_type=None, content_encoding=None, + properties=None): + properties["delivery_mode"] = delivery_mode or self.delivery_mode + return channel.prepare_message(message_data, + properties=properties, + priority=priority, + content_type=content_type, + content_encoding=content_encoding) + + def publish(self, channel, message, routing_key=None, + mandatory=False, immediate=False): + if routing_key is None: + routing_key = self.routing_key + channel.basic_publish(message, + exchange=self.name, routing_key=routing_key, + mandatory=mandatory, immediate=immediate, + headers=headers) + + +class Binding(object): + name = "" + exchange = None + routing_key = "" + + durable = True + exclusive = False + auto_delete = False + warn_if_exists = False + _init_opts = ("durable", "exclusive", "auto_delete", + "warn_if_exists") + + def __init__(self, name=None, exchange=None, routing_key=None, **kwargs): + # Binding. + self.name = name or self.name + self.exchange = exchange or self.exchange + self.routing_key = routing_key or self.routing_key + + # Options + for opt_name in self._init_opts: + opt_value = kwargs.get(opt_name) + if opt_value is not None: + setattr(self, opt_name, opt_value) + + # exclusive implies auto-delete. + if self.exclusive: + self.auto_delete = True + + def declare(self, channel): + """Declares the queue, the exchange and binds the queue to + the exchange.""" + if self.exchange: + self.exchange.declare(channel) + if self.name: + channel.queue_declare(queue=self.name, durable=self.durable, + exclusive=self.exclusive, + auto_delete=self.auto_delete) + channel.queue_bind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key) + + def get(self, channel, no_ack=None): + message = channel.basic_get(self.name, no_ack=no_ack) + if message: + return channel.message_to_python(message) + + def purge(self, channel): + return channel.queue_purge(self.name) + + def consume(self, channel, consumer_tag, callback, no_ack=None, + nowait=True): + return channel.consume(queue=self.name, no_ack=no_ack, + consumer_tag=consumer_tag, + callback=callback, + nowait=nowait) + + def cancel(self, channel, consumer_tag): + channel.basic_cancel(consumer_tag) diff --git a/kombu/messaging.py b/kombu/messaging.py new file mode 100644 index 00000000..cdfae69a --- /dev/null +++ b/kombu/messaging.py @@ -0,0 +1,136 @@ +from itertools import count + +from kombu import serialization +from kombu.entity import Exchange, Binding + + +class Producer(object): + exchange = None + serializer = None + auto_declare = True + + def __init__(self, channel, exchange=None, serializer=None, + auto_declare=None): + self.channel = channel + self.exchange = exchange or self.exchange + self.serializer = serializer or self.serializer + if auto_declare is not None: + self.auto_declare = auto_declare + + if self.exchange and self.auto_declare: + self.exchange.declare(self.channel) + + def prepare(self, message_data, serializer=None, + content_type=None, content_encoding=None): + # No content_type? Then we're serializing the data internally. + if not content_type: + serializer = serializer or self.serializer + (content_type, content_encoding, + message_data) = serialization.encode(message_data, + serializer=serializer) + else: + # If the programmer doesn't want us to serialize, + # make sure content_encoding is set. + if isinstance(message_data, unicode): + if not content_encoding: + content_encoding = 'utf-8' + message_data = message_data.encode(content_encoding) + + # If they passed in a string, we can't know anything + # about it. So assume it's binary data. + elif not content_encoding: + content_encoding = 'binary' + + return message_data, content_type, content_encoding + + def publish(self, message_data, routing_key=None, delivery_mode=None, + mandatory=False, immediate=False, priority=0, content_type=None, + content_encoding=None, serializer=None): + + message_data, content_type, content_encoding = self.prepare( + message_data, content_type, content_encoding) + message = self.exchange.create_message(channel, message_data, + delivery_mode, priority, content_type, content_encoding) + return self.exchange.publish(message, routing_key, mandatory, + immediate) + + +_next_tag = count(1).next + +class Consumer(object): + no_ack = False + auto_declare = True + callbacks = None + + def __init__(self, channel, bindings, no_ack=None, auto_declare=None, + callbacks=None): + self.channel = channel + self.bindings = bindings + if no_ack is not None: + self.no_ack = no_ack + if auto_declare is not None: + self.auto_declare = auto_declare + + if callbacks is not None: + self.callbacks = callbacks + if self.callbacks is None: + self.callbacks = [] + + # bindings maybe list + if not hasattr(self.bindings, "__iter__"): + self.bindings = [self.bindings] + + if self.auto_declare: + for binding in self.bindings: + binding.declare(self.channel) + + self._active_tags = {} + self._declare_consumer() + + def __enter__(self): + return self + + def __exit__(self): + self.cancel() + + def _consume(self): + H, T = self.bindings[:-1], self.bindings[-1] + for binding in H: + binding.consume(self.channel, self._add_tag(binding), + self._receive_callback, self.no_ack, + nowait=True) + T.consume(self.channel, self._add_tag(T), + self._receive_callback, + self.no_ack, nowait=False) + + + def _add_tag(self, binding): + tag = self._active_tags[binding] = str(_next_tag()) + return tag + + def _receive_callback(self, raw_message): + message = self.channel.message_to_python(raw_message) + self.receive(message.payload, message) + + def receive(self, message_data, message): + if not self.callbacks: + raise NotImplementedError("No consumer callbacks registered") + for callback in self.callbacks: + callback(message_data, message) + + def register_callback(self, callback): + self.callbacks.append(callback) + + def purge(self): + for binding in self.bindings: + self.binding.purge(self.channel) + + def cancel(self): + for binding, tag in self._active_tags.items(): + binding.cancel(self.channel, tag) + + def flow(self, active): + self.channel.flow(active) + + def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False): + self.channel.qos(prefetch_size, prefetch_count, apply_global) diff --git a/kombu/serialization.py b/kombu/serialization.py new file mode 100644 index 00000000..0e6d1f66 --- /dev/null +++ b/kombu/serialization.py @@ -0,0 +1,279 @@ +""" +Centralized support for encoding/decoding of data structures. +Requires a json library (`cjson`_, `simplejson`_, or `Python 2.6+`_). + +Pickle support is built-in. + +Optionally installs support for ``YAML`` if the `PyYAML`_ package +is installed. + +Optionally installs support for `msgpack`_ if the `msgpack-python`_ +package is installed. + +.. _`cjson`: http://pypi.python.org/pypi/python-cjson/ +.. _`simplejson`: http://code.google.com/p/simplejson/ +.. _`Python 2.6+`: http://docs.python.org/library/json.html +.. _`PyYAML`: http://pyyaml.org/ +.. _`msgpack`: http://msgpack.sourceforge.net/ +.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/ + +""" + +import codecs + +__all__ = ['SerializerNotInstalled', 'registry'] + + +class SerializerNotInstalled(StandardError): + """Support for the requested serialization type is not installed""" + + +class SerializerRegistry(object): + """The registry keeps track of serialization methods.""" + + def __init__(self): + self._encoders = {} + self._decoders = {} + self._default_encode = None + self._default_content_type = None + self._default_content_encoding = None + + def register(self, name, encoder, decoder, content_type, + content_encoding='utf-8'): + """Register a new encoder/decoder. + + :param name: A convenience name for the serialization method. + + :param encoder: A method that will be passed a python data structure + and should return a string representing the serialized data. + If ``None``, then only a decoder will be registered. Encoding + will not be possible. + + :param decoder: A method that will be passed a string representing + serialized data and should return a python data structure. + If ``None``, then only an encoder will be registered. + Decoding will not be possible. + + :param content_type: The mime-type describing the serialized + structure. + + :param content_encoding: The content encoding (character set) that + the :param:`decoder` method will be returning. Will usually be + ``utf-8``, ``us-ascii``, or ``binary``. + + """ + if encoder: + self._encoders[name] = (content_type, content_encoding, encoder) + if decoder: + self._decoders[content_type] = decoder + + def _set_default_serializer(self, name): + """ + Set the default serialization method used by this library. + + :param name: The name of the registered serialization method. + For example, ``json`` (default), ``pickle``, ``yaml``, + or any custom methods registered using :meth:`register`. + + :raises SerializerNotInstalled: If the serialization method + requested is not available. + """ + try: + (self._default_content_type, self._default_content_encoding, + self._default_encode) = self._encoders[name] + except KeyError: + raise SerializerNotInstalled( + "No encoder installed for %s" % name) + + def encode(self, data, serializer=None): + """ + Serialize a data structure into a string suitable for sending + as an AMQP message body. + + :param data: The message data to send. Can be a list, + dictionary or a string. + + :keyword serializer: An optional string representing + the serialization method you want the data marshalled + into. (For example, ``json``, ``raw``, or ``pickle``). + + If ``None`` (default), then `JSON`_ will be used, unless + ``data`` is a ``str`` or ``unicode`` object. In this + latter case, no serialization occurs as it would be + unnecessary. + + Note that if ``serializer`` is specified, then that + serialization method will be used even if a ``str`` + or ``unicode`` object is passed in. + + :returns: A three-item tuple containing the content type + (e.g., ``application/json``), content encoding, (e.g., + ``utf-8``) and a string containing the serialized + data. + + :raises SerializerNotInstalled: If the serialization method + requested is not available. + """ + if serializer == "raw": + return raw_encode(data) + if serializer and not self._encoders.get(serializer): + raise SerializerNotInstalled( + "No encoder installed for %s" % serializer) + + # If a raw string was sent, assume binary encoding + # (it's likely either ASCII or a raw binary file, but 'binary' + # charset will encompass both, even if not ideal. + if not serializer and isinstance(data, str): + # In Python 3+, this would be "bytes"; allow binary data to be + # sent as a message without getting encoder errors + return "application/data", "binary", data + + # For unicode objects, force it into a string + if not serializer and isinstance(data, unicode): + payload = data.encode("utf-8") + return "text/plain", "utf-8", payload + + if serializer: + content_type, content_encoding, encoder = \ + self._encoders[serializer] + else: + encoder = self._default_encode + content_type = self._default_content_type + content_encoding = self._default_content_encoding + + payload = encoder(data) + return content_type, content_encoding, payload + + def decode(self, data, content_type, content_encoding): + """Deserialize a data stream as serialized using ``encode`` + based on :param:`content_type`. + + :param data: The message data to deserialize. + + :param content_type: The content-type of the data. + (e.g., ``application/json``). + + :param content_encoding: The content-encoding of the data. + (e.g., ``utf-8``, ``binary``, or ``us-ascii``). + + :returns: The unserialized data. + """ + content_type = content_type or 'application/data' + content_encoding = (content_encoding or 'utf-8').lower() + + # Don't decode 8-bit strings or unicode objects + if content_encoding not in ('binary', 'ascii-8bit') and \ + not isinstance(data, unicode): + data = codecs.decode(data, content_encoding) + + try: + decoder = self._decoders[content_type] + except KeyError: + return data + + return decoder(data) + + +""" +.. data:: registry + +Global registry of serializers/deserializers. + +""" +registry = SerializerRegistry() + +""" +.. function:: encode(data, serializer=default_serializer) + +Encode data using the registry's default encoder. + +""" +encode = registry.encode + +""" +.. function:: decode(data, content_type, content_encoding): + +Decode data using the registry's default decoder. + +""" +decode = registry.decode + + +def raw_encode(data): + """Special case serializer.""" + content_type = 'application/data' + payload = data + if isinstance(payload, unicode): + content_encoding = 'utf-8' + payload = payload.encode(content_encoding) + else: + content_encoding = 'binary' + return content_type, content_encoding, payload + + +def register_json(): + """Register a encoder/decoder for JSON serialization.""" + from anyjson import serialize as json_serialize + from anyjson import deserialize as json_deserialize + + registry.register('json', json_serialize, json_deserialize, + content_type='application/json', + content_encoding='utf-8') + + +def register_yaml(): + """Register a encoder/decoder for YAML serialization. + + It is slower than JSON, but allows for more data types + to be serialized. Useful if you need to send data such as dates""" + try: + import yaml + registry.register('yaml', yaml.safe_dump, yaml.safe_load, + content_type='application/x-yaml', + content_encoding='utf-8') + except ImportError: + + def not_available(*args, **kwargs): + """In case a client receives a yaml message, but yaml + isn't installed.""" + raise SerializerNotInstalled( + "No decoder installed for YAML. Install the PyYAML library") + registry.register('yaml', None, not_available, 'application/x-yaml') + + +def register_pickle(): + """The fastest serialization method, but restricts + you to python clients.""" + import cPickle + registry.register('pickle', cPickle.dumps, cPickle.loads, + content_type='application/x-python-serialize', + content_encoding='binary') + + +def register_msgpack(): + """See http://msgpack.sourceforge.net/""" + try: + import msgpack + registry.register('msgpack', msgpack.packs, msgpack.unpacks, + content_type='application/x-msgpack', + content_encoding='utf-8') + except ImportError: + + def not_available(*args, **kwargs): + """In case a client receives a msgpack message, but yaml + isn't installed.""" + raise SerializerNotInstalled( + "No decoder installed for msgpack. " + "Install the msgpack library") + registry.register('msgpack', None, not_available, + 'application/x-msgpack') + +# Register the base serialization methods. +register_json() +register_pickle() +register_yaml() +register_msgpack() + +# JSON is assumed to always be available, so is the default. +# (this matches the historical use of kombu.) +registry._set_default_serializer('json') diff --git a/kombu/utils.py b/kombu/utils.py new file mode 100644 index 00000000..37ba1aed --- /dev/null +++ b/kombu/utils.py @@ -0,0 +1,66 @@ +from uuid import UUID, uuid4, _uuid_generate_random +try: + import ctypes +except ImportError: + ctypes = None + + +def gen_unique_id(): + """Generate a unique id, having - hopefully - a very small chance of + collission. + + For now this is provided by :func:`uuid.uuid4`. + """ + # Workaround for http://bugs.python.org/issue4607 + if ctypes and _uuid_generate_random: + buffer = ctypes.create_string_buffer(16) + _uuid_generate_random(buffer) + return str(UUID(bytes=buffer.raw)) + return str(uuid4()) + + +def _compat_rl_partition(S, sep, direction=None): + if direction is None: + direction = S.split + items = direction(sep, 1) + if len(items) == 1: + return items[0], sep, '' + return items[0], sep, items[1] + + +def _compat_partition(S, sep): + """``partition(S, sep) -> (head, sep, tail)`` + + Search for the separator ``sep`` in ``S``, and return the part before + it, the separator itself, and the part after it. If the separator is not + found, return ``S`` and two empty strings. + + """ + return _compat_rl_partition(S, sep, direction=S.split) + + +def _compat_rpartition(S, sep): + """``rpartition(S, sep) -> (tail, sep, head)`` + + Search for the separator ``sep`` in ``S``, starting at the end of ``S``, + and return the part before it, the separator itself, and the part + after it. If the separator is not found, return two empty + strings and ``S``. + + """ + return _compat_rl_partition(S, sep, direction=S.rsplit) + + + +def partition(S, sep): + if hasattr(S, 'partition'): + return S.partition(sep) + else: # Python <= 2.4: + return _compat_partition(S, sep) + + +def rpartition(S, sep): + if hasattr(S, 'rpartition'): + return S.rpartition(sep) + else: # Python <= 2.4: + return _compat_rpartition(S, sep) diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..fd518807 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,16 @@ +[nosetests] +verbosity = 1 +detailed-errors = 1 + +[build_sphinx] +source-dir = docs/ +build-dir = docs/.build +all_files = 1 + +[upload_sphinx] +upload-dir = docs/.build/html + + +[bdist_rpm] +requires = python-anyjson + python-amqplib >= 0.6 diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..581788ef --- /dev/null +++ b/setup.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import os +import codecs + +try: + from setuptools import setup, find_packages +except ImportError: + from ez_setup import use_setuptools + use_setuptools() + from setuptools import setup, find_packages + +from distutils.command.install_data import install_data +from distutils.command.install import INSTALL_SCHEMES +import sys + +import kombu + +packages, data_files = [], [] +root_dir = os.path.dirname(__file__) +if root_dir != '': + os.chdir(root_dir) +src_dir = "kombu" + + +def osx_install_data(install_data): + + def finalize_options(self): + self.set_undefined_options("install", ("install_lib", "install_dir")) + install_data.finalize_options(self) + + +def fullsplit(path, result=None): + if result is None: + result = [] + head, tail = os.path.split(path) + if head == '': + return [tail] + result + if head == path: + return result + return fullsplit(head, [tail] + result) + + +for scheme in INSTALL_SCHEMES.values(): + scheme['data'] = scheme['purelib'] + +for dirpath, dirnames, filenames in os.walk(src_dir): + # Ignore dirnames that start with '.' + for i, dirname in enumerate(dirnames): + if dirname.startswith("."): + del dirnames[i] + for filename in filenames: + if filename.endswith(".py"): + packages.append('.'.join(fullsplit(dirpath))) + else: + data_files.append([dirpath, [os.path.join(dirpath, f) for f in + filenames]]) + +if os.path.exists("README.rst"): + long_description = codecs.open('README.rst', "r", "utf-8").read() +else: + long_description = "See http://pypi.python.org/pypi/kombu" + +setup( + name='kombu', + version=kombu.__version__, + description=kombu.__doc__, + author=kombu.__author__, + author_email=kombu.__contact__, + url=kombu.__homepage__, + platforms=["any"], + packages=packages, + data_files=data_files, + zip_safe=False, + test_suite="nose.collector", + install_requires=[ + 'anyjson', + 'amqplib>=0.6', + ], + classifiers=[ + "Development Status :: 4 - Beta", + "Framework :: Django", + "Operating System :: OS Independent", + "Programming Language :: Python", + "License :: OSI Approved :: BSD License", + "Intended Audience :: Developers", + "Topic :: Communications", + "Topic :: System :: Distributed Computing", + "Topic :: Software Development :: Libraries :: Python Modules", + ], + long_description=long_description, +)