diff --git a/AUTHORS b/AUTHORS index 1586cb45..e5632029 100644 --- a/AUTHORS +++ b/AUTHORS @@ -23,6 +23,7 @@ Ian Struble Jason Cater Jeff Balogh John Spray +Keith Fitzgerald Marcin Lulek (ergo) Mher Movsisyan Noah Kantrowitz diff --git a/THANKS b/THANKS index 6757ee47..85612f63 100644 --- a/THANKS +++ b/THANKS @@ -1,6 +1,32 @@ -Thanks to Barry Pederson for the py-amqplib library. -Thanks to Grégoire Cachet for bug reports. -Thanks to Martin Mahner for the Sphinx theme. -Thanks to jcater for bug reports. -Thanks to sebest for bug reports. -Thanks to greut for bug reports +======== + THANKS +======== + +From ``carrot`` THANKS file +=========================== + +* Thanks to Barry Pederson for the py-amqplib library. +* Thanks to Grégoire Cachet for bug reports. +* Thanks to Martin Mahner for the Sphinx theme. +* Thanks to jcater for bug reports. +* Thanks to sebest for bug reports. +* Thanks to greut for bug reports + +From ``django-kombu`` THANKS file +================================= + +* Thanks to Rajesh Dhawan and other authors of django-queue-service + for the database model implementation. + See http://code.google.com/p/django-queue-service/. + +From ``kombu-sqlalchemy`` THANKS file +===================================== + +* Thanks to Rajesh Dhawan and other authors of django-queue-service + for the database model implementation. + See http://code.google.com/p/django-queue-service/. + +* Thanks to haridsv for the draft SQLAlchemy port (which can still + be found at http://github.com/haridsv/celery-alchemy-poc) + + diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py index 27b5e13e..0f58c4aa 100644 --- a/kombu/tests/__init__.py +++ b/kombu/tests/__init__.py @@ -34,6 +34,7 @@ def setup(): # so coverage sees all our modules. for module in moduleindex: try: + print("IMPORT: %r" % (module, )) __import__(module) except ImportError: pass diff --git a/kombu/tests/test_transport.py b/kombu/tests/test_transport.py index 8b6883a9..e80db1f2 100644 --- a/kombu/tests/test_transport.py +++ b/kombu/tests/test_transport.py @@ -8,15 +8,6 @@ from .utils import mask_modules, module_exists class test_transport(unittest.TestCase): - def test_django_transport(self): - self.assertRaises( - ImportError, - mask_modules("djkombu")(transport.resolve_transport), "django") - - self.assertTupleEqual( - module_exists("djkombu")(transport.resolve_transport)("django"), - ("djkombu.transport", "DatabaseTransport")) - def test_resolve_transport__no_class_name(self): self.assertRaises(KeyError, transport.resolve_transport, "nonexistant") diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index dbcb959e..4441b8d5 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -14,36 +14,9 @@ import sys DEFAULT_TRANSPORT = "kombu.transport.amqplib.Transport" -MISSING_LIB = """ - The %(feature)s requires the %(lib)s module to be - installed; http://pypi.python.org/pypi/%(lib)s - - Use pip to install this module:: - - $ pip install %(lib)s - - or using easy_install:: - - $ easy_install %(lib)s -""" - - -def _requires(feature, module, lib): - try: - __import__(module) - except ImportError: - raise ImportError(MISSING_LIB % {"feature": feature, - "module": module, - "lib": lib}) - - -def _django_transport(): - _requires("Django transport", "djkombu", "django-kombu") - return "djkombu.transport.DatabaseTransport" - def _ghettoq(name, new, alias=None): - xxx = new + xxx = new # stupid enclosing def __inner(): import warnings @@ -74,11 +47,10 @@ TRANSPORT_ALIASES = { "beanstalk": "kombu.transport.beanstalk.Transport", "mongodb": "kombu.transport.mongodb.Transport", "couchdb": "kombu.transport.couchdb.Transport", - "django": _django_transport, + "django": "kombu.transport.django.Transport", "sqlalchemy": "kombu.transport.sqlalchemy.Transport", "ghettoq.taproot.Redis": _ghettoq("Redis", "redis", "redis"), - "ghettoq.taproot.Database": _ghettoq("Database", _django_transport, - "django"), + "ghettoq.taproot.Database": _ghettoq("Database", "django", "django"), "ghettoq.taproot.MongoDB": _ghettoq("MongoDB", "mongodb"), "ghettoq.taproot.Beanstalk": _ghettoq("Beanstalk", "beanstalk"), "ghettoq.taproot.CouchDB": _ghettoq("CouchDB", "couchdb"), diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py new file mode 100644 index 00000000..7a821187 --- /dev/null +++ b/kombu/transport/django/__init__.py @@ -0,0 +1,61 @@ +"""Kombu transport using the Django database as a message store.""" +from __future__ import absolute_import + +from Queue import Empty + +from anyjson import serialize, deserialize + +from django.conf import settings +from django.core import exceptions as errors + +from .. import virtual + +from .models import Queue + +VERSION = (0, 9, 4) +__version__ = ".".join(map(str, VERSION)) + +POLLING_INTERVAL = getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0) + + +class Channel(virtual.Channel): + + def _new_queue(self, queue, **kwargs): + Queue.objects.get_or_create(name=queue) + + def _put(self, queue, message, **kwargs): + Queue.objects.publish(queue, serialize(message)) + + def basic_consume(self, queue, *args, **kwargs): + qinfo = self.state.bindings[queue] + exchange = qinfo[0] + if self.typeof(exchange).type == "fanout": + return + super(Channel, self).basic_consume(queue, *args, **kwargs) + + def _get(self, queue): + #self.refresh_connection() + m = Queue.objects.fetch(queue) + if m: + return deserialize(m) + raise Empty() + + def _size(self, queue): + return Queue.objects.size(queue) + + def _purge(self, queue): + return Queue.objects.purge(queue) + + def refresh_connection(self): + from django import db + db.close_connection() + + +class DatabaseTransport(virtual.Transport): + Channel = Channel + + default_port = 0 + polling_interval = POLLING_INTERVAL + connection_errors = () + channel_errors = (errors.ObjectDoesNotExist, + errors.MultipleObjectsReturned) diff --git a/kombu/transport/django/management/__init__.py b/kombu/transport/django/management/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kombu/transport/django/management/commands/__init__.py b/kombu/transport/django/management/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/kombu/transport/django/management/commands/clean_kombu_messages.py b/kombu/transport/django/management/commands/clean_kombu_messages.py new file mode 100644 index 00000000..8c7abcf4 --- /dev/null +++ b/kombu/transport/django/management/commands/clean_kombu_messages.py @@ -0,0 +1,25 @@ +from __future__ import absolute_import + +from django.core.management.base import BaseCommand + + +def pluralize(desc, value): + if value > 1: + return desc + 's' + return desc + + +class Command(BaseCommand): + requires_model_validation = True + + def handle(self, *args, **options): + from ...models import Message + + count = Message.objects.filter(visible=False).count() + + print("Removing %s invisible %s... " % ( + count, pluralize("message", count)) + Message.objects.cleanup() + + + diff --git a/kombu/transport/django/managers.py b/kombu/transport/django/managers.py new file mode 100644 index 00000000..ffb80eff --- /dev/null +++ b/kombu/transport/django/managers.py @@ -0,0 +1,70 @@ +from __future__ import absolute_import + +from django.db import transaction, connection, models +try: + from django.db import connections, router +except ImportError: # pre-Django 1.2 + connections = router = None + + +class QueueManager(models.Manager): + + def publish(self, queue_name, payload): + queue, created = self.get_or_create(name=queue_name) + queue.messages.create(payload=payload) + + def fetch(self, queue_name): + try: + queue = self.get(name=queue_name) + except self.model.DoesNotExist: + return + + return queue.messages.pop() + + def size(self, queue_name): + return self.get(name=queue_name).messages.count() + + def purge(self, queue_name): + try: + queue = self.get(name=queue_name) + except self.model.DoesNotExist: + return + + messages = queue.messages.all() + count = messages.count() + messages.delete() + return count + + +class MessageManager(models.Manager): + _messages_received = [0] + cleanup_every = 10 + + def pop(self): + try: + resultset = self.filter(visible=True).order_by('sent_at', 'id') + result = resultset[0:1].get() + result.visible = False + result.save() + recv = self.__class__._messages_received + recv[0] += 1 + if not recv[0] % self.cleanup_every: + self.cleanup() + return result.payload + except self.model.DoesNotExist: + pass + + def cleanup(self): + cursor = self.connection_for_write().cursor() + try: + cursor.execute("DELETE FROM %s WHERE visible=%%s" % ( + self.model._meta.db_table, ), (False, )) + except: + transaction.rollback_unless_managed() + else: + transaction.commit_unless_managed() + + def connection_for_write(self): + if connections: + return connections[router.db_for_write(self.model)] + return connection diff --git a/kombu/transport/django/models.py b/kombu/transport/django/models.py new file mode 100644 index 00000000..ef6984f2 --- /dev/null +++ b/kombu/transport/django/models.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import + +from django.db import models +from django.utils.translation import ugettext_lazy as _ + +from .managers import QueueManager, MessageManager + + +class Queue(models.Model): + name = models.CharField(_("name"), max_length=200, unique=True) + + objects = QueueManager() + + class Meta: + db_table = "djkombu_queue" + verbose_name = _("queue") + verbose_name_plural = _("queues") + + +class Message(models.Model): + visible = models.BooleanField(default=True, db_index=True) + sent_at = models.DateTimeField(null=True, blank=True, db_index=True, + auto_now_add=True) + payload = models.TextField(_("payload"), null=False) + queue = models.ForeignKey(Queue, related_name="messages") + + objects = MessageManager() + + class Meta: + db_table = "djkombu_message" + verbose_name = _("message") + verbose_name_plural = _("messages")