From be79b3d4cdbfe80cc5400396d7d81abeb73d0136 Mon Sep 17 00:00:00 2001 From: Amin Ghadersohi Date: Wed, 17 May 2017 01:17:47 -0700 Subject: [PATCH] Re-Adding support for sqlalchemy as it is needed by Apache project Airflow (#687) * Re-Adding support for sqlalchemy as it is needed by Apache project Airflow * Re-Adding support for sqlalchemy as it is needed by Apache project Airflow --- .gitignore | 2 + .landscape.yml | 3 + docs/reference/index.rst | 2 + docs/reference/kombu.transport.SLMQ.rst | 1 + .../kombu.transport.sqlalchemy.models.rst | 32 ++++ docs/reference/kombu.transport.sqlalchemy.rst | 25 +++ kombu/transport/__init__.py | 2 + kombu/transport/etcd.py | 1 - kombu/transport/sqlalchemy/__init__.py | 164 ++++++++++++++++++ kombu/transport/sqlalchemy/models.py | 67 +++++++ requirements/extras/sqlalchemy.txt | 1 + requirements/test-ci.txt | 1 + setup.py | 1 + t/integration/tests/test_sqla.py | 13 ++ t/unit/transport/test_sqlalchemy.py | 50 ++++++ 15 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 .landscape.yml create mode 100644 docs/reference/kombu.transport.sqlalchemy.models.rst create mode 100644 docs/reference/kombu.transport.sqlalchemy.rst create mode 100644 kombu/transport/sqlalchemy/__init__.py create mode 100644 kombu/transport/sqlalchemy/models.py create mode 100644 requirements/extras/sqlalchemy.txt create mode 100644 t/integration/tests/test_sqla.py create mode 100644 t/unit/transport/test_sqlalchemy.py diff --git a/.gitignore b/.gitignore index 066482c3..74b71cd8 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,5 @@ htmlcov/ test.db coverage.xml venv/ +env +.eggs diff --git a/.landscape.yml b/.landscape.yml new file mode 100644 index 00000000..f90444af --- /dev/null +++ b/.landscape.yml @@ -0,0 +1,3 @@ +pylint: + disable: + - cyclic-import diff --git a/docs/reference/index.rst b/docs/reference/index.rst index b5cd6647..550b9d7a 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -48,6 +48,8 @@ kombu.transport.etcd kombu.transport.zookeeper kombu.transport.filesystem + kombu.transport.sqlalchemy + kombu.transport.sqlalchemy.models kombu.transport.SQS kombu.transport.SLMQ kombu.transport.pyro diff --git a/docs/reference/kombu.transport.SLMQ.rst b/docs/reference/kombu.transport.SLMQ.rst index 95c57d7f..da385f86 100644 --- a/docs/reference/kombu.transport.SLMQ.rst +++ b/docs/reference/kombu.transport.SLMQ.rst @@ -2,6 +2,7 @@ SLMQ Transport - ``kombu.transport.SLMQ`` ============================================= + .. currentmodule:: kombu.transport.SLMQ .. automodule:: kombu.transport.SLMQ diff --git a/docs/reference/kombu.transport.sqlalchemy.models.rst b/docs/reference/kombu.transport.sqlalchemy.models.rst new file mode 100644 index 00000000..f4aa3ec3 --- /dev/null +++ b/docs/reference/kombu.transport.sqlalchemy.models.rst @@ -0,0 +1,32 @@ +===================================================================== + SQLAlchemy Transport Model - ``kombu.transport.sqlalchemy.models`` +===================================================================== + + +.. currentmodule:: kombu.transport.sqlalchemy.models + +.. automodule:: kombu.transport.sqlalchemy.models + + .. contents:: + :local: + + Models + ------ + + .. autoclass:: Queue + + .. autoattribute:: Queue.id + + .. autoattribute:: Queue.name + + .. autoclass:: Message + + .. autoattribute:: Message.id + + .. autoattribute:: Message.visible + + .. autoattribute:: Message.sent_at + + .. autoattribute:: Message.payload + + .. autoattribute:: Message.version diff --git a/docs/reference/kombu.transport.sqlalchemy.rst b/docs/reference/kombu.transport.sqlalchemy.rst new file mode 100644 index 00000000..848110d3 --- /dev/null +++ b/docs/reference/kombu.transport.sqlalchemy.rst @@ -0,0 +1,25 @@ +=========================================================== + SQLAlchemy Transport Model - kombu.transport.sqlalchemy +=========================================================== + + +.. currentmodule:: kombu.transport.sqlalchemy + +.. automodule:: kombu.transport.sqlalchemy + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 7fb689ed..535a2165 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -28,6 +28,8 @@ TRANSPORT_ALIASES = { 'sqs': 'kombu.transport.SQS:Transport', 'mongodb': 'kombu.transport.mongodb:Transport', 'zookeeper': 'kombu.transport.zookeeper:Transport', + 'sqlalchemy': 'kombu.transport.sqlalchemy:Transport', + 'sqla': 'kombu.transport.sqlalchemy:Transport', 'SLMQ': 'kombu.transport.SLMQ.Transport', 'slmq': 'kombu.transport.SLMQ.Transport', 'filesystem': 'kombu.transport.filesystem:Transport', diff --git a/kombu/transport/etcd.py b/kombu/transport/etcd.py index 1dcf2de0..025c1d7a 100644 --- a/kombu/transport/etcd.py +++ b/kombu/transport/etcd.py @@ -259,7 +259,6 @@ class Transport(virtual.Transport): """Return the version of the etcd library. .. note:: - python-etcd has no __version__. This is a workaround. """ try: diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py new file mode 100644 index 00000000..bcf9ed5b --- /dev/null +++ b/kombu/transport/sqlalchemy/__init__.py @@ -0,0 +1,164 @@ +"""Kombu transport using SQLAlchemy as the message store.""" +# SQLAlchemy overrides != False to have special meaning and pep8 complains +# flake8: noqa + +from __future__ import absolute_import, unicode_literals + +from json import loads, dumps + +from sqlalchemy import create_engine +from sqlalchemy.exc import OperationalError +from sqlalchemy.orm import sessionmaker + +from kombu.five import Empty +from kombu.transport import virtual +from kombu.utils import cached_property +from kombu.utils.encoding import bytes_to_str +from .models import (ModelBase, Queue as QueueBase, Message as MessageBase, + class_registry, metadata) + + +VERSION = (1, 1, 0) +__version__ = '.'.join(map(str, VERSION)) + + +class Channel(virtual.Channel): + """The channel class.""" + + _session = None + _engines = {} # engine cache + + def __init__(self, connection, **kwargs): + self._configure_entity_tablenames(connection.client.transport_options) + super(Channel, self).__init__(connection, **kwargs) + + def _configure_entity_tablenames(self, opts): + self.queue_tablename = opts.get('queue_tablename', 'kombu_queue') + self.message_tablename = opts.get('message_tablename', 'kombu_message') + + # + # Define the model definitions. This registers the declarative + # classes with the active SQLAlchemy metadata object. This *must* be + # done prior to the ``create_engine`` call. + # + self.queue_cls and self.message_cls + + def _engine_from_config(self): + conninfo = self.connection.client + transport_options = conninfo.transport_options.copy() + transport_options.pop('queue_tablename', None) + transport_options.pop('message_tablename', None) + return create_engine(conninfo.hostname, **transport_options) + + def _open(self): + conninfo = self.connection.client + if conninfo.hostname not in self._engines: + engine = self._engine_from_config() + Session = sessionmaker(bind=engine) + metadata.create_all(engine) + self._engines[conninfo.hostname] = engine, Session + return self._engines[conninfo.hostname] + + @property + def session(self): + if self._session is None: + _, Session = self._open() + self._session = Session() + return self._session + + def _get_or_create(self, queue): + obj = self.session.query(self.queue_cls) \ + .filter(self.queue_cls.name == queue).first() + if not obj: + obj = self.queue_cls(queue) + self.session.add(obj) + try: + self.session.commit() + except OperationalError: + self.session.rollback() + return obj + + def _new_queue(self, queue, **kwargs): + self._get_or_create(queue) + + def _put(self, queue, payload, **kwargs): + obj = self._get_or_create(queue) + message = self.message_cls(dumps(payload), obj) + self.session.add(message) + try: + self.session.commit() + except OperationalError: + self.session.rollback() + + def _get(self, queue): + obj = self._get_or_create(queue) + if self.session.bind.name == 'sqlite': + self.session.execute('BEGIN IMMEDIATE TRANSACTION') + try: + msg = self.session.query(self.message_cls) \ + .with_lockmode('update') \ + .filter(self.message_cls.queue_id == obj.id) \ + .filter(self.message_cls.visible != False) \ + .order_by(self.message_cls.sent_at) \ + .order_by(self.message_cls.id) \ + .limit(1) \ + .first() + if msg: + msg.visible = False + return loads(bytes_to_str(msg.payload)) + raise Empty() + finally: + self.session.commit() + + def _query_all(self, queue): + obj = self._get_or_create(queue) + return self.session.query(self.message_cls) \ + .filter(self.message_cls.queue_id == obj.id) + + def _purge(self, queue): + count = self._query_all(queue).delete(synchronize_session=False) + try: + self.session.commit() + except OperationalError: + self.session.rollback() + return count + + def _size(self, queue): + return self._query_all(queue).count() + + def _declarative_cls(self, name, base, ns): + if name in class_registry: + return class_registry[name] + return type(str(name), (base, ModelBase), ns) + + @cached_property + def queue_cls(self): + return self._declarative_cls( + 'Queue', + QueueBase, + {'__tablename__': self.queue_tablename} + ) + + @cached_property + def message_cls(self): + return self._declarative_cls( + 'Message', + MessageBase, + {'__tablename__': self.message_tablename} + ) + + +class Transport(virtual.Transport): + """The transport class.""" + + Channel = Channel + + can_parse_url = True + default_port = 0 + driver_type = 'sql' + driver_name = 'sqlalchemy' + connection_errors = (OperationalError, ) + + def driver_version(self): + import sqlalchemy + return sqlalchemy.__version__ diff --git a/kombu/transport/sqlalchemy/models.py b/kombu/transport/sqlalchemy/models.py new file mode 100644 index 00000000..5fd56c0c --- /dev/null +++ b/kombu/transport/sqlalchemy/models.py @@ -0,0 +1,67 @@ +"""Kombu transport using SQLAlchemy as the message store.""" +from __future__ import absolute_import, unicode_literals + +import datetime + +from sqlalchemy import (Column, Integer, String, Text, DateTime, + Sequence, Boolean, ForeignKey, SmallInteger) +from sqlalchemy.ext.declarative import declarative_base, declared_attr +from sqlalchemy.orm import relation +from sqlalchemy.schema import MetaData + +class_registry = {} +metadata = MetaData() +ModelBase = declarative_base(metadata=metadata, class_registry=class_registry) + + +class Queue(object): + """The queue class.""" + + __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'} + + id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True, + autoincrement=True) + name = Column(String(200), unique=True) + + def __init__(self, name): + self.name = name + + def __str__(self): + return ''.format(self=self) + + @declared_attr + def messages(cls): + return relation('Message', backref='queue', lazy='noload') + + +class Message(object): + """The message class.""" + + __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'} + + id = Column(Integer, Sequence('message_id_sequence'), + primary_key=True, autoincrement=True) + visible = Column(Boolean, default=True, index=True) + sent_at = Column('timestamp', DateTime, nullable=True, index=True, + onupdate=datetime.datetime.now) + payload = Column(Text, nullable=False) + version = Column(SmallInteger, nullable=False, default=1) + + __mapper_args__ = {'version_id_col': version} + + def __init__(self, payload, queue): + self.payload = payload + self.queue = queue + + def __str__(self): + return ''.format(self) + + @declared_attr + def queue_id(self): + return Column( + Integer, + ForeignKey( + '%s.id' % class_registry['Queue'].__tablename__, + name='FK_kombu_message_queue' + ) + ) diff --git a/requirements/extras/sqlalchemy.txt b/requirements/extras/sqlalchemy.txt new file mode 100644 index 00000000..39fb2bef --- /dev/null +++ b/requirements/extras/sqlalchemy.txt @@ -0,0 +1 @@ +sqlalchemy diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index f2318d5a..b3cd5427 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -4,3 +4,4 @@ redis PyYAML msgpack-python>0.2.0 -r extras/sqs.txt +sqlalchemy diff --git a/setup.py b/setup.py index 0e49459f..a1b65879 100644 --- a/setup.py +++ b/setup.py @@ -141,6 +141,7 @@ setup( 'mongodb': extras('mongodb.txt'), 'sqs': extras('sqs.txt'), 'zookeeper': extras('zookeeper.txt'), + 'sqlalchemy': extras('sqlalchemy.txt'), 'librabbitmq': extras('librabbitmq.txt'), 'pyro': extras('pyro.txt'), 'slmq': extras('slmq.txt'), diff --git a/t/integration/tests/test_sqla.py b/t/integration/tests/test_sqla.py new file mode 100644 index 00000000..f145c07e --- /dev/null +++ b/t/integration/tests/test_sqla.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import, unicode_literals + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_module('sqlalchemy') +class test_sqla(transport.TransportCase): + transport = 'sqlalchemy' + prefix = 'sqlalchemy' + event_loop_max = 10 + connection_options = {'hostname': 'sqla+sqlite://'} diff --git a/t/unit/transport/test_sqlalchemy.py b/t/unit/transport/test_sqlalchemy.py new file mode 100644 index 00000000..70235b9a --- /dev/null +++ b/t/unit/transport/test_sqlalchemy.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import, unicode_literals + +import pytest +from case import patch, skip + +from kombu import Connection + + +@skip.unless_module('sqlalchemy') +class test_SqlAlchemy: + + def test_url_parser(self): + with patch('kombu.transport.sqlalchemy.Channel._open'): + url = 'sqlalchemy+sqlite:///celerydb.sqlite' + Connection(url).connect() + + url = 'sqla+sqlite:///celerydb.sqlite' + Connection(url).connect() + + url = 'sqlb+sqlite:///celerydb.sqlite' + with pytest.raises(KeyError): + Connection(url).connect() + + def test_simple_queueing(self): + conn = Connection('sqlalchemy+sqlite:///:memory:') + conn.connect() + try: + channel = conn.channel() + assert channel.queue_cls.__table__.name == 'kombu_queue' + assert channel.message_cls.__table__.name == 'kombu_message' + + channel._put('celery', 'DATA_SIMPLE_QUEUEING') + assert channel._get('celery') == 'DATA_SIMPLE_QUEUEING' + finally: + conn.release() + + def test_clone(self): + hostname = 'sqlite:///celerydb.sqlite' + x = Connection('+'.join(['sqla', hostname])) + try: + assert x.uri_prefix == 'sqla' + assert x.hostname == hostname + clone = x.clone() + try: + assert clone.hostname == hostname + assert clone.uri_prefix == 'sqla' + finally: + clone.release() + finally: + x.release()