diff --git a/README.rst b/README.rst index fc8c445c..46ad74d6 100644 --- a/README.rst +++ b/README.rst @@ -57,8 +57,8 @@ Kombu is using Sphinx, and the latest documentation is available at GitHub: Quick overview -------------- - .. code-block:: python + from kombu.connection BrokerConnection from kombu.messaging import Exchange, Queue, Consumer, Producer @@ -82,7 +82,6 @@ Quick overview while True: connection.drain_events() - # Consume from several queues on the same channel: video_queue = Queue("video", exchange=media_exchange, key="video") image_queue = Queue("image", exchange=media_exchange, key="image") diff --git a/docs/reference/index.rst b/docs/reference/index.rst index b485fc8f..5a924b30 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -16,10 +16,12 @@ kombu.pidbox kombu.exceptions kombu.transport - kombu.transport.pypika kombu.transport.pyamqplib - kombu.transport.pyredis + kombu.transport.pypika kombu.transport.memory + kombu.transport.pyredis + kombu.transport.mongodb + kombu.transport.pycouchdb kombu.transport.base kombu.transport.virtual kombu.transport.virtual.exchange diff --git a/docs/reference/kombu.transport.pycouchdb.rst b/docs/reference/kombu.transport.pycouchdb.rst new file mode 100644 index 00000000..b9407437 --- /dev/null +++ b/docs/reference/kombu.transport.pycouchdb.rst @@ -0,0 +1,25 @@ +.. currentmodule:: kombu.transport.pycouchdb + +.. automodule:: kombu.transport.pycouchdb + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: + + Functions + --------- + + .. autofunction:: create_message_view diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 584e5051..960350c1 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -21,6 +21,7 @@ TRANSPORT_ALIASES = { "memory": "kombu.transport.memory.Transport", "redis": "kombu.transport.pyredis.Transport", "mongodb": "kombu.transport.mongodb.Transport", + "couchdb": "kombu.transport.pycouchdb.Transport", } _transport_cache = {} diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index a0fe927d..a74030c4 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -37,7 +37,7 @@ class Channel(virtual.Channel): "messages", query={"queue": queue}, remove=True) except OperationFailure: raise Empty() - return msg["value"]["payload"] + return deserialize(msg["value"]["payload"]) def _size(self, queue): return self.client.count() diff --git a/kombu/transport/pycouchdb.py b/kombu/transport/pycouchdb.py new file mode 100644 index 00000000..8b4f2e88 --- /dev/null +++ b/kombu/transport/pycouchdb.py @@ -0,0 +1,110 @@ +""" +kombu.transport.pycouchdb +========================= + +CouchDB transport. + +:copyright: (c) 2009 - 2010 by David Clymer +:license: BSD, see LICENSE for more details. + +""" +from Queue import Empty + +import couchdb + +from anyjson import serialize, deserialize + +from kombu.transport import virtual + +DEFAULT_PORT = 5984 +DEFAULT_DATABASE = "kombu_default" + +__author__ = "David Clymer " + + +def create_message_view(db): + from couchdb import design + + view = design.ViewDefinition("kombu", "messages", """ + function (doc) { + if (doc.queue && doc.payload) + emit(doc.queue, doc); + } + """) + if not view.get_doc(db): + view.sync(db) + + +class Channel(virtual.Channel): + _client = None + _mongoconn = None + _database = None + + view_created = False + + def _new_queue(self, queue, **kwargs): + pass + + def _query(self, queue, **kwargs): + # If the message view is not yet set up, we'll need it now. + if not self.view_created: + create_message_view(self.client) + self.view_created = True + + if not queue: + raise Empty() + + return self.client.view("kombu/messages", key=queue, **kwargs) + + def _get(self, queue): + result = self._query(queue, limit=1) + if not result: + raise Empty() + + item = result.rows[0].value + self.client.delete(item) + return deserialize(item["payload"]) + + def _purge(self, queue): + result = self._query(queue) + for item in result: + self.client.delete(item.value) + return len(result) + + def _size(self, queue): + return len(self._query(queue)) + + def _open(self): + conninfo = self.connection.client + dbname = conninfo.virtual_host + proto = conninfo.ssl and "https" or "http" + if not dbname or dbname == "/": + dbname = DEFAULT_DATABASE + server = couchdb.Server('%s://%s:%s/' % (proto, + conninfo.hostname, + conninfo.port)) + try: + return server.create(dbname) + except couchdb.PreconditionFailed: + return server[dbname] + + @property + def client(self): + if self._client is None: + self._client = self._open() + return self._client + + +class Transport(virtual.Transport): + Channel = Channel + + interval = 1 + default_port = DEFAULT_PORT + connection_errors = (couchdb.HTTPError, + couchdb.ServerError, + couchdb.Unauthorized) + channel_errors = (couchdb.HTTPError, + couchdb.ServerError, + couchdb.PreconditionFailed, + couchdb.ResourceConflict, + couchdb.ResourceNotFound)