Re-Adding support for sqlalchemy as it is needed by Apache project Airflow (#687)

* Re-Adding support for sqlalchemy as it is needed by Apache project Airflow

* Re-Adding support for sqlalchemy as it is needed by Apache project Airflow
This commit is contained in:
Amin Ghadersohi 2017-05-17 01:17:47 -07:00 committed by Asif Saifuddin Auvi
parent 25a9e76f3a
commit be79b3d4cd
15 changed files with 364 additions and 1 deletions

2
.gitignore vendored
View File

@ -31,3 +31,5 @@ htmlcov/
test.db
coverage.xml
venv/
env
.eggs

3
.landscape.yml Normal file
View File

@ -0,0 +1,3 @@
pylint:
disable:
- cyclic-import

View File

@ -48,6 +48,8 @@
kombu.transport.etcd
kombu.transport.zookeeper
kombu.transport.filesystem
kombu.transport.sqlalchemy
kombu.transport.sqlalchemy.models
kombu.transport.SQS
kombu.transport.SLMQ
kombu.transport.pyro

View File

@ -2,6 +2,7 @@
SLMQ Transport - ``kombu.transport.SLMQ``
=============================================
.. currentmodule:: kombu.transport.SLMQ
.. automodule:: kombu.transport.SLMQ

View File

@ -0,0 +1,32 @@
=====================================================================
SQLAlchemy Transport Model - ``kombu.transport.sqlalchemy.models``
=====================================================================
.. currentmodule:: kombu.transport.sqlalchemy.models
.. automodule:: kombu.transport.sqlalchemy.models
.. contents::
:local:
Models
------
.. autoclass:: Queue
.. autoattribute:: Queue.id
.. autoattribute:: Queue.name
.. autoclass:: Message
.. autoattribute:: Message.id
.. autoattribute:: Message.visible
.. autoattribute:: Message.sent_at
.. autoattribute:: Message.payload
.. autoattribute:: Message.version

View File

@ -0,0 +1,25 @@
===========================================================
SQLAlchemy Transport Model - kombu.transport.sqlalchemy
===========================================================
.. currentmodule:: kombu.transport.sqlalchemy
.. automodule:: kombu.transport.sqlalchemy
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -28,6 +28,8 @@ TRANSPORT_ALIASES = {
'sqs': 'kombu.transport.SQS:Transport',
'mongodb': 'kombu.transport.mongodb:Transport',
'zookeeper': 'kombu.transport.zookeeper:Transport',
'sqlalchemy': 'kombu.transport.sqlalchemy:Transport',
'sqla': 'kombu.transport.sqlalchemy:Transport',
'SLMQ': 'kombu.transport.SLMQ.Transport',
'slmq': 'kombu.transport.SLMQ.Transport',
'filesystem': 'kombu.transport.filesystem:Transport',

View File

@ -259,7 +259,6 @@ class Transport(virtual.Transport):
"""Return the version of the etcd library.
.. note::
python-etcd has no __version__. This is a workaround.
"""
try:

View File

@ -0,0 +1,164 @@
"""Kombu transport using SQLAlchemy as the message store."""
# SQLAlchemy overrides != False to have special meaning and pep8 complains
# flake8: noqa
from __future__ import absolute_import, unicode_literals
from json import loads, dumps
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker
from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str
from .models import (ModelBase, Queue as QueueBase, Message as MessageBase,
class_registry, metadata)
VERSION = (1, 1, 0)
__version__ = '.'.join(map(str, VERSION))
class Channel(virtual.Channel):
"""The channel class."""
_session = None
_engines = {} # engine cache
def __init__(self, connection, **kwargs):
self._configure_entity_tablenames(connection.client.transport_options)
super(Channel, self).__init__(connection, **kwargs)
def _configure_entity_tablenames(self, opts):
self.queue_tablename = opts.get('queue_tablename', 'kombu_queue')
self.message_tablename = opts.get('message_tablename', 'kombu_message')
#
# Define the model definitions. This registers the declarative
# classes with the active SQLAlchemy metadata object. This *must* be
# done prior to the ``create_engine`` call.
#
self.queue_cls and self.message_cls
def _engine_from_config(self):
conninfo = self.connection.client
transport_options = conninfo.transport_options.copy()
transport_options.pop('queue_tablename', None)
transport_options.pop('message_tablename', None)
return create_engine(conninfo.hostname, **transport_options)
def _open(self):
conninfo = self.connection.client
if conninfo.hostname not in self._engines:
engine = self._engine_from_config()
Session = sessionmaker(bind=engine)
metadata.create_all(engine)
self._engines[conninfo.hostname] = engine, Session
return self._engines[conninfo.hostname]
@property
def session(self):
if self._session is None:
_, Session = self._open()
self._session = Session()
return self._session
def _get_or_create(self, queue):
obj = self.session.query(self.queue_cls) \
.filter(self.queue_cls.name == queue).first()
if not obj:
obj = self.queue_cls(queue)
self.session.add(obj)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
return obj
def _new_queue(self, queue, **kwargs):
self._get_or_create(queue)
def _put(self, queue, payload, **kwargs):
obj = self._get_or_create(queue)
message = self.message_cls(dumps(payload), obj)
self.session.add(message)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
def _get(self, queue):
obj = self._get_or_create(queue)
if self.session.bind.name == 'sqlite':
self.session.execute('BEGIN IMMEDIATE TRANSACTION')
try:
msg = self.session.query(self.message_cls) \
.with_lockmode('update') \
.filter(self.message_cls.queue_id == obj.id) \
.filter(self.message_cls.visible != False) \
.order_by(self.message_cls.sent_at) \
.order_by(self.message_cls.id) \
.limit(1) \
.first()
if msg:
msg.visible = False
return loads(bytes_to_str(msg.payload))
raise Empty()
finally:
self.session.commit()
def _query_all(self, queue):
obj = self._get_or_create(queue)
return self.session.query(self.message_cls) \
.filter(self.message_cls.queue_id == obj.id)
def _purge(self, queue):
count = self._query_all(queue).delete(synchronize_session=False)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
return count
def _size(self, queue):
return self._query_all(queue).count()
def _declarative_cls(self, name, base, ns):
if name in class_registry:
return class_registry[name]
return type(str(name), (base, ModelBase), ns)
@cached_property
def queue_cls(self):
return self._declarative_cls(
'Queue',
QueueBase,
{'__tablename__': self.queue_tablename}
)
@cached_property
def message_cls(self):
return self._declarative_cls(
'Message',
MessageBase,
{'__tablename__': self.message_tablename}
)
class Transport(virtual.Transport):
"""The transport class."""
Channel = Channel
can_parse_url = True
default_port = 0
driver_type = 'sql'
driver_name = 'sqlalchemy'
connection_errors = (OperationalError, )
def driver_version(self):
import sqlalchemy
return sqlalchemy.__version__

View File

@ -0,0 +1,67 @@
"""Kombu transport using SQLAlchemy as the message store."""
from __future__ import absolute_import, unicode_literals
import datetime
from sqlalchemy import (Column, Integer, String, Text, DateTime,
Sequence, Boolean, ForeignKey, SmallInteger)
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import relation
from sqlalchemy.schema import MetaData
class_registry = {}
metadata = MetaData()
ModelBase = declarative_base(metadata=metadata, class_registry=class_registry)
class Queue(object):
"""The queue class."""
__table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True,
autoincrement=True)
name = Column(String(200), unique=True)
def __init__(self, name):
self.name = name
def __str__(self):
return '<Queue({self.name})>'.format(self=self)
@declared_attr
def messages(cls):
return relation('Message', backref='queue', lazy='noload')
class Message(object):
"""The message class."""
__table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
id = Column(Integer, Sequence('message_id_sequence'),
primary_key=True, autoincrement=True)
visible = Column(Boolean, default=True, index=True)
sent_at = Column('timestamp', DateTime, nullable=True, index=True,
onupdate=datetime.datetime.now)
payload = Column(Text, nullable=False)
version = Column(SmallInteger, nullable=False, default=1)
__mapper_args__ = {'version_id_col': version}
def __init__(self, payload, queue):
self.payload = payload
self.queue = queue
def __str__(self):
return '<Message: {0.sent_at} {0.payload} {0.queue_id}>'.format(self)
@declared_attr
def queue_id(self):
return Column(
Integer,
ForeignKey(
'%s.id' % class_registry['Queue'].__tablename__,
name='FK_kombu_message_queue'
)
)

View File

@ -0,0 +1 @@
sqlalchemy

View File

@ -4,3 +4,4 @@ redis
PyYAML
msgpack-python>0.2.0
-r extras/sqs.txt
sqlalchemy

View File

@ -141,6 +141,7 @@ setup(
'mongodb': extras('mongodb.txt'),
'sqs': extras('sqs.txt'),
'zookeeper': extras('zookeeper.txt'),
'sqlalchemy': extras('sqlalchemy.txt'),
'librabbitmq': extras('librabbitmq.txt'),
'pyro': extras('pyro.txt'),
'slmq': extras('slmq.txt'),

View File

@ -0,0 +1,13 @@
from __future__ import absolute_import, unicode_literals
from funtests import transport
from kombu.tests.case import skip
@skip.unless_module('sqlalchemy')
class test_sqla(transport.TransportCase):
transport = 'sqlalchemy'
prefix = 'sqlalchemy'
event_loop_max = 10
connection_options = {'hostname': 'sqla+sqlite://'}

View File

@ -0,0 +1,50 @@
from __future__ import absolute_import, unicode_literals
import pytest
from case import patch, skip
from kombu import Connection
@skip.unless_module('sqlalchemy')
class test_SqlAlchemy:
def test_url_parser(self):
with patch('kombu.transport.sqlalchemy.Channel._open'):
url = 'sqlalchemy+sqlite:///celerydb.sqlite'
Connection(url).connect()
url = 'sqla+sqlite:///celerydb.sqlite'
Connection(url).connect()
url = 'sqlb+sqlite:///celerydb.sqlite'
with pytest.raises(KeyError):
Connection(url).connect()
def test_simple_queueing(self):
conn = Connection('sqlalchemy+sqlite:///:memory:')
conn.connect()
try:
channel = conn.channel()
assert channel.queue_cls.__table__.name == 'kombu_queue'
assert channel.message_cls.__table__.name == 'kombu_message'
channel._put('celery', 'DATA_SIMPLE_QUEUEING')
assert channel._get('celery') == 'DATA_SIMPLE_QUEUEING'
finally:
conn.release()
def test_clone(self):
hostname = 'sqlite:///celerydb.sqlite'
x = Connection('+'.join(['sqla', hostname]))
try:
assert x.uri_prefix == 'sqla'
assert x.hostname == hostname
clone = x.clone()
try:
assert clone.hostname == hostname
assert clone.uri_prefix == 'sqla'
finally:
clone.release()
finally:
x.release()