diff --git a/docs/reference/kombu.transport.etcd.rst b/docs/reference/kombu.transport.etcd.rst new file mode 100644 index 00000000..c5b87d23 --- /dev/null +++ b/docs/reference/kombu.transport.etcd.rst @@ -0,0 +1,24 @@ +================================================ + Etcd Transport - ``kombu.transport.etcd`` +================================================ + +.. currentmodule:: kombu.transport.etcd + +.. automodule:: kombu.transport.etcd + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/kombu/tests/transport/test_etcd.py b/kombu/tests/transport/test_etcd.py new file mode 100644 index 00000000..7994ec67 --- /dev/null +++ b/kombu/tests/transport/test_etcd.py @@ -0,0 +1,67 @@ +from __future__ import absolute_import, unicode_literals + +import mock + +from kombu.five import Empty + +from kombu.transport.etcd import Channel, Transport + +from kombu.tests.case import Case, Mock, skip + + +@skip.unless_module('etcd') +class test_Etcd(Case): + + def setup(self): + self.connection = Mock() + self.connection.client.transport_options = {} + self.connection.client.port = 2739 + self.client = self.patch('etcd.Client').return_value + self.channel = Channel(connection=self.connection) + + def test_driver_version(self): + self.assertTrue(Transport(self.connection.client).driver_version()) + + def test_failed_get(self): + self.channel._acquire_lock = Mock(return_value=False) + self.channel.client.read.side_effect = IndexError + with mock.patch('etcd.Lock'): + with self.assertRaises(Empty): + self.channel._get('empty')() + + def test_test_purge(self): + with mock.patch('etcd.Lock'): + self.client.delete = Mock(return_value=True) + self.assertTrue(self.channel._purge('foo')) + + def test_key_prefix(self): + key = self.channel._key_prefix('myqueue') + self.assertEqual(key, 'kombu/myqueue') + + def test_create_delete_queue(self): + queue = 'mynewqueue' + + with mock.patch('etcd.Lock'): + self.client.write.return_value = self.patch('etcd.EtcdResult') + self.assertTrue(self.channel._new_queue(queue)) + + self.client.delete.return_value = self.patch('etcd.EtcdResult') + self.channel._delete(queue) + + def test_size(self): + with mock.patch('etcd.Lock'): + self.client.read.return_value = self.patch( + 'etcd.EtcdResult', _children=[{}, {}]) + self.assertEqual(self.channel._size('q'), 2) + + def test_get(self): + with mock.patch('etcd.Lock'): + self.client.read.return_value = self.patch( + 'etcd.EtcdResult', + _children=[{'key': 'myqueue', 'modifyIndex': 1, 'value': '1'}]) + self.assertIsNotNone(self.channel._get('myqueue')) + + def test_put(self): + with mock.patch('etcd.Lock'): + self.client.write.return_value = self.patch('etcd.EtcdResult') + self.assertIsNone(self.channel._put('myqueue', 'mydata')) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 57b67236..9455275e 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -31,7 +31,8 @@ TRANSPORT_ALIASES = { 'filesystem': 'kombu.transport.filesystem:Transport', 'qpid': 'kombu.transport.qpid:Transport', 'sentinel': 'kombu.transport.redis:SentinelTransport', - 'consul': 'kombu.transport.consul:Transport' + 'consul': 'kombu.transport.consul:Transport', + 'etcd': 'kombu.transport.etcd:Transport', } _transport_cache = {} diff --git a/kombu/transport/etcd.py b/kombu/transport/etcd.py new file mode 100644 index 00000000..afdd7763 --- /dev/null +++ b/kombu/transport/etcd.py @@ -0,0 +1,293 @@ +""" +Etcd Transport. + +It uses Etcd as a store to transport messages in Queues + +It uses python-etcd for talking to Etcd's HTTP API +""" +from __future__ import absolute_import, unicode_literals + +import os +import socket + +from collections import defaultdict +from contextlib import contextmanager + +from kombu.exceptions import ChannelError +from kombu.five import Empty +from kombu.log import get_logger +from kombu.utils.json import loads, dumps +from kombu.utils.objects import cached_property + +from . import virtual + +try: + import etcd +except ImportError: + etcd = None + +logger = get_logger('kombu.transport.etcd') + +DEFAULT_PORT = 2379 +DEFAULT_HOST = 'localhost' + + +class Channel(virtual.Channel): + """ + Etcd Channel class which talks to the Etcd. + """ + + prefix = 'kombu' + index = None + timeout = 10 + session_ttl = 30 + lock_ttl = 10 + + def __init__(self, *args, **kwargs): + """ + Creates a new instance of the etcd.Channel. + """ + if etcd is None: + raise ImportError('Missing python-etcd library') + + super(Channel, self).__init__(*args, **kwargs) + + port = self.connection.client.port or self.connection.default_port + host = self.connection.client.hostname or DEFAULT_HOST + + logger.debug('Host: %s Port: %s Timeout: %s', host, port, self.timeout) + + self.queues = defaultdict(dict) + + self.client = etcd.Client(host=host, port=int(port)) + + def _key_prefix(self, queue): + """ + Creates and returns the `queue` with the proper prefix. + + Arguments: + queue (str): The name of the queue. + """ + return '{0}/{1}'.format(self.prefix, queue) + + @contextmanager + def _queue_lock(self, queue): + """Try to acquire a lock on the Queue. + + It does so by creating a object called 'lock' which is locked by the + current session.. + + This way other nodes are not able to write to the lock object which + means that they have to wait before the lock is released. + + Arguments: + queue (str): The name of the queue. + """ + lock = etcd.Lock(self.client, queue) + lock._uuid = self.lock_value + logger.debug('Acquiring lock {0}'.format(lock.name)) + lock.acquire(blocking=True, lock_ttl=self.lock_ttl) + try: + yield + finally: + logger.debug('Releasing lock {0}'.format(lock.name)) + lock.release() + + def _new_queue(self, queue, **_): + """ + Creates a new `queue` if the `queue` doesn't already exist. + + Arguments: + queue (str): The name of the queue. + """ + self.queues[queue] = queue + with self._queue_lock(queue): + try: + return self.client.write( + key=self._key_prefix(queue), dir=True, value=None) + except etcd.EtcdNotFile: + logger.debug('Queue "{0}" already exists'.format(queue)) + return self.client.read(key=self._key_prefix(queue)) + + def _has_queue(self, queue, **kwargs): + """ + Verify that queue exists. + + Returns: + bool: Should return :const:`True` if the queue exists + or :const:`False` otherwise. + """ + try: + self.client.read(self._key_prefix(queue)) + return True + except etcd.EtcdKeyNotFound: + return False + + def _delete(self, queue, *args, **_): + """ + Deletes a `queue`. + + Arguments: + queue (str): The name of the queue. + """ + self.queues.pop(queue, None) + self._purge(queue) + + def _put(self, queue, payload, **_): + """ + Put `message` onto `queue`. + + This simply writes a key to the Etcd store + + Arguments: + queue (str): The name of the queue. + payload (dict): Message data which will be dumped to etcd. + """ + with self._queue_lock(queue): + key = self._key_prefix(queue) + if not self.client.write( + key=key, + value=dumps(payload), + append=True): + raise ChannelError('Cannot add key {0!r} to etcd'.format(key)) + + def _get(self, queue, timeout=None): + """ + Get the first available message from the queue. + + Before it does so it acquires a lock on the store so + only one node reads at the same time. This is for read consistency + + Arguments: + queue (str): The name of the queue. + timeout (int): Optional seconds to wait for a response. + """ + with self._queue_lock(queue): + key = self._key_prefix(queue) + logger.debug('Fetching key %s with index %s', key, self.index) + + try: + result = self.client.read( + key=key, recursive=True, + index=self.index, timeout=self.timeout) + + if result is None: + raise Empty() + + item = result._children[-1] + logger.debug('Removing key {0}'.format(item['key'])) + + msg_content = loads(item['value']) + self.client.delete(key=item['key']) + return msg_content + except (TypeError, IndexError, etcd.EtcdError) as error: + logger.debug('_get failed: {0}:{1}'.format(type(error), error)) + + raise Empty() + + def _purge(self, queue): + """ + Removes all `message`s from a `queue`. + + Arguments: + queue (str): The name of the queue. + """ + with self._queue_lock(queue): + key = self._key_prefix(queue) + logger.debug('Purging queue at key {0}'.format(key)) + return self.client.delete(key=key, recursive=True) + + def _size(self, queue): + """ + Returns the size of the `queue`. + + Arguments: + queue (str): The name of the queue. + """ + with self._queue_lock(queue): + size = 0 + try: + key = self._key_prefix(queue) + logger.debug('Fetching key recursively %s with index %s', + key, self.index) + result = self.client.read( + key=key, recursive=True, + index=self.index) + size = len(result._children) + except TypeError: + pass + + logger.debug('Found %s keys under %s with index %s', + size, key, self.index) + return size + + @cached_property + def lock_value(self): + return '{0}.{1}'.format(socket.gethostname(), os.getpid()) + + +class Transport(virtual.Transport): + """ + Etcd storage Transport for Kombu. + """ + + Channel = Channel + + default_port = DEFAULT_PORT + driver_type = 'etcd' + driver_name = 'python-etcd' + polling_interval = 3 + + implements = virtual.Transport.implements.extend( + exchange_type=frozenset(['direct'])) + + def __init__(self, *args, **kwargs): + """ + Creates a new instance of etcd.Transport. + """ + if etcd is None: + raise ImportError('Missing python-etcd library') + + super(Transport, self).__init__(*args, **kwargs) + + self.connection_errors = ( + virtual.Transport.connection_errors + (etcd.EtcdError, ) + ) + + self.channel_errors = ( + virtual.Transport.channel_errors + (etcd.EtcdError, ) + ) + + def verify_connection(self, connection): + """ + Verifies the connection works. + """ + port = connection.client.port or self.default_port + host = connection.client.hostname or DEFAULT_HOST + + logger.debug('Verify Etcd connection to %s:%s', host, port) + + try: + etcd.Client(host=host, port=int(port)) + return True + except ValueError: + pass + + return False + + def driver_version(self): + """ + Returns the version of the etcd library. + + .. note:: + + python-etcd has no __version__. This is a workaround. + """ + try: + import pip.commands.freeze + for x in pip.commands.freeze.freeze(): + if x.startswith('python-etcd'): + return x.split('==')[1] + except (ImportError, IndexError): + logger.warn('Unable to find the python-etcd version.') + return 'Unknown' diff --git a/requirements/extras/etcd.txt b/requirements/extras/etcd.txt new file mode 100644 index 00000000..4569cc19 --- /dev/null +++ b/requirements/extras/etcd.txt @@ -0,0 +1 @@ +python-etcd>=0.4.3