django-kombu is now part of kombu as kombu.transport.django

This commit is contained in:
Ask Solem 2011-09-09 14:37:54 +01:00
parent 2daf3c25a9
commit 2f6fe164e8
11 changed files with 225 additions and 46 deletions

View File

@ -23,6 +23,7 @@ Ian Struble <istruble@gmail.com>
Jason Cater <jason@ncsfulfillment.com>
Jeff Balogh <me@jeffbalogh.org>
John Spray <jcspray@gmail.com>
Keith Fitzgerald <ghostrocket@me.com>
Marcin Lulek (ergo) <info@webreactor.eu>
Mher Movsisyan <mher.movsisyan@gmail.com>
Noah Kantrowitz <noah@coderanger.net>

38
THANKS
View File

@ -1,6 +1,32 @@
Thanks to Barry Pederson <bp@barryp.org> for the py-amqplib library.
Thanks to Grégoire Cachet <gregoire@audacy.fr> for bug reports.
Thanks to Martin Mahner for the Sphinx theme.
Thanks to jcater for bug reports.
Thanks to sebest for bug reports.
Thanks to greut for bug reports
========
THANKS
========
From ``carrot`` THANKS file
===========================
* Thanks to Barry Pederson <bp@barryp.org> for the py-amqplib library.
* Thanks to Grégoire Cachet <gregoire@audacy.fr> for bug reports.
* Thanks to Martin Mahner for the Sphinx theme.
* Thanks to jcater for bug reports.
* Thanks to sebest for bug reports.
* Thanks to greut for bug reports
From ``django-kombu`` THANKS file
=================================
* Thanks to Rajesh Dhawan and other authors of django-queue-service
for the database model implementation.
See http://code.google.com/p/django-queue-service/.
From ``kombu-sqlalchemy`` THANKS file
=====================================
* Thanks to Rajesh Dhawan and other authors of django-queue-service
for the database model implementation.
See http://code.google.com/p/django-queue-service/.
* Thanks to haridsv for the draft SQLAlchemy port (which can still
be found at http://github.com/haridsv/celery-alchemy-poc)

View File

@ -34,6 +34,7 @@ def setup():
# so coverage sees all our modules.
for module in moduleindex:
try:
print("IMPORT: %r" % (module, ))
__import__(module)
except ImportError:
pass

View File

@ -8,15 +8,6 @@ from .utils import mask_modules, module_exists
class test_transport(unittest.TestCase):
def test_django_transport(self):
self.assertRaises(
ImportError,
mask_modules("djkombu")(transport.resolve_transport), "django")
self.assertTupleEqual(
module_exists("djkombu")(transport.resolve_transport)("django"),
("djkombu.transport", "DatabaseTransport"))
def test_resolve_transport__no_class_name(self):
self.assertRaises(KeyError, transport.resolve_transport,
"nonexistant")

View File

@ -14,36 +14,9 @@ import sys
DEFAULT_TRANSPORT = "kombu.transport.amqplib.Transport"
MISSING_LIB = """
The %(feature)s requires the %(lib)s module to be
installed; http://pypi.python.org/pypi/%(lib)s
Use pip to install this module::
$ pip install %(lib)s
or using easy_install::
$ easy_install %(lib)s
"""
def _requires(feature, module, lib):
try:
__import__(module)
except ImportError:
raise ImportError(MISSING_LIB % {"feature": feature,
"module": module,
"lib": lib})
def _django_transport():
_requires("Django transport", "djkombu", "django-kombu")
return "djkombu.transport.DatabaseTransport"
def _ghettoq(name, new, alias=None):
xxx = new
xxx = new # stupid enclosing
def __inner():
import warnings
@ -74,11 +47,10 @@ TRANSPORT_ALIASES = {
"beanstalk": "kombu.transport.beanstalk.Transport",
"mongodb": "kombu.transport.mongodb.Transport",
"couchdb": "kombu.transport.couchdb.Transport",
"django": _django_transport,
"django": "kombu.transport.django.Transport",
"sqlalchemy": "kombu.transport.sqlalchemy.Transport",
"ghettoq.taproot.Redis": _ghettoq("Redis", "redis", "redis"),
"ghettoq.taproot.Database": _ghettoq("Database", _django_transport,
"django"),
"ghettoq.taproot.Database": _ghettoq("Database", "django", "django"),
"ghettoq.taproot.MongoDB": _ghettoq("MongoDB", "mongodb"),
"ghettoq.taproot.Beanstalk": _ghettoq("Beanstalk", "beanstalk"),
"ghettoq.taproot.CouchDB": _ghettoq("CouchDB", "couchdb"),

View File

@ -0,0 +1,61 @@
"""Kombu transport using the Django database as a message store."""
from __future__ import absolute_import
from Queue import Empty
from anyjson import serialize, deserialize
from django.conf import settings
from django.core import exceptions as errors
from .. import virtual
from .models import Queue
VERSION = (0, 9, 4)
__version__ = ".".join(map(str, VERSION))
POLLING_INTERVAL = getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0)
class Channel(virtual.Channel):
def _new_queue(self, queue, **kwargs):
Queue.objects.get_or_create(name=queue)
def _put(self, queue, message, **kwargs):
Queue.objects.publish(queue, serialize(message))
def basic_consume(self, queue, *args, **kwargs):
qinfo = self.state.bindings[queue]
exchange = qinfo[0]
if self.typeof(exchange).type == "fanout":
return
super(Channel, self).basic_consume(queue, *args, **kwargs)
def _get(self, queue):
#self.refresh_connection()
m = Queue.objects.fetch(queue)
if m:
return deserialize(m)
raise Empty()
def _size(self, queue):
return Queue.objects.size(queue)
def _purge(self, queue):
return Queue.objects.purge(queue)
def refresh_connection(self):
from django import db
db.close_connection()
class DatabaseTransport(virtual.Transport):
Channel = Channel
default_port = 0
polling_interval = POLLING_INTERVAL
connection_errors = ()
channel_errors = (errors.ObjectDoesNotExist,
errors.MultipleObjectsReturned)

View File

@ -0,0 +1,25 @@
from __future__ import absolute_import
from django.core.management.base import BaseCommand
def pluralize(desc, value):
if value > 1:
return desc + 's'
return desc
class Command(BaseCommand):
requires_model_validation = True
def handle(self, *args, **options):
from ...models import Message
count = Message.objects.filter(visible=False).count()
print("Removing %s invisible %s... " % (
count, pluralize("message", count))
Message.objects.cleanup()

View File

@ -0,0 +1,70 @@
from __future__ import absolute_import
from django.db import transaction, connection, models
try:
from django.db import connections, router
except ImportError: # pre-Django 1.2
connections = router = None
class QueueManager(models.Manager):
def publish(self, queue_name, payload):
queue, created = self.get_or_create(name=queue_name)
queue.messages.create(payload=payload)
def fetch(self, queue_name):
try:
queue = self.get(name=queue_name)
except self.model.DoesNotExist:
return
return queue.messages.pop()
def size(self, queue_name):
return self.get(name=queue_name).messages.count()
def purge(self, queue_name):
try:
queue = self.get(name=queue_name)
except self.model.DoesNotExist:
return
messages = queue.messages.all()
count = messages.count()
messages.delete()
return count
class MessageManager(models.Manager):
_messages_received = [0]
cleanup_every = 10
def pop(self):
try:
resultset = self.filter(visible=True).order_by('sent_at', 'id')
result = resultset[0:1].get()
result.visible = False
result.save()
recv = self.__class__._messages_received
recv[0] += 1
if not recv[0] % self.cleanup_every:
self.cleanup()
return result.payload
except self.model.DoesNotExist:
pass
def cleanup(self):
cursor = self.connection_for_write().cursor()
try:
cursor.execute("DELETE FROM %s WHERE visible=%%s" % (
self.model._meta.db_table, ), (False, ))
except:
transaction.rollback_unless_managed()
else:
transaction.commit_unless_managed()
def connection_for_write(self):
if connections:
return connections[router.db_for_write(self.model)]
return connection

View File

@ -0,0 +1,32 @@
from __future__ import absolute_import
from django.db import models
from django.utils.translation import ugettext_lazy as _
from .managers import QueueManager, MessageManager
class Queue(models.Model):
name = models.CharField(_("name"), max_length=200, unique=True)
objects = QueueManager()
class Meta:
db_table = "djkombu_queue"
verbose_name = _("queue")
verbose_name_plural = _("queues")
class Message(models.Model):
visible = models.BooleanField(default=True, db_index=True)
sent_at = models.DateTimeField(null=True, blank=True, db_index=True,
auto_now_add=True)
payload = models.TextField(_("payload"), null=False)
queue = models.ForeignKey(Queue, related_name="messages")
objects = MessageManager()
class Meta:
db_table = "djkombu_message"
verbose_name = _("message")
verbose_name_plural = _("messages")