mirror of https://github.com/celery/kombu.git
Initial import
This commit is contained in:
commit
da3e754594
|
@ -0,0 +1,19 @@
|
|||
==================================
|
||||
AUTHORS (in chronological order)
|
||||
==================================
|
||||
|
||||
Ask Solem <askh@opera.com>
|
||||
Travis Cline <travis.cline@gmail.com>
|
||||
Rune Halvorsen <runeh@opera.com>
|
||||
Sean Creeley <sean.creeley@gmail.com>
|
||||
Jason Cater <jason@ncsfulfillment.com>
|
||||
Ian Struble <istruble@gmail.com>
|
||||
Patrick Schneider <patrick.p2k.schneider@gmail.com>
|
||||
Travis Swicegood <development@domain51.com>
|
||||
Stephen Day <stevvooe@gmail.com>
|
||||
Andrew Watts
|
||||
Paul McLanahan <paul@mclanahan.net>
|
||||
Ralf Nyren <ralf-github@nyren.net>
|
||||
Jeff Balogh <me@jeffbalogh.org>
|
||||
Adam Wentz
|
||||
Vincent Driessen <vincent@datafox.nl>
|
|
@ -0,0 +1,8 @@
|
|||
================
|
||||
Change history
|
||||
================
|
||||
|
||||
0.1.0
|
||||
-----
|
||||
|
||||
* Rewrite of carrot
|
|
@ -0,0 +1,19 @@
|
|||
============================
|
||||
Frequently Asked Questions
|
||||
============================
|
||||
|
||||
Questions
|
||||
=========
|
||||
|
||||
Q: Message.reject doesn't work?
|
||||
--------------------------------------
|
||||
**Answer**: RabbitMQ (as of v1.5.5) has not implemented reject yet.
|
||||
There was a brief discussion about it on their mailing list, and the reason
|
||||
why it's not implemented yet is revealed:
|
||||
|
||||
http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2009-January/003183.html
|
||||
|
||||
Q: Message.requeue doesn't work?
|
||||
--------------------------------------
|
||||
|
||||
**Answer**: See _`Message.reject doesn't work?`
|
|
@ -0,0 +1,21 @@
|
|||
Installation
|
||||
============
|
||||
|
||||
You can install ``carrot`` either via the Python Package Index (PyPI)
|
||||
or from source.
|
||||
|
||||
To install using ``pip``,::
|
||||
|
||||
$ pip install carrot
|
||||
|
||||
|
||||
To install using ``easy_install``,::
|
||||
|
||||
$ easy_install carrot
|
||||
|
||||
|
||||
If you have downloaded a source tarball you can install it
|
||||
by doing the following,::
|
||||
|
||||
$ python setup.py build
|
||||
# python setup.py install # as root
|
|
@ -0,0 +1,28 @@
|
|||
Copyright (c) 2009, Ask Solem
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice,
|
||||
this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
|
||||
Neither the name of Ask Solem nor the names of its contributors may be used
|
||||
to endorse or promote products derived from this software without specific
|
||||
prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
|
||||
THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
||||
PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
|
||||
BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
POSSIBILITY OF SUCH DAMAGE.
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
include AUTHORS
|
||||
include Changelog
|
||||
include FAQ
|
||||
include INSTALL
|
||||
include LICENSE
|
||||
include MANIFEST.in
|
||||
include README.rst
|
||||
include README
|
||||
include THANKS
|
||||
include TODO
|
||||
recursive-include docs *
|
||||
recursive-include kombu *.py
|
|
@ -0,0 +1,391 @@
|
|||
##############################################
|
||||
kombu - AMQP Messaging Framework for Python
|
||||
##############################################
|
||||
|
||||
:Version: 0.10.5
|
||||
|
||||
**THIS IS A REWRITE OF CARROT**
|
||||
|
||||
|
||||
Introduction
|
||||
------------
|
||||
|
||||
`carrot` is an `AMQP`_ messaging queue framework. AMQP is the Advanced Message
|
||||
Queuing Protocol, an open standard protocol for message orientation, queuing,
|
||||
routing, reliability and security.
|
||||
|
||||
The aim of `carrot` is to make messaging in Python as easy as possible by
|
||||
providing a high-level interface for producing and consuming messages. At the
|
||||
same time it is a goal to re-use what is already available as much as possible.
|
||||
|
||||
`carrot` has pluggable messaging back-ends, so it is possible to support
|
||||
several messaging systems. Currently, there is support for `AMQP`_
|
||||
(`py-amqplib`_, `pika`_), `STOMP`_ (`python-stomp`_). There's also an
|
||||
in-memory backend for testing purposes, using the `Python queue module`_.
|
||||
|
||||
Several AMQP message broker implementations exists, including `RabbitMQ`_,
|
||||
`ZeroMQ`_ and `Apache ActiveMQ`_. You'll need to have one of these installed,
|
||||
personally we've been using `RabbitMQ`_.
|
||||
|
||||
Before you start playing with ``carrot``, you should probably read up on
|
||||
AMQP, and you could start with the excellent article about using RabbitMQ
|
||||
under Python, `Rabbits and warrens`_. For more detailed information, you can
|
||||
refer to the `Wikipedia article about AMQP`_.
|
||||
|
||||
.. _`RabbitMQ`: http://www.rabbitmq.com/
|
||||
.. _`ZeroMQ`: http://www.zeromq.org/
|
||||
.. _`AMQP`: http://amqp.org
|
||||
.. _`STOMP`: http://stomp.codehaus.org
|
||||
.. _`python-stomp`: http://bitbucket.org/asksol/python-stomp
|
||||
.. _`Python Queue module`: http://docs.python.org/library/queue.html
|
||||
.. _`Apache ActiveMQ`: http://activemq.apache.org/
|
||||
.. _`Django`: http://www.djangoproject.com/
|
||||
.. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
|
||||
.. _`py-amqplib`: http://barryp.org/software/py-amqplib/
|
||||
.. _`pika`: http://github.com/tonyg/pika
|
||||
.. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP
|
||||
|
||||
Documentation
|
||||
-------------
|
||||
|
||||
Carrot is using Sphinx, and the latest documentation is available at GitHub:
|
||||
|
||||
http://github.com/ask/carrot/
|
||||
|
||||
Installation
|
||||
============
|
||||
|
||||
You can install ``carrot`` either via the Python Package Index (PyPI)
|
||||
or from source.
|
||||
|
||||
To install using ``pip``,::
|
||||
|
||||
$ pip install carrot
|
||||
|
||||
|
||||
To install using ``easy_install``,::
|
||||
|
||||
$ easy_install carrot
|
||||
|
||||
|
||||
If you have downloaded a source tarball you can install it
|
||||
by doing the following,::
|
||||
|
||||
$ python setup.py build
|
||||
# python setup.py install # as root
|
||||
|
||||
|
||||
Terminology
|
||||
===========
|
||||
|
||||
There are some concepts you should be familiar with before starting:
|
||||
|
||||
* Publishers
|
||||
|
||||
Publishers sends messages to an exchange.
|
||||
|
||||
* Exchanges
|
||||
|
||||
Messages are sent to exchanges. Exchanges are named and can be
|
||||
configured to use one of several routing algorithms. The exchange
|
||||
routes the messages to consumers by matching the routing key in the
|
||||
message with the routing key the consumer provides when binding to
|
||||
the exchange.
|
||||
|
||||
* Consumers
|
||||
|
||||
Consumers declares a queue, binds it to a exchange and receives
|
||||
messages from it.
|
||||
|
||||
* Queues
|
||||
|
||||
Queues receive messages sent to exchanges. The queues are declared
|
||||
by consumers.
|
||||
|
||||
* Routing keys
|
||||
|
||||
Every message has a routing key. The interpretation of the routing
|
||||
key depends on the exchange type. There are four default exchange
|
||||
types defined by the AMQP standard, and vendors can define custom
|
||||
types (so see your vendors manual for details).
|
||||
|
||||
These are the default exchange types defined by AMQP/0.8:
|
||||
|
||||
* Direct exchange
|
||||
|
||||
Matches if the routing key property of the message and
|
||||
the ``routing_key`` attribute of the consumer are identical.
|
||||
|
||||
* Fan-out exchange
|
||||
|
||||
Always matches, even if the binding does not have a routing
|
||||
key.
|
||||
|
||||
* Topic exchange
|
||||
|
||||
Matches the routing key property of the message by a primitive
|
||||
pattern matching scheme. The message routing key then consists
|
||||
of words separated by dots (``"."``, like domain names), and
|
||||
two special characters are available; star (``"*"``) and hash
|
||||
(``"#"``). The star matches any word, and the hash matches
|
||||
zero or more words. For example ``"*.stock.#"`` matches the
|
||||
routing keys ``"usd.stock"`` and ``"eur.stock.db"`` but not
|
||||
``"stock.nasdaq"``.
|
||||
|
||||
|
||||
Examples
|
||||
========
|
||||
|
||||
Creating a connection
|
||||
---------------------
|
||||
|
||||
You can set up a connection by creating an instance of
|
||||
``carrot.messaging.BrokerConnection``, with the appropriate options for
|
||||
your broker:
|
||||
|
||||
>>> from carrot.connection import BrokerConnection
|
||||
>>> conn = BrokerConnection(hostname="localhost", port=5672,
|
||||
... userid="test", password="test",
|
||||
... virtual_host="test")
|
||||
|
||||
|
||||
If you're using Django you can use the
|
||||
``carrot.connection.DjangoBrokerConnection`` class instead, which loads
|
||||
the connection settings from your ``settings.py``::
|
||||
|
||||
BROKER_HOST = "localhost"
|
||||
BROKER_PORT = 5672
|
||||
BROKER_USER = "test"
|
||||
BROKER_PASSWORD = "secret"
|
||||
BROKER_VHOST = "/test"
|
||||
|
||||
Then create a connection by doing:
|
||||
|
||||
>>> from carrot.connection import DjangoBrokerConnection
|
||||
>>> conn = DjangoBrokerConnection()
|
||||
|
||||
|
||||
|
||||
Receiving messages using a Consumer
|
||||
-----------------------------------
|
||||
|
||||
First we open up a Python shell and start a message consumer.
|
||||
|
||||
This consumer declares a queue named ``"feed"``, receiving messages with
|
||||
the routing key ``"importer"`` from the ``"feed"`` exchange.
|
||||
|
||||
The example then uses the consumers ``wait()`` method to go into consume
|
||||
mode, where it continuously polls the queue for new messages, and when a
|
||||
message is received it passes the message to all registered callbacks.
|
||||
|
||||
>>> from carrot.messaging import Consumer
|
||||
>>> consumer = Consumer(connection=conn, queue="feed",
|
||||
... exchange="feed", routing_key="importer")
|
||||
>>> def import_feed_callback(message_data, message)
|
||||
... feed_url = message_data["import_feed"]
|
||||
... print("Got feed import message for: %s" % feed_url)
|
||||
... # something importing this feed url
|
||||
... # import_feed(feed_url)
|
||||
... message.ack()
|
||||
>>> consumer.register_callback(import_feed_callback)
|
||||
>>> consumer.wait() # Go into the consumer loop.
|
||||
|
||||
Sending messages using a Publisher
|
||||
----------------------------------
|
||||
|
||||
Then we open up another Python shell to send some messages to the consumer
|
||||
defined in the last section.
|
||||
|
||||
>>> from carrot.messaging import Publisher
|
||||
>>> publisher = Publisher(connection=conn,
|
||||
... exchange="feed", routing_key="importer")
|
||||
>>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"})
|
||||
>>> publisher.close()
|
||||
|
||||
|
||||
Look in the first Python shell again (where ``consumer.wait()`` is running),
|
||||
where the following text has been printed to the screen::
|
||||
|
||||
Got feed import message for: http://cnn.com/rss/edition.rss
|
||||
|
||||
|
||||
Serialization of Data
|
||||
-----------------------
|
||||
|
||||
By default every message is encoded using `JSON`_, so sending
|
||||
Python data structures like dictionaries and lists works.
|
||||
`YAML`_, `msgpack`_ and Python's built-in ``pickle`` module is also supported,
|
||||
and if needed you can register any custom serialization scheme you
|
||||
want to use.
|
||||
|
||||
.. _`JSON`: http://www.json.org/
|
||||
.. _`YAML`: http://yaml.org/
|
||||
.. _`msgpack`: http://msgpack.sourceforge.net/
|
||||
|
||||
Each option has its advantages and disadvantages.
|
||||
|
||||
``json`` -- JSON is supported in many programming languages, is now
|
||||
a standard part of Python (since 2.6), and is fairly fast to
|
||||
decode using the modern Python libraries such as ``cjson or
|
||||
``simplejson``.
|
||||
|
||||
The primary disadvantage to ``JSON`` is that it limits you to
|
||||
the following data types: strings, unicode, floats, boolean,
|
||||
dictionaries, and lists. Decimals and dates are notably missing.
|
||||
|
||||
Also, binary data will be transferred using base64 encoding, which
|
||||
will cause the transferred data to be around 34% larger than an
|
||||
encoding which supports native binary types.
|
||||
|
||||
However, if your data fits inside the above constraints and
|
||||
you need cross-language support, the default setting of ``JSON``
|
||||
is probably your best choice.
|
||||
|
||||
``pickle`` -- If you have no desire to support any language other than
|
||||
Python, then using the ``pickle`` encoding will gain you
|
||||
the support of all built-in Python data types (except class instances),
|
||||
smaller messages when sending binary files, and a slight speedup
|
||||
over ``JSON`` processing.
|
||||
|
||||
``yaml`` -- YAML has many of the same characteristics as ``json``,
|
||||
except that it natively supports more data types (including dates,
|
||||
recursive references, etc.)
|
||||
|
||||
However, the Python libraries for YAML are a good bit slower
|
||||
than the libraries for JSON.
|
||||
|
||||
If you need a more expressive set of data types and need to maintain
|
||||
cross-language compatibility, then ``YAML`` may be a better fit
|
||||
than the above.
|
||||
|
||||
To instruct carrot to use an alternate serialization method,
|
||||
use one of the following options.
|
||||
|
||||
1. Set the serialization option on a per-Publisher basis:
|
||||
|
||||
>>> from carrot.messaging import Publisher
|
||||
>>> publisher = Publisher(connection=conn,
|
||||
... exchange="feed", routing_key="importer",
|
||||
... serializer="yaml")
|
||||
|
||||
2. Set the serialization option on a per-call basis
|
||||
|
||||
>>> from carrot.messaging import Publisher
|
||||
>>> publisher = Publisher(connection=conn,
|
||||
... exchange="feed", routing_key="importer")
|
||||
>>> publisher.send({"import_feed": "http://cnn.com/rss/edition.rss"},
|
||||
... serializer="pickle")
|
||||
>>> publisher.close()
|
||||
|
||||
Note that ``Consumer``s do not need the serialization method specified in
|
||||
their code. They can auto-detect the serialization method since we supply
|
||||
the ``Content-type`` header as part of the AMQP message.
|
||||
|
||||
|
||||
Sending raw data without Serialization
|
||||
---------------------------------------
|
||||
|
||||
In some cases, you don't need your message data to be serialized. If you
|
||||
pass in a plain string or unicode object as your message, then carrot will
|
||||
not waste cycles serializing/deserializing the data.
|
||||
|
||||
You can optionally specify a ``content_type`` and ``content_encoding``
|
||||
for the raw data:
|
||||
|
||||
>>> from carrot.messaging import Publisher
|
||||
>>> publisher = Publisher(connection=conn,
|
||||
... exchange="feed",
|
||||
routing_key="import_pictures")
|
||||
>>> publisher.send(open('~/my_picture.jpg','rb').read(),
|
||||
content_type="image/jpeg",
|
||||
content_encoding="binary")
|
||||
>>> publisher.close()
|
||||
|
||||
The ``message`` object returned by the ``Consumer`` class will have a
|
||||
``content_type`` and ``content_encoding`` attribute.
|
||||
|
||||
|
||||
Receiving messages without a callback
|
||||
--------------------------------------
|
||||
|
||||
You can also poll the queue manually, by using the ``fetch`` method.
|
||||
This method returns a ``Message`` object, from where you can get the
|
||||
message body, de-serialize the body to get the data, acknowledge, reject or
|
||||
re-queue the message.
|
||||
|
||||
>>> consumer = Consumer(connection=conn, queue="feed",
|
||||
... exchange="feed", routing_key="importer")
|
||||
>>> message = consumer.fetch()
|
||||
>>> if message:
|
||||
... message_data = message.payload
|
||||
... message.ack()
|
||||
... else:
|
||||
... # No messages waiting on the queue.
|
||||
>>> consumer.close()
|
||||
|
||||
Sub-classing the messaging classes
|
||||
----------------------------------
|
||||
|
||||
The ``Consumer``, and ``Publisher`` classes can also be sub classed. Thus you
|
||||
can define the above publisher and consumer like so:
|
||||
|
||||
>>> from carrot.messaging import Publisher, Consumer
|
||||
|
||||
>>> class FeedPublisher(Publisher):
|
||||
... exchange = "feed"
|
||||
... routing_key = "importer"
|
||||
...
|
||||
... def import_feed(self, feed_url):
|
||||
... return self.send({"action": "import_feed",
|
||||
... "feed_url": feed_url})
|
||||
|
||||
>>> class FeedConsumer(Consumer):
|
||||
... queue = "feed"
|
||||
... exchange = "feed"
|
||||
... routing_key = "importer"
|
||||
...
|
||||
... def receive(self, message_data, message):
|
||||
... action = message_data["action"]
|
||||
... if action == "import_feed":
|
||||
... # something importing this feed
|
||||
... # import_feed(message_data["feed_url"])
|
||||
message.ack()
|
||||
... else:
|
||||
... raise Exception("Unknown action: %s" % action)
|
||||
|
||||
>>> publisher = FeedPublisher(connection=conn)
|
||||
>>> publisher.import_feed("http://cnn.com/rss/edition.rss")
|
||||
>>> publisher.close()
|
||||
|
||||
>>> consumer = FeedConsumer(connection=conn)
|
||||
>>> consumer.wait() # Go into the consumer loop.
|
||||
|
||||
Getting Help
|
||||
============
|
||||
|
||||
Mailing list
|
||||
------------
|
||||
|
||||
Join the `carrot-users`_ mailing list.
|
||||
|
||||
.. _`carrot-users`: http://groups.google.com/group/carrot-users/
|
||||
|
||||
Bug tracker
|
||||
===========
|
||||
|
||||
If you have any suggestions, bug reports or annoyances please report them
|
||||
to our issue tracker at http://github.com/ask/carrot/issues/
|
||||
|
||||
Contributing
|
||||
============
|
||||
|
||||
Development of ``carrot`` happens at Github: http://github.com/ask/carrot
|
||||
|
||||
You are highly encouraged to participate in the development. If you don't
|
||||
like Github (for some reason) you're welcome to send regular patches.
|
||||
|
||||
License
|
||||
=======
|
||||
|
||||
This software is licensed under the ``New BSD License``. See the ``LICENSE``
|
||||
file in the top distribution directory for the full license text.
|
|
@ -0,0 +1,6 @@
|
|||
Thanks to Barry Pederson <bp@barryp.org> for the py-amqplib library.
|
||||
Thanks to Grégoire Cachet <gregoire@audacy.fr> for bug reports.
|
||||
Thanks to Martin Mahner for the Sphinx theme.
|
||||
Thanks to jcater for bug reports.
|
||||
Thanks to sebest for bug reports.
|
||||
Thanks to greut for bug reports
|
|
@ -0,0 +1,2 @@
|
|||
Please see our Issue Tracker at GitHub:
|
||||
http://github.com/ask/kombu/issues
|
|
@ -0,0 +1,13 @@
|
|||
#!/bin/bash
|
||||
|
||||
git checkout master
|
||||
(cd docs;
|
||||
rm -rf .build;
|
||||
make html;
|
||||
(cd .build/html;
|
||||
sphinx-to-github;))
|
||||
git checkout gh-pages
|
||||
cp -r docs/.build/html/* .
|
||||
git commit . -m "Autogenerated documentation for github."
|
||||
git push origin gh-pages
|
||||
git checkout master
|
|
@ -0,0 +1,2 @@
|
|||
anyjson
|
||||
amqplib>=0.6
|
|
@ -0,0 +1,7 @@
|
|||
nose
|
||||
nose-cover3
|
||||
coverage>=3.0
|
||||
simplejson
|
||||
PyYAML
|
||||
msgpack-python
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from carrot import Connection, Exchange, Binding
|
||||
from carrot import Consumer, Producer
|
||||
|
||||
media_exchange = Exchange("media", "direct", durable=True)
|
||||
video_binding = Binding("video", exchange=media_exchange, key="video")
|
||||
|
||||
# connections/channels
|
||||
connection = Connection("localhost", "guest", "guest", "/")
|
||||
channel = connection.channel()
|
||||
|
||||
# produce
|
||||
producer = Producer(channel, exchange=media_exchange, serializer="json")
|
||||
producer.publish({"name": "/tmp/lolcat1.avi", "size": 1301013})
|
||||
|
||||
# consume
|
||||
consumer = Consumer(channel, video_binding)
|
||||
consumer.register_callback(process_media)
|
||||
consumer.consume()
|
||||
|
||||
while True:
|
||||
connection.drain_events()
|
||||
|
||||
|
||||
# consumerset:
|
||||
video_binding = Binding("video", exchange=media_exchange, key="video")
|
||||
image_binding = Binding("image", exchange=media_exchange, key="image")
|
||||
|
||||
consumer = Consumer(channel, [video_binding, image_binding])
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
"""AMQP Messaging Framework for Python"""
|
||||
VERSION = (0, 10, 5)
|
||||
__version__ = ".".join(map(str, VERSION))
|
||||
__author__ = "Ask Solem"
|
||||
__contact__ = "askh@opera.com"
|
||||
__homepage__ = "http://github.com/ask/carrot/"
|
||||
__docformat__ = "restructuredtext"
|
|
@ -0,0 +1,43 @@
|
|||
import sys
|
||||
|
||||
from kombu.utils import rpartition
|
||||
|
||||
DEFAULT_BACKEND = "kombu.backends.pyamqplib.Backend"
|
||||
|
||||
BACKEND_ALIASES = {
|
||||
"amqplib": "kombu.backends.pyamqplib.Backend",
|
||||
"pika": "kombu.backends.pikachu.AsyncoreBackend",
|
||||
"syncpika": "kombu.backends.pikachu.SyncBackend",
|
||||
}
|
||||
|
||||
_backend_cache = {}
|
||||
|
||||
|
||||
def resolve_backend(backend=None):
|
||||
backend = BACKEND_ALIASES.get(backend, backend)
|
||||
backend_module_name, _, backend_cls_name = rpartition(backend, ".")
|
||||
return backend_module_name, backend_cls_name
|
||||
|
||||
|
||||
def _get_backend_cls(backend=None):
|
||||
backend_module_name, backend_cls_name = resolve_backend(backend)
|
||||
__import__(backend_module_name)
|
||||
backend_module = sys.modules[backend_module_name]
|
||||
return getattr(backend_module, backend_cls_name)
|
||||
|
||||
|
||||
def get_backend_cls(backend=None):
|
||||
"""Get backend class by name.
|
||||
|
||||
The backend string is the full path to a backend class, e.g.::
|
||||
|
||||
"kombu.backends.pyamqplib.Backend"
|
||||
|
||||
If the name does not include "``.``" (is not fully qualified),
|
||||
the alias table will be consulted.
|
||||
|
||||
"""
|
||||
backend = backend or DEFAULT_BACKEND
|
||||
if backend not in _backend_cache:
|
||||
_backend_cache[backend] = _get_backend_cls(backend)
|
||||
return _backend_cache[backend]
|
|
@ -0,0 +1,104 @@
|
|||
"""
|
||||
|
||||
Backend base classes.
|
||||
|
||||
"""
|
||||
from kombu import serialization
|
||||
|
||||
ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"])
|
||||
|
||||
|
||||
class MessageStateError(Exception):
|
||||
"""The message has already been acknowledged."""
|
||||
|
||||
|
||||
class BaseMessage(object):
|
||||
"""Base class for received messages."""
|
||||
_state = None
|
||||
|
||||
MessageStateError = MessageStateError
|
||||
|
||||
def __init__(self, channel, body=None, delivery_tag=None,
|
||||
content_type=None, content_encoding=None, delivery_info={}, **kwargs):
|
||||
self.channel = channel
|
||||
self._decoded_cache = None
|
||||
self._state = "RECEIVED"
|
||||
|
||||
def decode(self):
|
||||
"""Deserialize the message body, returning the original
|
||||
python structure sent by the publisher."""
|
||||
return serialization.decode(self.body, self.content_type,
|
||||
self.content_encoding)
|
||||
|
||||
@property
|
||||
def payload(self):
|
||||
"""The decoded message."""
|
||||
if not self._decoded_cache:
|
||||
self._decoded_cache = self.decode()
|
||||
return self._decoded_cache
|
||||
|
||||
def ack(self):
|
||||
"""Acknowledge this message as being processed.,
|
||||
This will remove the message from the queue.
|
||||
|
||||
:raises MessageStateError: If the message has already been
|
||||
acknowledged/requeued/rejected.
|
||||
|
||||
"""
|
||||
if self.acknowledged:
|
||||
raise self.MessageStateError(
|
||||
"Message already acknowledged with state: %s" % self._state)
|
||||
self.channel.basic_ack(self.delivery_tag)
|
||||
self._state = "ACK"
|
||||
|
||||
def reject(self):
|
||||
"""Reject this message.
|
||||
|
||||
The message will be discarded by the server.
|
||||
|
||||
:raises MessageStateError: If the message has already been
|
||||
acknowledged/requeued/rejected.
|
||||
|
||||
"""
|
||||
if self.acknowledged:
|
||||
raise self.MessageStateError(
|
||||
"Message already acknowledged with state: %s" % self._state)
|
||||
self.channel.basic_reject(self.delivery_tag)
|
||||
self._state = "REJECTED"
|
||||
|
||||
def requeue(self):
|
||||
"""Reject this message and put it back on the queue.
|
||||
|
||||
You must not use this method as a means of selecting messages
|
||||
to process.
|
||||
|
||||
:raises MessageStateError: If the message has already been
|
||||
acknowledged/requeued/rejected.
|
||||
|
||||
"""
|
||||
if self.acknowledged:
|
||||
raise self.MessageStateError(
|
||||
"Message already acknowledged with state: %s" % self._state)
|
||||
self.channel.basic_reject(self.delivery_tag, requeue=True)
|
||||
self._state = "REQUEUED"
|
||||
|
||||
@property
|
||||
def acknowledged(self):
|
||||
return self._state in ACKNOWLEDGED_STATES
|
||||
|
||||
|
||||
class BaseBackend(object):
|
||||
"""Base class for backends."""
|
||||
default_port = None
|
||||
|
||||
def __init__(self, connection, **kwargs):
|
||||
self.connection = connection
|
||||
|
||||
def get_channel(self):
|
||||
raise NotImplementedError("Subclass responsibility")
|
||||
|
||||
def establish_connection(self):
|
||||
raise NotImplementedError("Subclass responsibility")
|
||||
|
||||
def close_connection(self):
|
||||
raise NotImplementedError("Subclass responsibility")
|
|
@ -0,0 +1,244 @@
|
|||
from carrot.backends.base import BaseBackend, BaseMessage
|
||||
from anyjson import serialize, deserialize
|
||||
from itertools import count
|
||||
from django.utils.datastructures import SortedDict
|
||||
from carrot.utils import gen_unique_id
|
||||
import sys
|
||||
import time
|
||||
import atexit
|
||||
|
||||
from Queue import Empty as QueueEmpty
|
||||
from itertools import cycle
|
||||
|
||||
|
||||
class QueueSet(object):
|
||||
"""A set of queues that operates as one."""
|
||||
|
||||
def __init__(self, backend, queues):
|
||||
self.backend = backend
|
||||
self.queues = queues
|
||||
|
||||
# an infinite cycle through all the queues.
|
||||
self.cycle = cycle(self.queues)
|
||||
|
||||
# A set of all the queue names, so we can match when we've
|
||||
# tried all of them.
|
||||
self.all = frozenset(self.queues)
|
||||
|
||||
def get(self):
|
||||
"""Get the next message avaiable in the queue.
|
||||
|
||||
:returns: The message and the name of the queue it came from as
|
||||
a tuple.
|
||||
:raises Empty: If there are no more items in any of the queues.
|
||||
|
||||
"""
|
||||
|
||||
# A set of queues we've already tried.
|
||||
tried = set()
|
||||
|
||||
while True:
|
||||
# Get the next queue in the cycle, and try to get an item off it.
|
||||
queue = self.cycle.next()
|
||||
try:
|
||||
item = self.backend._get(queue)
|
||||
except Empty:
|
||||
# raises Empty when we've tried all of them.
|
||||
tried.add(queue)
|
||||
if tried == self.all:
|
||||
raise
|
||||
else:
|
||||
return item, queue
|
||||
|
||||
def __repr__(self):
|
||||
return "<QueueSet: %s>" % repr(self.queue_names)
|
||||
|
||||
|
||||
class QualityOfService(object):
|
||||
|
||||
def __init__(self, resource, prefetch_count=None, interval=None):
|
||||
self.resource = resource
|
||||
self.prefetch_count = prefetch_count
|
||||
self.interval = interval
|
||||
self._delivered = SortedDict()
|
||||
self._restored_once = False
|
||||
atexit.register(self.restore_unacked_once)
|
||||
|
||||
def can_consume(self):
|
||||
return len(self._delivered) > self.prefetch_count
|
||||
|
||||
def append(self, message, queue_name, delivery_tag):
|
||||
self._delivered[delivery_tag] = message, queue_name
|
||||
|
||||
def ack(self, delivery_tag):
|
||||
self._delivered.pop(delivery_tag, None)
|
||||
|
||||
def restore_unacked(self):
|
||||
for message, queue_name in self._delivered.items():
|
||||
self.resource.put(queue_name, message)
|
||||
self._delivered = SortedDict()
|
||||
|
||||
def requeue(self, delivery_tag):
|
||||
try:
|
||||
message, queue_name = self._delivered.pop(delivery_tag)
|
||||
except KeyError:
|
||||
pass
|
||||
self.resource.put(queue_name, message)
|
||||
|
||||
def restore_unacked_once(self):
|
||||
if not self._restored_once:
|
||||
if self._delivered:
|
||||
sys.stderr.write(
|
||||
"Restoring unacknowledged messages: %s\n" % (
|
||||
self._delivered))
|
||||
self.restore_unacked()
|
||||
if self._delivered:
|
||||
sys.stderr.write("UNRESTORED MESSAGES: %s\n" % (
|
||||
self._delivered))
|
||||
|
||||
|
||||
class Message(BaseMessage):
|
||||
|
||||
def __init__(self, backend, payload, **kwargs):
|
||||
self.backend = backend
|
||||
|
||||
payload = deserialize(payload)
|
||||
kwargs["body"] = payload.get("body").encode("utf-8")
|
||||
kwargs["delivery_tag"] = payload.get("delivery_tag")
|
||||
kwargs["content_type"] = payload.get("content-type")
|
||||
kwargs["content_encoding"] = payload.get("content-encoding")
|
||||
kwargs["priority"] = payload.get("priority")
|
||||
self.destination = payload.get("destination")
|
||||
|
||||
super(Message, self).__init__(backend, **kwargs)
|
||||
|
||||
def reject(self):
|
||||
raise NotImplementedError(
|
||||
"This backend does not implement basic.reject")
|
||||
|
||||
|
||||
class EmulationBase(BaseBackend):
|
||||
Message = Message
|
||||
default_port = None
|
||||
interval = 1
|
||||
_prefetch_count = None
|
||||
|
||||
QueueSet = QueueSet
|
||||
|
||||
def __init__(self, connection, **kwargs):
|
||||
self.connection = connection
|
||||
self._consumers = {}
|
||||
self._callbacks = {}
|
||||
self._qos_manager = None
|
||||
|
||||
def establish_connection(self):
|
||||
return self # for drain events
|
||||
|
||||
def close_connection(self, connection):
|
||||
pass
|
||||
|
||||
def queue_exists(self, queue):
|
||||
return True
|
||||
|
||||
def queue_purge(self, queue, **kwargs):
|
||||
return self._purge(queue, **kwargs)
|
||||
|
||||
def _poll(self, resource):
|
||||
while True:
|
||||
if self.qos_manager.can_consume():
|
||||
try:
|
||||
return resource.get()
|
||||
except QueueEmpty:
|
||||
pass
|
||||
time.sleep(self.interval)
|
||||
|
||||
def declare_consumer(self, queue, no_ack, callback, consumer_tag,
|
||||
**kwargs):
|
||||
self._consumers[consumer_tag] = queue
|
||||
self._callbacks[queue] = callback
|
||||
|
||||
def drain_events(self, timeout=None):
|
||||
queueset = self.QueueSet(self._consumers.values())
|
||||
payload, queue = self._poll(queueset)
|
||||
|
||||
if not queue or queue not in self._callbacks:
|
||||
return
|
||||
|
||||
self._callbacks[queue](payload)
|
||||
|
||||
def consume(self, limit=None):
|
||||
for total_message_count in count():
|
||||
if limit and total_message_count >= limit:
|
||||
raise StopIteration
|
||||
|
||||
self.drain_events()
|
||||
|
||||
yield True
|
||||
|
||||
def queue_declare(self, queue, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def _get_many(self, queues):
|
||||
raise NotImplementedError("Emulations must implement _get_many")
|
||||
|
||||
def _get(self, queue):
|
||||
raise NotImplementedError("Emulations must implement _get")
|
||||
|
||||
def _put(self, queue, message):
|
||||
raise NotImplementedError("Emulations must implement _put")
|
||||
|
||||
def _purge(self, queue, message):
|
||||
raise NotImplementedError("Emulations must implement _purge")
|
||||
|
||||
def get(self, queue, **kwargs):
|
||||
try:
|
||||
payload = self._get(queue)
|
||||
except QueueEmpty:
|
||||
return None
|
||||
else:
|
||||
return self.message_to_python(payload)
|
||||
|
||||
def ack(self, delivery_tag):
|
||||
self.qos_manager.ack(delivery_tag)
|
||||
|
||||
def requeue(self, delivery_tag):
|
||||
self.qos_manager.requeue(delivery_tag)
|
||||
|
||||
def message_to_python(self, raw_message):
|
||||
message = self.Message(backend=self, payload=raw_message)
|
||||
self.qos_manager.append(message, message.destination,
|
||||
message.delivery_tag)
|
||||
return message
|
||||
|
||||
def prepare_message(self, message_data, delivery_mode, priority=0,
|
||||
content_type=None, content_encoding=None):
|
||||
return {"body": message_data,
|
||||
"priority": priority or 0,
|
||||
"content-encoding": content_encoding,
|
||||
"content-type": content_type}
|
||||
|
||||
def publish(self, message, exchange, routing_key, **kwargs):
|
||||
message["destination"] = exchange
|
||||
self._put(exchange, message)
|
||||
|
||||
def cancel(self, consumer_tag):
|
||||
queue = self._consumers.pop(consumer_tag, None)
|
||||
self._callbacks.pop(queue, None)
|
||||
|
||||
def close(self):
|
||||
for consumer_tag in self._consumers.keys():
|
||||
self.cancel(consumer_tag)
|
||||
|
||||
def basic_qos(self, prefetch_size, prefetch_count, apply_global=False):
|
||||
self._prefetch_count = prefetch_count
|
||||
|
||||
@property
|
||||
def qos_manager(self):
|
||||
if self._qos_manager is None:
|
||||
self._qos_manager = QualityOfService(self)
|
||||
|
||||
# Update prefetch count / interval
|
||||
self._qos_manager.prefetch_count = self._prefetch_count
|
||||
self._qos_manager.interval = self.interval
|
||||
|
||||
return self._qos_manager
|
|
@ -0,0 +1,212 @@
|
|||
import asyncore
|
||||
import weakref
|
||||
import functools
|
||||
import itertools
|
||||
|
||||
import pika
|
||||
|
||||
from carrot.backends.base import BaseMessage, BaseBackend
|
||||
|
||||
DEFAULT_PORT = 5672
|
||||
|
||||
|
||||
class Message(BaseMessage):
|
||||
|
||||
def __init__(self, backend, amqp_message, **kwargs):
|
||||
channel, method, header, body = amqp_message
|
||||
self._channel = channel
|
||||
self._method = method
|
||||
self._header = header
|
||||
self.backend = backend
|
||||
|
||||
kwargs.update({"body": body,
|
||||
"delivery_tag": method.delivery_tag,
|
||||
"content_type": header.content_type,
|
||||
"content_encoding": header.content_encoding,
|
||||
"delivery_info": dict(
|
||||
consumer_tag=method.consumer_tag,
|
||||
routing_key=method.routing_key,
|
||||
delivery_tag=method.delivery_tag,
|
||||
exchange=method.exchange)})
|
||||
|
||||
super(Message, self).__init__(backend, **kwargs)
|
||||
|
||||
|
||||
class SyncBackend(BaseBackend):
|
||||
default_port = DEFAULT_PORT
|
||||
_connection_cls = pika.BlockingConnection
|
||||
|
||||
Message = Message
|
||||
|
||||
def __init__(self, connection, **kwargs):
|
||||
self.connection = connection
|
||||
self.default_port = kwargs.get("default_port", self.default_port)
|
||||
self._channel_ref = None
|
||||
|
||||
@property
|
||||
def _channel(self):
|
||||
return callable(self._channel_ref) and self._channel_ref()
|
||||
|
||||
@property
|
||||
def channel(self):
|
||||
"""If no channel exists, a new one is requested."""
|
||||
if not self._channel:
|
||||
self._channel_ref = weakref.ref(self.connection.get_channel())
|
||||
return self._channel
|
||||
|
||||
def establish_connection(self):
|
||||
"""Establish connection to the AMQP broker."""
|
||||
conninfo = self.connection
|
||||
if not conninfo.port:
|
||||
conninfo.port = self.default_port
|
||||
credentials = pika.PlainCredentials(conninfo.userid,
|
||||
conninfo.password)
|
||||
return self._connection_cls(pika.ConnectionParameters(
|
||||
conninfo.hostname,
|
||||
port=conninfo.port,
|
||||
virtual_host=conninfo.virtual_host,
|
||||
credentials=credentials))
|
||||
|
||||
def close_connection(self, connection):
|
||||
"""Close the AMQP broker connection."""
|
||||
connection.close()
|
||||
|
||||
def queue_exists(self, queue):
|
||||
return False # FIXME
|
||||
|
||||
def queue_delete(self, queue, if_unused=False, if_empty=False):
|
||||
"""Delete queue by name."""
|
||||
return self.channel.queue_delete(queue=queue, if_unused=if_unused,
|
||||
if_empty=if_empty)
|
||||
|
||||
def queue_purge(self, queue, **kwargs):
|
||||
"""Discard all messages in the queue. This will delete the messages
|
||||
and results in an empty queue."""
|
||||
return self.channel.queue_purge(queue=queue).message_count
|
||||
|
||||
def queue_declare(self, queue, durable, exclusive, auto_delete,
|
||||
warn_if_exists=False, arguments=None):
|
||||
"""Declare a named queue."""
|
||||
|
||||
return self.channel.queue_declare(queue=queue,
|
||||
durable=durable,
|
||||
exclusive=exclusive,
|
||||
auto_delete=auto_delete,
|
||||
arguments=arguments)
|
||||
|
||||
def exchange_declare(self, exchange, type, durable, auto_delete):
|
||||
"""Declare an named exchange."""
|
||||
return self.channel.exchange_declare(exchange=exchange,
|
||||
type=type,
|
||||
durable=durable,
|
||||
auto_delete=auto_delete)
|
||||
|
||||
def queue_bind(self, queue, exchange, routing_key, arguments={}):
|
||||
"""Bind queue to an exchange using a routing key."""
|
||||
if not arguments:
|
||||
arguments = {}
|
||||
return self.channel.queue_bind(queue=queue,
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
arguments=arguments)
|
||||
|
||||
def message_to_python(self, raw_message):
|
||||
"""Convert encoded message body back to a Python value."""
|
||||
return self.Message(backend=self, amqp_message=raw_message)
|
||||
|
||||
def get(self, queue, no_ack=False):
|
||||
"""Receive a message from a declared queue by name.
|
||||
|
||||
:returns: A :class:`Message` object if a message was received,
|
||||
``None`` otherwise. If ``None`` was returned, it probably means
|
||||
there was no messages waiting on the queue.
|
||||
|
||||
"""
|
||||
raw_message = self.channel.basic_get(queue, no_ack=no_ack)
|
||||
if not raw_message:
|
||||
return None
|
||||
return self.message_to_python(raw_message)
|
||||
|
||||
def declare_consumer(self, queue, no_ack, callback, consumer_tag,
|
||||
nowait=False):
|
||||
"""Declare a consumer."""
|
||||
|
||||
@functools.wraps(callback)
|
||||
def _callback_decode(channel, method, header, body):
|
||||
return callback((channel, method, header, body))
|
||||
|
||||
return self.channel.basic_consume(_callback_decode,
|
||||
queue=queue,
|
||||
no_ack=no_ack,
|
||||
consumer_tag=consumer_tag)
|
||||
|
||||
def consume(self, limit=None):
|
||||
"""Returns an iterator that waits for one message at a time."""
|
||||
for total_message_count in itertools.count():
|
||||
if limit and total_message_count >= limit:
|
||||
raise StopIteration
|
||||
self.connection.connection.drain_events()
|
||||
yield True
|
||||
|
||||
def cancel(self, consumer_tag):
|
||||
"""Cancel a channel by consumer tag."""
|
||||
if not self._channel:
|
||||
return
|
||||
self.channel.basic_cancel(consumer_tag)
|
||||
|
||||
def close(self):
|
||||
"""Close the channel if open."""
|
||||
if self._channel and not self._channel.handler.channel_close:
|
||||
self._channel.close()
|
||||
self._channel_ref = None
|
||||
|
||||
def ack(self, delivery_tag):
|
||||
"""Acknowledge a message by delivery tag."""
|
||||
return self.channel.basic_ack(delivery_tag)
|
||||
|
||||
def reject(self, delivery_tag):
|
||||
"""Reject a message by deliver tag."""
|
||||
return self.channel.basic_reject(delivery_tag, requeue=False)
|
||||
|
||||
def requeue(self, delivery_tag):
|
||||
"""Reject and requeue a message by delivery tag."""
|
||||
return self.channel.basic_reject(delivery_tag, requeue=True)
|
||||
|
||||
def prepare_message(self, message_data, delivery_mode, priority=None,
|
||||
content_type=None, content_encoding=None):
|
||||
"""Encapsulate data into a AMQP message."""
|
||||
properties = pika.BasicProperties(priority=priority,
|
||||
content_type=content_type,
|
||||
content_encoding=content_encoding,
|
||||
delivery_mode=delivery_mode)
|
||||
return message_data, properties
|
||||
|
||||
def publish(self, message, exchange, routing_key, mandatory=None,
|
||||
immediate=None, headers=None):
|
||||
"""Publish a message to a named exchange."""
|
||||
body, properties = message
|
||||
|
||||
if headers:
|
||||
properties.headers = headers
|
||||
|
||||
ret = self.channel.basic_publish(body=body,
|
||||
properties=properties,
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
mandatory=mandatory,
|
||||
immediate=immediate)
|
||||
if mandatory or immediate:
|
||||
self.close()
|
||||
|
||||
def qos(self, prefetch_size, prefetch_count, apply_global=False):
|
||||
"""Request specific Quality of Service."""
|
||||
self.channel.basic_qos(prefetch_size, prefetch_count,
|
||||
apply_global)
|
||||
|
||||
def flow(self, active):
|
||||
"""Enable/disable flow from peer."""
|
||||
self.channel.flow(active)
|
||||
|
||||
|
||||
class AsyncoreBackend(SyncBackend):
|
||||
_connection_cls = pika.AsyncoreConnection
|
|
@ -0,0 +1,202 @@
|
|||
"""
|
||||
|
||||
`amqplib`_ backend for carrot.
|
||||
|
||||
.. _`amqplib`: http://barryp.org/software/py-amqplib/
|
||||
|
||||
"""
|
||||
import warnings
|
||||
import weakref
|
||||
|
||||
from itertools import count
|
||||
|
||||
from amqplib import client_0_8 as amqp
|
||||
from amqplib.client_0_8.exceptions import AMQPChannelException
|
||||
|
||||
from kombu.backends.base import BaseMessage, BaseBackend
|
||||
|
||||
DEFAULT_PORT = 5672
|
||||
|
||||
class Connection(amqp.Connection):
|
||||
|
||||
def drain_events(self, allowed_methods=None, timeout=None):
|
||||
"""Wait for an event on any channel."""
|
||||
return self.wait_multi(self.channels.values(), timeout=timeout)
|
||||
|
||||
def wait_multi(self, channels, allowed_methods=None, timeout=None):
|
||||
"""Wait for an event on a channel."""
|
||||
chanmap = dict((chan.channel_id, chan) for chan in channels)
|
||||
chanid, method_sig, args, content = self._wait_multiple(
|
||||
chanmap.keys(), allowed_methods, timeout=timeout)
|
||||
|
||||
channel = chanmap[chanid]
|
||||
|
||||
if content \
|
||||
and channel.auto_decode \
|
||||
and hasattr(content, 'content_encoding'):
|
||||
try:
|
||||
content.body = content.body.decode(content.content_encoding)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
amqp_method = channel._METHOD_MAP.get(method_sig, None)
|
||||
|
||||
if amqp_method is None:
|
||||
raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
|
||||
|
||||
if content is None:
|
||||
return amqp_method(channel, args)
|
||||
else:
|
||||
return amqp_method(channel, args, content)
|
||||
|
||||
def read_timeout(self, timeout=None):
|
||||
if timeout is None:
|
||||
return self.method_reader.read_method()
|
||||
sock = self.transport.sock
|
||||
prev = sock.gettimeout()
|
||||
sock.settimeout(timeout)
|
||||
try:
|
||||
return self.method_reader.read_method()
|
||||
finally:
|
||||
sock.settimeout(prev)
|
||||
|
||||
def _wait_multiple(self, channel_ids, allowed_methods, timeout=None):
|
||||
for channel_id in channel_ids:
|
||||
method_queue = self.channels[channel_id].method_queue
|
||||
for queued_method in method_queue:
|
||||
method_sig = queued_method[0]
|
||||
if (allowed_methods is None) \
|
||||
or (method_sig in allowed_methods) \
|
||||
or (method_sig == (20, 40)):
|
||||
method_queue.remove(queued_method)
|
||||
method_sig, args, content = queued_method
|
||||
return channel_id, method_sig, args, content
|
||||
|
||||
# Nothing queued, need to wait for a method from the peer
|
||||
while True:
|
||||
channel, method_sig, args, content = self.read_timeout(timeout)
|
||||
|
||||
if (channel in channel_ids) \
|
||||
and ((allowed_methods is None) \
|
||||
or (method_sig in allowed_methods) \
|
||||
or (method_sig == (20, 40))):
|
||||
return channel, method_sig, args, content
|
||||
|
||||
# Not the channel and/or method we were looking for. Queue
|
||||
# this method for later
|
||||
self.channels[channel].method_queue.append((method_sig,
|
||||
args,
|
||||
content))
|
||||
|
||||
#
|
||||
# If we just queued up a method for channel 0 (the Connection
|
||||
# itself) it's probably a close method in reaction to some
|
||||
# error, so deal with it right away.
|
||||
#
|
||||
if channel == 0:
|
||||
self.wait()
|
||||
|
||||
def channel(self, channel_id=None):
|
||||
try:
|
||||
return self.channels[channel_id]
|
||||
except KeyError:
|
||||
return Channel(self, channel_id)
|
||||
|
||||
|
||||
class Message(BaseMessage):
|
||||
"""A message received by the broker.
|
||||
|
||||
Usually you don't insantiate message objects yourself, but receive
|
||||
them using a :class:`carrot.messaging.Consumer`.
|
||||
|
||||
:param backend: see :attr:`backend`.
|
||||
:param amqp_message: see :attr:`_amqp_message`.
|
||||
|
||||
|
||||
.. attribute:: body
|
||||
|
||||
The message body.
|
||||
|
||||
.. attribute:: delivery_tag
|
||||
|
||||
The message delivery tag, uniquely identifying this message.
|
||||
|
||||
.. attribute:: backend
|
||||
|
||||
The message backend used.
|
||||
A subclass of :class:`carrot.backends.base.BaseBackend`.
|
||||
|
||||
.. attribute:: _amqp_message
|
||||
|
||||
A :class:`amqplib.client_0_8.basic_message.Message` instance.
|
||||
This is a private attribute and should not be accessed by
|
||||
production code.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, channel, amqp_message, **kwargs):
|
||||
self._amqp_message = amqp_message
|
||||
self.channel = channel
|
||||
|
||||
for attr_name in ("body",
|
||||
"delivery_tag",
|
||||
"content_type",
|
||||
"content_encoding",
|
||||
"delivery_info"):
|
||||
kwargs[attr_name] = getattr(amqp_message, attr_name, None)
|
||||
|
||||
super(Message, self).__init__(backend, **kwargs)
|
||||
|
||||
|
||||
class Channel(amqp.Channel):
|
||||
Message = Message
|
||||
|
||||
def prepare_message(self, message_data, priority=None,
|
||||
content_type=None, content_encoding=None, headers=None,
|
||||
properties=None):
|
||||
"""Encapsulate data into a AMQP message."""
|
||||
return amqp.Message(message_data, priority=priority,
|
||||
content_type=content_type,
|
||||
content_encoding=content_encoding,
|
||||
properties=properties)
|
||||
|
||||
def message_to_python(self, raw_message):
|
||||
"""Convert encoded message body back to a Python value."""
|
||||
return self.Message(channel=self, amqp_message=raw_message)
|
||||
|
||||
|
||||
class Backend(BaseBackend):
|
||||
default_port = DEFAULT_PORT
|
||||
|
||||
def __init__(self, connection, **kwargs):
|
||||
self.connection = connection
|
||||
self.default_port = kwargs.get("default_port") or self.default_port
|
||||
|
||||
def get_channel(self, connection):
|
||||
return connection.channel()
|
||||
|
||||
def drain_events(self, connection, **kwargs):
|
||||
return connection.drain_events(**kwargs)
|
||||
|
||||
def establish_connection(self):
|
||||
"""Establish connection to the AMQP broker."""
|
||||
conninfo = self.connection
|
||||
if not conninfo.hostname:
|
||||
raise KeyError("Missing hostname for AMQP connection.")
|
||||
if conninfo.userid is None:
|
||||
raise KeyError("Missing user id for AMQP connection.")
|
||||
if conninfo.password is None:
|
||||
raise KeyError("Missing password for AMQP connection.")
|
||||
if not conninfo.port:
|
||||
conninfo.port = self.default_port
|
||||
return Connection(host=conninfo.host,
|
||||
userid=conninfo.userid,
|
||||
password=conninfo.password,
|
||||
virtual_host=conninfo.virtual_host,
|
||||
insist=conninfo.insist,
|
||||
ssl=conninfo.ssl,
|
||||
connect_timeout=conninfo.connect_timeout)
|
||||
|
||||
def close_connection(self, connection):
|
||||
"""Close the AMQP broker connection."""
|
||||
connection.close()
|
|
@ -0,0 +1,79 @@
|
|||
from kombu.backends import get_backend_cls
|
||||
|
||||
|
||||
class BrokerConnection(object):
|
||||
|
||||
def __init__(self, hostname="localhost", userid="guest",
|
||||
password="guest", virtual_host="/", port=None, **kwargs):
|
||||
self.hostname = hostname
|
||||
self.userid = userid
|
||||
self.password = password
|
||||
self.virtual_host = virtual_host or self.virtual_host
|
||||
self.port = port or self.port
|
||||
self.insist = kwargs.get("insist", False)
|
||||
self.connect_timeout = kwargs.get("connect_timeout", 5)
|
||||
self.ssl = kwargs.get("ssl", False)
|
||||
self.backend_cls = kwargs.get("backend_cls", None)
|
||||
self._closed = None
|
||||
self._connection = None
|
||||
self._backend = None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, e_type, e_value, e_trace):
|
||||
self.close()
|
||||
|
||||
def _establish_connection(self):
|
||||
return self.backend.establish_connection()
|
||||
|
||||
@property
|
||||
def connection(self):
|
||||
if self._closed:
|
||||
return
|
||||
if not self._connection:
|
||||
self._connection = self._establish_connection()
|
||||
self._closed = False
|
||||
return self._connection
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
"""The host as a hostname/port pair separated by colon."""
|
||||
return ":".join([self.hostname, str(self.port)])
|
||||
|
||||
def get_backend_cls(self):
|
||||
"""Get the currently used backend class."""
|
||||
backend_cls = self.backend_cls
|
||||
if not backend_cls or isinstance(backend_cls, basestring):
|
||||
backend_cls = get_backend_cls(backend_cls)
|
||||
return backend_cls
|
||||
|
||||
def connect(self):
|
||||
"""Establish a connection to the AMQP server."""
|
||||
self._closed = False
|
||||
return self.connection
|
||||
|
||||
def channel(self):
|
||||
"""Request a new AMQP channel."""
|
||||
return self.backend.create_channel(self.connection)
|
||||
|
||||
def drain_events(self, **kwargs):
|
||||
return self.backend.drain_events(self.connection, **kwargs)
|
||||
|
||||
def close(self):
|
||||
"""Close the currently open connection."""
|
||||
try:
|
||||
if self._connection:
|
||||
self.backend.close_connection(self._connection)
|
||||
except socket.error:
|
||||
pass
|
||||
self._closed = True
|
||||
|
||||
def _create_backend(self):
|
||||
return self.get_backend_cls()(connection=self)
|
||||
|
||||
@property
|
||||
def backend(self):
|
||||
if self._backend is None:
|
||||
self._backend = self._create_backend()
|
||||
return self._backend
|
|
@ -0,0 +1,116 @@
|
|||
class Exchange(object):
|
||||
TRANSIENT_DELIVERY_MODE = 1
|
||||
PERSISTENT_DELIVERY_MODE = 2
|
||||
DELIVERY_MODES = {
|
||||
"transient": TRANSIENT_DELIVERY_MODE,
|
||||
"persistent": PERSISTENT_DELIVERY_MODE,
|
||||
}
|
||||
name = ""
|
||||
type = "direct"
|
||||
routing_key = ""
|
||||
delivery_mode = PERSISTENT_DELIVERY_MODE
|
||||
durable = True
|
||||
auto_delete = False
|
||||
_init_opts = ("durable", "auto_delete",
|
||||
"delivery_mode", "auto_declare")
|
||||
|
||||
def __init__(self, name="", type="", routing_key=None, **kwargs):
|
||||
self.name = name or self.name
|
||||
self.type = type or self.type
|
||||
self.routing_key = routing_key or self.routing_key
|
||||
for opt_name in self._init_opts:
|
||||
opt_value = kwargs.get(opt_name)
|
||||
if opt_value is not None:
|
||||
setattr(self, opt_name, opt_value)
|
||||
self.delivery_mode = self.DELIVERY_MODES.get(self.delivery_mode,
|
||||
self.delivery_mode)
|
||||
|
||||
def declare(self, channel):
|
||||
"""Declare the exchange.
|
||||
|
||||
Creates the exchange on the broker.
|
||||
|
||||
"""
|
||||
channel.exchange_declare(exchange=self.name,
|
||||
type=self.type,
|
||||
durable=self.durable,
|
||||
auto_delete=self.auto_delete)
|
||||
|
||||
def create_message(self, channel, message_data, delivery_mode=None,
|
||||
priority=None, content_type=None, content_encoding=None,
|
||||
properties=None):
|
||||
properties["delivery_mode"] = delivery_mode or self.delivery_mode
|
||||
return channel.prepare_message(message_data,
|
||||
properties=properties,
|
||||
priority=priority,
|
||||
content_type=content_type,
|
||||
content_encoding=content_encoding)
|
||||
|
||||
def publish(self, channel, message, routing_key=None,
|
||||
mandatory=False, immediate=False):
|
||||
if routing_key is None:
|
||||
routing_key = self.routing_key
|
||||
channel.basic_publish(message,
|
||||
exchange=self.name, routing_key=routing_key,
|
||||
mandatory=mandatory, immediate=immediate,
|
||||
headers=headers)
|
||||
|
||||
|
||||
class Binding(object):
|
||||
name = ""
|
||||
exchange = None
|
||||
routing_key = ""
|
||||
|
||||
durable = True
|
||||
exclusive = False
|
||||
auto_delete = False
|
||||
warn_if_exists = False
|
||||
_init_opts = ("durable", "exclusive", "auto_delete",
|
||||
"warn_if_exists")
|
||||
|
||||
def __init__(self, name=None, exchange=None, routing_key=None, **kwargs):
|
||||
# Binding.
|
||||
self.name = name or self.name
|
||||
self.exchange = exchange or self.exchange
|
||||
self.routing_key = routing_key or self.routing_key
|
||||
|
||||
# Options
|
||||
for opt_name in self._init_opts:
|
||||
opt_value = kwargs.get(opt_name)
|
||||
if opt_value is not None:
|
||||
setattr(self, opt_name, opt_value)
|
||||
|
||||
# exclusive implies auto-delete.
|
||||
if self.exclusive:
|
||||
self.auto_delete = True
|
||||
|
||||
def declare(self, channel):
|
||||
"""Declares the queue, the exchange and binds the queue to
|
||||
the exchange."""
|
||||
if self.exchange:
|
||||
self.exchange.declare(channel)
|
||||
if self.name:
|
||||
channel.queue_declare(queue=self.name, durable=self.durable,
|
||||
exclusive=self.exclusive,
|
||||
auto_delete=self.auto_delete)
|
||||
channel.queue_bind(queue=self.name,
|
||||
exchange=self.exchange.name,
|
||||
routing_key=self.routing_key)
|
||||
|
||||
def get(self, channel, no_ack=None):
|
||||
message = channel.basic_get(self.name, no_ack=no_ack)
|
||||
if message:
|
||||
return channel.message_to_python(message)
|
||||
|
||||
def purge(self, channel):
|
||||
return channel.queue_purge(self.name)
|
||||
|
||||
def consume(self, channel, consumer_tag, callback, no_ack=None,
|
||||
nowait=True):
|
||||
return channel.consume(queue=self.name, no_ack=no_ack,
|
||||
consumer_tag=consumer_tag,
|
||||
callback=callback,
|
||||
nowait=nowait)
|
||||
|
||||
def cancel(self, channel, consumer_tag):
|
||||
channel.basic_cancel(consumer_tag)
|
|
@ -0,0 +1,136 @@
|
|||
from itertools import count
|
||||
|
||||
from kombu import serialization
|
||||
from kombu.entity import Exchange, Binding
|
||||
|
||||
|
||||
class Producer(object):
|
||||
exchange = None
|
||||
serializer = None
|
||||
auto_declare = True
|
||||
|
||||
def __init__(self, channel, exchange=None, serializer=None,
|
||||
auto_declare=None):
|
||||
self.channel = channel
|
||||
self.exchange = exchange or self.exchange
|
||||
self.serializer = serializer or self.serializer
|
||||
if auto_declare is not None:
|
||||
self.auto_declare = auto_declare
|
||||
|
||||
if self.exchange and self.auto_declare:
|
||||
self.exchange.declare(self.channel)
|
||||
|
||||
def prepare(self, message_data, serializer=None,
|
||||
content_type=None, content_encoding=None):
|
||||
# No content_type? Then we're serializing the data internally.
|
||||
if not content_type:
|
||||
serializer = serializer or self.serializer
|
||||
(content_type, content_encoding,
|
||||
message_data) = serialization.encode(message_data,
|
||||
serializer=serializer)
|
||||
else:
|
||||
# If the programmer doesn't want us to serialize,
|
||||
# make sure content_encoding is set.
|
||||
if isinstance(message_data, unicode):
|
||||
if not content_encoding:
|
||||
content_encoding = 'utf-8'
|
||||
message_data = message_data.encode(content_encoding)
|
||||
|
||||
# If they passed in a string, we can't know anything
|
||||
# about it. So assume it's binary data.
|
||||
elif not content_encoding:
|
||||
content_encoding = 'binary'
|
||||
|
||||
return message_data, content_type, content_encoding
|
||||
|
||||
def publish(self, message_data, routing_key=None, delivery_mode=None,
|
||||
mandatory=False, immediate=False, priority=0, content_type=None,
|
||||
content_encoding=None, serializer=None):
|
||||
|
||||
message_data, content_type, content_encoding = self.prepare(
|
||||
message_data, content_type, content_encoding)
|
||||
message = self.exchange.create_message(channel, message_data,
|
||||
delivery_mode, priority, content_type, content_encoding)
|
||||
return self.exchange.publish(message, routing_key, mandatory,
|
||||
immediate)
|
||||
|
||||
|
||||
_next_tag = count(1).next
|
||||
|
||||
class Consumer(object):
|
||||
no_ack = False
|
||||
auto_declare = True
|
||||
callbacks = None
|
||||
|
||||
def __init__(self, channel, bindings, no_ack=None, auto_declare=None,
|
||||
callbacks=None):
|
||||
self.channel = channel
|
||||
self.bindings = bindings
|
||||
if no_ack is not None:
|
||||
self.no_ack = no_ack
|
||||
if auto_declare is not None:
|
||||
self.auto_declare = auto_declare
|
||||
|
||||
if callbacks is not None:
|
||||
self.callbacks = callbacks
|
||||
if self.callbacks is None:
|
||||
self.callbacks = []
|
||||
|
||||
# bindings maybe list
|
||||
if not hasattr(self.bindings, "__iter__"):
|
||||
self.bindings = [self.bindings]
|
||||
|
||||
if self.auto_declare:
|
||||
for binding in self.bindings:
|
||||
binding.declare(self.channel)
|
||||
|
||||
self._active_tags = {}
|
||||
self._declare_consumer()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self):
|
||||
self.cancel()
|
||||
|
||||
def _consume(self):
|
||||
H, T = self.bindings[:-1], self.bindings[-1]
|
||||
for binding in H:
|
||||
binding.consume(self.channel, self._add_tag(binding),
|
||||
self._receive_callback, self.no_ack,
|
||||
nowait=True)
|
||||
T.consume(self.channel, self._add_tag(T),
|
||||
self._receive_callback,
|
||||
self.no_ack, nowait=False)
|
||||
|
||||
|
||||
def _add_tag(self, binding):
|
||||
tag = self._active_tags[binding] = str(_next_tag())
|
||||
return tag
|
||||
|
||||
def _receive_callback(self, raw_message):
|
||||
message = self.channel.message_to_python(raw_message)
|
||||
self.receive(message.payload, message)
|
||||
|
||||
def receive(self, message_data, message):
|
||||
if not self.callbacks:
|
||||
raise NotImplementedError("No consumer callbacks registered")
|
||||
for callback in self.callbacks:
|
||||
callback(message_data, message)
|
||||
|
||||
def register_callback(self, callback):
|
||||
self.callbacks.append(callback)
|
||||
|
||||
def purge(self):
|
||||
for binding in self.bindings:
|
||||
self.binding.purge(self.channel)
|
||||
|
||||
def cancel(self):
|
||||
for binding, tag in self._active_tags.items():
|
||||
binding.cancel(self.channel, tag)
|
||||
|
||||
def flow(self, active):
|
||||
self.channel.flow(active)
|
||||
|
||||
def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
|
||||
self.channel.qos(prefetch_size, prefetch_count, apply_global)
|
|
@ -0,0 +1,279 @@
|
|||
"""
|
||||
Centralized support for encoding/decoding of data structures.
|
||||
Requires a json library (`cjson`_, `simplejson`_, or `Python 2.6+`_).
|
||||
|
||||
Pickle support is built-in.
|
||||
|
||||
Optionally installs support for ``YAML`` if the `PyYAML`_ package
|
||||
is installed.
|
||||
|
||||
Optionally installs support for `msgpack`_ if the `msgpack-python`_
|
||||
package is installed.
|
||||
|
||||
.. _`cjson`: http://pypi.python.org/pypi/python-cjson/
|
||||
.. _`simplejson`: http://code.google.com/p/simplejson/
|
||||
.. _`Python 2.6+`: http://docs.python.org/library/json.html
|
||||
.. _`PyYAML`: http://pyyaml.org/
|
||||
.. _`msgpack`: http://msgpack.sourceforge.net/
|
||||
.. _`msgpack-python`: http://pypi.python.org/pypi/msgpack-python/
|
||||
|
||||
"""
|
||||
|
||||
import codecs
|
||||
|
||||
__all__ = ['SerializerNotInstalled', 'registry']
|
||||
|
||||
|
||||
class SerializerNotInstalled(StandardError):
|
||||
"""Support for the requested serialization type is not installed"""
|
||||
|
||||
|
||||
class SerializerRegistry(object):
|
||||
"""The registry keeps track of serialization methods."""
|
||||
|
||||
def __init__(self):
|
||||
self._encoders = {}
|
||||
self._decoders = {}
|
||||
self._default_encode = None
|
||||
self._default_content_type = None
|
||||
self._default_content_encoding = None
|
||||
|
||||
def register(self, name, encoder, decoder, content_type,
|
||||
content_encoding='utf-8'):
|
||||
"""Register a new encoder/decoder.
|
||||
|
||||
:param name: A convenience name for the serialization method.
|
||||
|
||||
:param encoder: A method that will be passed a python data structure
|
||||
and should return a string representing the serialized data.
|
||||
If ``None``, then only a decoder will be registered. Encoding
|
||||
will not be possible.
|
||||
|
||||
:param decoder: A method that will be passed a string representing
|
||||
serialized data and should return a python data structure.
|
||||
If ``None``, then only an encoder will be registered.
|
||||
Decoding will not be possible.
|
||||
|
||||
:param content_type: The mime-type describing the serialized
|
||||
structure.
|
||||
|
||||
:param content_encoding: The content encoding (character set) that
|
||||
the :param:`decoder` method will be returning. Will usually be
|
||||
``utf-8``, ``us-ascii``, or ``binary``.
|
||||
|
||||
"""
|
||||
if encoder:
|
||||
self._encoders[name] = (content_type, content_encoding, encoder)
|
||||
if decoder:
|
||||
self._decoders[content_type] = decoder
|
||||
|
||||
def _set_default_serializer(self, name):
|
||||
"""
|
||||
Set the default serialization method used by this library.
|
||||
|
||||
:param name: The name of the registered serialization method.
|
||||
For example, ``json`` (default), ``pickle``, ``yaml``,
|
||||
or any custom methods registered using :meth:`register`.
|
||||
|
||||
:raises SerializerNotInstalled: If the serialization method
|
||||
requested is not available.
|
||||
"""
|
||||
try:
|
||||
(self._default_content_type, self._default_content_encoding,
|
||||
self._default_encode) = self._encoders[name]
|
||||
except KeyError:
|
||||
raise SerializerNotInstalled(
|
||||
"No encoder installed for %s" % name)
|
||||
|
||||
def encode(self, data, serializer=None):
|
||||
"""
|
||||
Serialize a data structure into a string suitable for sending
|
||||
as an AMQP message body.
|
||||
|
||||
:param data: The message data to send. Can be a list,
|
||||
dictionary or a string.
|
||||
|
||||
:keyword serializer: An optional string representing
|
||||
the serialization method you want the data marshalled
|
||||
into. (For example, ``json``, ``raw``, or ``pickle``).
|
||||
|
||||
If ``None`` (default), then `JSON`_ will be used, unless
|
||||
``data`` is a ``str`` or ``unicode`` object. In this
|
||||
latter case, no serialization occurs as it would be
|
||||
unnecessary.
|
||||
|
||||
Note that if ``serializer`` is specified, then that
|
||||
serialization method will be used even if a ``str``
|
||||
or ``unicode`` object is passed in.
|
||||
|
||||
:returns: A three-item tuple containing the content type
|
||||
(e.g., ``application/json``), content encoding, (e.g.,
|
||||
``utf-8``) and a string containing the serialized
|
||||
data.
|
||||
|
||||
:raises SerializerNotInstalled: If the serialization method
|
||||
requested is not available.
|
||||
"""
|
||||
if serializer == "raw":
|
||||
return raw_encode(data)
|
||||
if serializer and not self._encoders.get(serializer):
|
||||
raise SerializerNotInstalled(
|
||||
"No encoder installed for %s" % serializer)
|
||||
|
||||
# If a raw string was sent, assume binary encoding
|
||||
# (it's likely either ASCII or a raw binary file, but 'binary'
|
||||
# charset will encompass both, even if not ideal.
|
||||
if not serializer and isinstance(data, str):
|
||||
# In Python 3+, this would be "bytes"; allow binary data to be
|
||||
# sent as a message without getting encoder errors
|
||||
return "application/data", "binary", data
|
||||
|
||||
# For unicode objects, force it into a string
|
||||
if not serializer and isinstance(data, unicode):
|
||||
payload = data.encode("utf-8")
|
||||
return "text/plain", "utf-8", payload
|
||||
|
||||
if serializer:
|
||||
content_type, content_encoding, encoder = \
|
||||
self._encoders[serializer]
|
||||
else:
|
||||
encoder = self._default_encode
|
||||
content_type = self._default_content_type
|
||||
content_encoding = self._default_content_encoding
|
||||
|
||||
payload = encoder(data)
|
||||
return content_type, content_encoding, payload
|
||||
|
||||
def decode(self, data, content_type, content_encoding):
|
||||
"""Deserialize a data stream as serialized using ``encode``
|
||||
based on :param:`content_type`.
|
||||
|
||||
:param data: The message data to deserialize.
|
||||
|
||||
:param content_type: The content-type of the data.
|
||||
(e.g., ``application/json``).
|
||||
|
||||
:param content_encoding: The content-encoding of the data.
|
||||
(e.g., ``utf-8``, ``binary``, or ``us-ascii``).
|
||||
|
||||
:returns: The unserialized data.
|
||||
"""
|
||||
content_type = content_type or 'application/data'
|
||||
content_encoding = (content_encoding or 'utf-8').lower()
|
||||
|
||||
# Don't decode 8-bit strings or unicode objects
|
||||
if content_encoding not in ('binary', 'ascii-8bit') and \
|
||||
not isinstance(data, unicode):
|
||||
data = codecs.decode(data, content_encoding)
|
||||
|
||||
try:
|
||||
decoder = self._decoders[content_type]
|
||||
except KeyError:
|
||||
return data
|
||||
|
||||
return decoder(data)
|
||||
|
||||
|
||||
"""
|
||||
.. data:: registry
|
||||
|
||||
Global registry of serializers/deserializers.
|
||||
|
||||
"""
|
||||
registry = SerializerRegistry()
|
||||
|
||||
"""
|
||||
.. function:: encode(data, serializer=default_serializer)
|
||||
|
||||
Encode data using the registry's default encoder.
|
||||
|
||||
"""
|
||||
encode = registry.encode
|
||||
|
||||
"""
|
||||
.. function:: decode(data, content_type, content_encoding):
|
||||
|
||||
Decode data using the registry's default decoder.
|
||||
|
||||
"""
|
||||
decode = registry.decode
|
||||
|
||||
|
||||
def raw_encode(data):
|
||||
"""Special case serializer."""
|
||||
content_type = 'application/data'
|
||||
payload = data
|
||||
if isinstance(payload, unicode):
|
||||
content_encoding = 'utf-8'
|
||||
payload = payload.encode(content_encoding)
|
||||
else:
|
||||
content_encoding = 'binary'
|
||||
return content_type, content_encoding, payload
|
||||
|
||||
|
||||
def register_json():
|
||||
"""Register a encoder/decoder for JSON serialization."""
|
||||
from anyjson import serialize as json_serialize
|
||||
from anyjson import deserialize as json_deserialize
|
||||
|
||||
registry.register('json', json_serialize, json_deserialize,
|
||||
content_type='application/json',
|
||||
content_encoding='utf-8')
|
||||
|
||||
|
||||
def register_yaml():
|
||||
"""Register a encoder/decoder for YAML serialization.
|
||||
|
||||
It is slower than JSON, but allows for more data types
|
||||
to be serialized. Useful if you need to send data such as dates"""
|
||||
try:
|
||||
import yaml
|
||||
registry.register('yaml', yaml.safe_dump, yaml.safe_load,
|
||||
content_type='application/x-yaml',
|
||||
content_encoding='utf-8')
|
||||
except ImportError:
|
||||
|
||||
def not_available(*args, **kwargs):
|
||||
"""In case a client receives a yaml message, but yaml
|
||||
isn't installed."""
|
||||
raise SerializerNotInstalled(
|
||||
"No decoder installed for YAML. Install the PyYAML library")
|
||||
registry.register('yaml', None, not_available, 'application/x-yaml')
|
||||
|
||||
|
||||
def register_pickle():
|
||||
"""The fastest serialization method, but restricts
|
||||
you to python clients."""
|
||||
import cPickle
|
||||
registry.register('pickle', cPickle.dumps, cPickle.loads,
|
||||
content_type='application/x-python-serialize',
|
||||
content_encoding='binary')
|
||||
|
||||
|
||||
def register_msgpack():
|
||||
"""See http://msgpack.sourceforge.net/"""
|
||||
try:
|
||||
import msgpack
|
||||
registry.register('msgpack', msgpack.packs, msgpack.unpacks,
|
||||
content_type='application/x-msgpack',
|
||||
content_encoding='utf-8')
|
||||
except ImportError:
|
||||
|
||||
def not_available(*args, **kwargs):
|
||||
"""In case a client receives a msgpack message, but yaml
|
||||
isn't installed."""
|
||||
raise SerializerNotInstalled(
|
||||
"No decoder installed for msgpack. "
|
||||
"Install the msgpack library")
|
||||
registry.register('msgpack', None, not_available,
|
||||
'application/x-msgpack')
|
||||
|
||||
# Register the base serialization methods.
|
||||
register_json()
|
||||
register_pickle()
|
||||
register_yaml()
|
||||
register_msgpack()
|
||||
|
||||
# JSON is assumed to always be available, so is the default.
|
||||
# (this matches the historical use of kombu.)
|
||||
registry._set_default_serializer('json')
|
|
@ -0,0 +1,66 @@
|
|||
from uuid import UUID, uuid4, _uuid_generate_random
|
||||
try:
|
||||
import ctypes
|
||||
except ImportError:
|
||||
ctypes = None
|
||||
|
||||
|
||||
def gen_unique_id():
|
||||
"""Generate a unique id, having - hopefully - a very small chance of
|
||||
collission.
|
||||
|
||||
For now this is provided by :func:`uuid.uuid4`.
|
||||
"""
|
||||
# Workaround for http://bugs.python.org/issue4607
|
||||
if ctypes and _uuid_generate_random:
|
||||
buffer = ctypes.create_string_buffer(16)
|
||||
_uuid_generate_random(buffer)
|
||||
return str(UUID(bytes=buffer.raw))
|
||||
return str(uuid4())
|
||||
|
||||
|
||||
def _compat_rl_partition(S, sep, direction=None):
|
||||
if direction is None:
|
||||
direction = S.split
|
||||
items = direction(sep, 1)
|
||||
if len(items) == 1:
|
||||
return items[0], sep, ''
|
||||
return items[0], sep, items[1]
|
||||
|
||||
|
||||
def _compat_partition(S, sep):
|
||||
"""``partition(S, sep) -> (head, sep, tail)``
|
||||
|
||||
Search for the separator ``sep`` in ``S``, and return the part before
|
||||
it, the separator itself, and the part after it. If the separator is not
|
||||
found, return ``S`` and two empty strings.
|
||||
|
||||
"""
|
||||
return _compat_rl_partition(S, sep, direction=S.split)
|
||||
|
||||
|
||||
def _compat_rpartition(S, sep):
|
||||
"""``rpartition(S, sep) -> (tail, sep, head)``
|
||||
|
||||
Search for the separator ``sep`` in ``S``, starting at the end of ``S``,
|
||||
and return the part before it, the separator itself, and the part
|
||||
after it. If the separator is not found, return two empty
|
||||
strings and ``S``.
|
||||
|
||||
"""
|
||||
return _compat_rl_partition(S, sep, direction=S.rsplit)
|
||||
|
||||
|
||||
|
||||
def partition(S, sep):
|
||||
if hasattr(S, 'partition'):
|
||||
return S.partition(sep)
|
||||
else: # Python <= 2.4:
|
||||
return _compat_partition(S, sep)
|
||||
|
||||
|
||||
def rpartition(S, sep):
|
||||
if hasattr(S, 'rpartition'):
|
||||
return S.rpartition(sep)
|
||||
else: # Python <= 2.4:
|
||||
return _compat_rpartition(S, sep)
|
|
@ -0,0 +1,16 @@
|
|||
[nosetests]
|
||||
verbosity = 1
|
||||
detailed-errors = 1
|
||||
|
||||
[build_sphinx]
|
||||
source-dir = docs/
|
||||
build-dir = docs/.build
|
||||
all_files = 1
|
||||
|
||||
[upload_sphinx]
|
||||
upload-dir = docs/.build/html
|
||||
|
||||
|
||||
[bdist_rpm]
|
||||
requires = python-anyjson
|
||||
python-amqplib >= 0.6
|
|
@ -0,0 +1,92 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import codecs
|
||||
|
||||
try:
|
||||
from setuptools import setup, find_packages
|
||||
except ImportError:
|
||||
from ez_setup import use_setuptools
|
||||
use_setuptools()
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
from distutils.command.install_data import install_data
|
||||
from distutils.command.install import INSTALL_SCHEMES
|
||||
import sys
|
||||
|
||||
import kombu
|
||||
|
||||
packages, data_files = [], []
|
||||
root_dir = os.path.dirname(__file__)
|
||||
if root_dir != '':
|
||||
os.chdir(root_dir)
|
||||
src_dir = "kombu"
|
||||
|
||||
|
||||
def osx_install_data(install_data):
|
||||
|
||||
def finalize_options(self):
|
||||
self.set_undefined_options("install", ("install_lib", "install_dir"))
|
||||
install_data.finalize_options(self)
|
||||
|
||||
|
||||
def fullsplit(path, result=None):
|
||||
if result is None:
|
||||
result = []
|
||||
head, tail = os.path.split(path)
|
||||
if head == '':
|
||||
return [tail] + result
|
||||
if head == path:
|
||||
return result
|
||||
return fullsplit(head, [tail] + result)
|
||||
|
||||
|
||||
for scheme in INSTALL_SCHEMES.values():
|
||||
scheme['data'] = scheme['purelib']
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(src_dir):
|
||||
# Ignore dirnames that start with '.'
|
||||
for i, dirname in enumerate(dirnames):
|
||||
if dirname.startswith("."):
|
||||
del dirnames[i]
|
||||
for filename in filenames:
|
||||
if filename.endswith(".py"):
|
||||
packages.append('.'.join(fullsplit(dirpath)))
|
||||
else:
|
||||
data_files.append([dirpath, [os.path.join(dirpath, f) for f in
|
||||
filenames]])
|
||||
|
||||
if os.path.exists("README.rst"):
|
||||
long_description = codecs.open('README.rst', "r", "utf-8").read()
|
||||
else:
|
||||
long_description = "See http://pypi.python.org/pypi/kombu"
|
||||
|
||||
setup(
|
||||
name='kombu',
|
||||
version=kombu.__version__,
|
||||
description=kombu.__doc__,
|
||||
author=kombu.__author__,
|
||||
author_email=kombu.__contact__,
|
||||
url=kombu.__homepage__,
|
||||
platforms=["any"],
|
||||
packages=packages,
|
||||
data_files=data_files,
|
||||
zip_safe=False,
|
||||
test_suite="nose.collector",
|
||||
install_requires=[
|
||||
'anyjson',
|
||||
'amqplib>=0.6',
|
||||
],
|
||||
classifiers=[
|
||||
"Development Status :: 4 - Beta",
|
||||
"Framework :: Django",
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python",
|
||||
"License :: OSI Approved :: BSD License",
|
||||
"Intended Audience :: Developers",
|
||||
"Topic :: Communications",
|
||||
"Topic :: System :: Distributed Computing",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
],
|
||||
long_description=long_description,
|
||||
)
|
Loading…
Reference in New Issue