mirror of https://github.com/celery/kombu.git
PEP8ify + pyflakes
This commit is contained in:
parent
7b706c40ab
commit
d1cbcef94c
|
@ -1,16 +1,12 @@
|
|||
from __future__ import absolute_import
|
||||
from __future__ import with_statement
|
||||
|
||||
import socket
|
||||
|
||||
from ..connection import BrokerConnection
|
||||
from ..entity import Exchange, Queue
|
||||
from ..messaging import Consumer, Producer
|
||||
|
||||
from .utils import TestCase
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
class test_MongoDBTransport(TestCase):
|
||||
|
||||
|
@ -23,7 +19,6 @@ class test_MongoDBTransport(TestCase):
|
|||
self.q2 = Queue("test_transport_memory2",
|
||||
exchange=self.e,
|
||||
routing_key="test_transport_mongodb2")
|
||||
|
||||
|
||||
def test_produce_consume(self):
|
||||
channel = self.c.channel()
|
||||
|
|
|
@ -13,6 +13,7 @@ from __future__ import absolute_import
|
|||
from Queue import Empty
|
||||
|
||||
import pymongo
|
||||
|
||||
from pymongo import errors
|
||||
from anyjson import loads, dumps
|
||||
from pymongo.connection import Connection
|
||||
|
@ -22,18 +23,21 @@ from . import virtual
|
|||
DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 27017
|
||||
|
||||
__author__ = "Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>; Scott Lyons <scottalyons@gmail.com>"
|
||||
__author__ = """\
|
||||
Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>;\
|
||||
Scott Lyons <scottalyons@gmail.com>;\
|
||||
"""
|
||||
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
_client = None
|
||||
supports_fanout=True
|
||||
supports_fanout = True
|
||||
_fanout_queues = {}
|
||||
|
||||
|
||||
def __init__(self, *vargs, **kwargs):
|
||||
super_ = super(Channel, self)
|
||||
super_.__init__(*vargs, **kwargs)
|
||||
|
||||
|
||||
self._queue_cursors = {}
|
||||
self._queue_readcounts = {}
|
||||
|
||||
|
@ -44,7 +48,7 @@ class Channel(virtual.Channel):
|
|||
try:
|
||||
if queue in self._fanout_queues:
|
||||
msg = self._queue_cursors[queue].next()
|
||||
self._queue_readcounts[queue]+=1
|
||||
self._queue_readcounts[queue] += 1
|
||||
return loads(msg["payload"])
|
||||
else:
|
||||
msg = self.client.command("findandmodify", "messages",
|
||||
|
@ -56,20 +60,22 @@ class Channel(virtual.Channel):
|
|||
raise
|
||||
except StopIteration:
|
||||
raise Empty()
|
||||
|
||||
|
||||
# as of mongo 2.0 empty results won't raise an error
|
||||
if msg['value'] is None:
|
||||
if msg["value"] is None:
|
||||
raise Empty()
|
||||
return loads(msg["value"]["payload"])
|
||||
|
||||
def _size(self, queue):
|
||||
if queue in self._fanout_queues:
|
||||
return self._queue_cursors[queue].count() - self._queue_readcounts[queue]
|
||||
|
||||
return (self._queue_cursors[queue].count() -
|
||||
self._queue_readcounts[queue])
|
||||
|
||||
return self.client.messages.find({"queue": queue}).count()
|
||||
|
||||
def _put(self, queue, message, **kwargs):
|
||||
self.client.messages.insert({"payload": dumps(message), "queue": queue})
|
||||
self.client.messages.insert({"payload": dumps(message),
|
||||
"queue": queue})
|
||||
|
||||
def _purge(self, queue):
|
||||
size = self._size(queue)
|
||||
|
@ -100,55 +106,62 @@ class Channel(virtual.Channel):
|
|||
database = getattr(mongoconn, dbname)
|
||||
if conninfo.userid:
|
||||
database.authenticate(conninfo.userid, conninfo.password)
|
||||
|
||||
|
||||
self.db = database
|
||||
col = database.messages
|
||||
col.ensure_index([("queue", 1)])
|
||||
|
||||
|
||||
if "messages.broadcast" not in database.collection_names():
|
||||
capsize = conninfo.capped_queue_size or 100000
|
||||
database.create_collection("messages.broadcast", size=capsize, capped=True)
|
||||
|
||||
database.create_collection("messages.broadcast", size=capsize,
|
||||
capped=True)
|
||||
|
||||
self.bcast = getattr(database, "messages.broadcast")
|
||||
self.bcast.ensure_index([("queue", 1)])
|
||||
|
||||
|
||||
self.routing = getattr(database, "messages.routing")
|
||||
self.routing.ensure_index([("queue", 1), ("exchange", 1)])
|
||||
return database
|
||||
|
||||
|
||||
def get_table(self, exchange):
|
||||
"""Get table of bindings for `exchange`."""
|
||||
brokerRoutes = self.client.messages.routing.find({"exchange":exchange})
|
||||
"""Get table of bindings for ``exchange``."""
|
||||
brokerRoutes = self.client.messages.routing.find({
|
||||
"exchange": exchange})
|
||||
|
||||
localRoutes = self.state.exchanges[exchange]["table"]
|
||||
for route in brokerRoutes:
|
||||
localRoutes.append((route["routing_key"], route["pattern"], route["queue"]))
|
||||
localRoutes.append((route["routing_key"],
|
||||
route["pattern"],
|
||||
route["queue"]))
|
||||
return set(localRoutes)
|
||||
|
||||
|
||||
def _put_fanout(self, exchange, message, **kwargs):
|
||||
"""Deliver fanout message."""
|
||||
self.client.messages.broadcast.insert({"payload": dumps(message), "queue": exchange})
|
||||
|
||||
self.client.messages.broadcast.insert({"payload": dumps(message),
|
||||
"queue": exchange})
|
||||
|
||||
def _queue_bind(self, exchange, routing_key, pattern, queue):
|
||||
if self.typeof(exchange).type == "fanout":
|
||||
cursor = self.bcast.find(query={"queue":exchange}, sort=[("$natural", 1)], tailable=True)
|
||||
cursor = self.bcast.find(query={"queue": exchange},
|
||||
sort=[("$natural", 1)], tailable=True)
|
||||
# Fast forward the cursor past old events
|
||||
self._queue_cursors[queue] = cursor.skip(cursor.count())
|
||||
self._queue_readcounts[queue] = cursor.count()
|
||||
self._fanout_queues[queue] = exchange
|
||||
|
||||
meta = dict(exchange=exchange, queue=queue, routing_key=routing_key, pattern=pattern)
|
||||
|
||||
meta = {"exchange": exchange,
|
||||
"queue": queue,
|
||||
"routing_key": routing_key,
|
||||
"pattern": pattern}
|
||||
self.client.messages.routing.update(meta, meta, upsert=True)
|
||||
|
||||
|
||||
def queue_delete(self, queue, if_unusued=False, if_empty=False, **kwargs):
|
||||
self.routing.remove({"queue":queue})
|
||||
super(Channel, self).queue_delete(queue, if_unusued, if_empty, **kwargs)
|
||||
|
||||
def queue_delete(self, queue, **kwargs):
|
||||
self.routing.remove({"queue": queue})
|
||||
super(Channel, self).queue_delete(queue, **kwargs)
|
||||
if queue in self._fanout_queues:
|
||||
self._queue_cursors[queue].close()
|
||||
del self._queue_cursors[queue]
|
||||
del self._fanout_queues[queue]
|
||||
|
||||
self._queue_cursors.pop(queue, None)
|
||||
self._fanout_queues.pop(queue, None)
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
|
|
Loading…
Reference in New Issue