diff --git a/docs/reference/kombu.transport.consul.rst b/docs/reference/kombu.transport.consul.rst new file mode 100644 index 00000000..5e429617 --- /dev/null +++ b/docs/reference/kombu.transport.consul.rst @@ -0,0 +1,20 @@ +.. currentmodule:: kombu.transport.consul + +.. automodule:: kombu.transport.consul + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/kombu/tests/transport/test_consul.py b/kombu/tests/transport/test_consul.py new file mode 100644 index 00000000..c7dfffcd --- /dev/null +++ b/kombu/tests/transport/test_consul.py @@ -0,0 +1,90 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.tests.case import Case, Mock, skip +from kombu.transport.consul import Channel +from kombu.five import Empty + + +@skip.unless_module('consul') +class TestConsul(Case): + + mock_consul = None + channel = None + + def setup(self): + self.mock_consul = Mock() + self.mock_consul.kv = Mock() + self.mock_consul.session = Mock() + self.mock_consul.agent = Mock() + + self.channel = Channel() + self.channel.consul = self.mock_consul + + def test_driver_version(self): + self.assertTrue(self.channel.transport.driver_version()) + + def test_failed_get(self): + self.channel._obtain_lock = Mock(return_value=False) + self.assertRaises(Empty, self.channel._get('empty')) + + def test_test_purge(self): + self.channel._destroy_session = Mock(return_value=True) + self.mock_consul.kv.delete = Mock(return_value=True) + self.assertTrue(self.channel._purge('foo')) + + def test_variables(self): + self.assertEqual(self.channel.session_ttl, 30) + self.assertEqual(self.channel.timeout, '10s') + + def test_lock_key(self): + key = self.channel._lock_key('myqueue') + self.assertEqual(key, 'kombu/myqueue.lock') + + def test_key_prefix(self): + key = self.channel._key_prefix('myqueue') + self.assertEqual(key, 'kombu/myqueue') + + def test_get_or_create_session(self): + queue = 'myqueue' + session_id = '123456' + self.mock_consul.session.create = Mock(return_value=session_id) + self.assertEqual(self._get_or_create_session(queue), session_id) + + def test_create_delete_queue(self): + channel = Channel() + channel.consul = self.mock_consul + + queue = 'mynewqueue' + + self.mock_consul.kv.put = Mock(return_value=True) + self.assertTrue(channel._new_queue(queue)) + + self.mock_consul.kv.delete = Mock(return_value=True) + self.channel._destroy_session = Mock() + self._delete(queue) + + def test_size(self): + keys = list() + keys.append((1, dict())) + keys.append((2, dict())) + + self.mock_consul.kv.get = Mock(return_value=keys) + + self.assertEqual(self.channel._size(), 2) + + def test_get(self): + self.channel._obtain_lock = Mock(return_value=True) + self.channel._release_lock = Mock(return_value=True) + + keys = list() + keys.append((1, dict())) + keys.append((2, dict())) + self.mock_consul.kv.get = Mock(return_value=keys) + + self.mock_consul.kv.delete = Mock(return_value=True) + + self.assertIsNotNone(self.channel._get('myqueue')) + + def test_put(self): + self.mock_consul.kv.put = Mock(return_value=True) + self.assertIsNone(self.channel.put('myqueue', 'mydata')) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index ad8d9084..d560972e 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -70,6 +70,7 @@ TRANSPORT_ALIASES = { 'amqplib': 'kombu.transport.amqplib:Transport', 'qpid': 'kombu.transport.qpid:Transport', 'sentinel': 'kombu.transport.redis:SentinelTransport', + 'consul': 'kombu.transport.consul:Transport' } _transport_cache = {} diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py new file mode 100644 index 00000000..f5ca57d5 --- /dev/null +++ b/kombu/transport/consul.py @@ -0,0 +1,272 @@ +""" +Consul Kombu Transport + +It uses Consul.io's Key/Value store to transport messages in Queues + +It uses python-consul for talking to Consul's HTTP API +""" +from __future__ import absolute_import, unicode_literals + +import uuid +import socket + +from . import virtual +from kombu.exceptions import ChannelError +from kombu.log import get_logger +from kombu.five import Empty, monotonic +from kombu.utils.json import loads, dumps + +try: + import consul +except ImportError: + consul = None + +LOGGER = get_logger('kombu.transport.consul') + +DEFAULT_PORT = 8500 +DEFAULT_HOST = 'localhost' + + +class Channel(virtual.Channel): + """ + Consul Channel class which talks to the Consul Key/Value store + """ + prefix = 'kombu' + index = None + timeout = '10s' + session_ttl = 30 + lock_name = '%s' % socket.gethostname() + queues = {} + + def __init__(self, *args, **kwargs): + if consul is None: + raise ImportError('Missing python-consul library') + + super(Channel, self).__init__(*args, **kwargs) + + port = self.connection.client.port or self.connection.default_port + host = self.connection.client.hostname or DEFAULT_HOST + + LOGGER.debug('Host: %s Port: %d Timeout: %s', host, port, self.timeout) + + self.client = consul.Consul(host=host, port=int(port)) + + def _lock_key(self, queue): + return '%s/%s.lock' % (self.prefix, queue) + + def _key_prefix(self, queue): + return '%s/%s' % (self.prefix, queue) + + def _get_or_create_session(self, queue): + """ + Try to renew the session if it exists, otherwise create a new session + in Consul + + This session is used to obtain a lock inside Consul so that we achieve + read-consistency between the nodes + + :param queue: The name of the Queue + :return: The ID of the session + """ + + session_id = None + try: + session_id = self.queues[queue]['session_id'] + except KeyError: + pass + + if session_id is not None: + LOGGER.debug('Trying to renew existing session %s', + self.queues[queue]['session_id']) + session = self.client.session.renew(session_id=session_id) + session_id = None + try: + session_id = session['ID'] + except KeyError: + pass + + if session_id is None: + LOGGER.debug('Creating session %s with TTL %d', self.lock_name, + self.session_ttl) + session_id = self.client.session.create(name=self.lock_name, + ttl=self.session_ttl) + + LOGGER.debug('Created session %s with id %s', self.lock_name, + session_id) + + return session_id + + def _obtain_lock(self, queue): + """ + Try to obtain a lock on the Queue + + It does so by creating a object called 'lock' which is locked by the + current session. + + This way other nodes are not able to write to the lock object which + means that they have to wait before the lock is released + + :param queue: The name of the Queue + :return: True on success, False otherwise + """ + session_id = self._get_or_create_session(queue) + lock_key = self._lock_key(queue) + + LOGGER.debug('Trying to create lock object %s with session %s', + lock_key, session_id) + + if self.client.kv.put(key=lock_key, + acquire=session_id, + value=self.lock_name) is False: + LOGGER.info('Could not obtain a lock on key %s', lock_key) + return False + + self.queues[queue]['session_id'] = session_id + + return True + + def _release_lock(self, queue): + """ + Try to release a lock. It does so by simply removing the lock key in + Consul. + + :param queue: The name of the queue we want to release the lock from + :return: None + """ + LOGGER.debug('Removing lock key %s', self._lock_key(queue)) + self.client.kv.delete(key=self._lock_key(queue)) + + def _destroy_session(self, queue): + """ + Destroy a previously created Consul session and release all locks + it still might hold + :param queue: The name of the Queue + :return: None + """ + LOGGER.debug('Destroying session %s', self.queues[queue]['session_id']) + self.client.session.destroy(self.queues[queue]['session_id']) + + def _new_queue(self, queue, **_): + self.queues[queue] = {'session_id': None} + return self.client.kv.put(key=self._key_prefix(queue), value=None) + + def _delete(self, queue, *args, **_): + self._destroy_session(queue) + del self.queues[queue] + self._purge(queue) + + def _put(self, queue, payload, **_): + """ + Put `message` onto `queue`. + + This simply writes a key to the K/V store of Consul + """ + key = '%s/msg/%d_%s' % (self._key_prefix(queue), + int(round(monotonic() * 1000)), + uuid.uuid4()) + + if self.client.kv.put(key=key, value=dumps(payload), cas=0) is False: + raise ChannelError('Cannot add key {0!r} to consul'.format(key)) + + def _get(self, queue, timeout=None): + """ + Get the first available message from the queue + + Before it does so it obtains a lock on the Key/Value store so + only one node reads at the same time. This is for read consistency + """ + if self._obtain_lock(queue) is False: + raise Empty + + key = '%s/msg/' % self._key_prefix(queue) + LOGGER.debug('Fetching key %s with index %s', key, self.index) + self.index, data = self.client.kv.get(key=key, recurse=True, + index=self.index, + wait=self.timeout) + + try: + if data is None: + raise Empty + + LOGGER.debug('Removing key %s with modifyindex %s', + data[0]['Key'], data[0]['ModifyIndex']) + + self.client.kv.delete(key=data[0]['Key'], + cas=data[0]['ModifyIndex']) + + return loads(data[0]['Value']) + except TypeError: + pass + finally: + self._release_lock(queue) + + raise Empty + + def _purge(self, queue): + self._destroy_session(queue) + return self.client.kv.delete(key='%s/msg/' % self._key_prefix(queue), + recurse=True) + + def _size(self, queue): + size = 0 + try: + key = '%s/msg/' % self._key_prefix(queue) + LOGGER.debug('Fetching key recursively %s with index %s', key, + self.index) + self.index, data = self.client.kv.get(key=key, recurse=True, + index=self.index, + wait=self.timeout) + size = len(data) + except TypeError: + pass + + LOGGER.debug('Found %d keys under %s with index %s', size, key, + self.index) + return size + + +class Transport(virtual.Transport): + """ + Consul K/V storage Transport for Kombu + """ + Channel = Channel + + default_port = DEFAULT_PORT + driver_type = 'consul' + driver_name = 'consul' + + def __init__(self, *args, **kwargs): + if consul is None: + raise ImportError('Missing python-consul library') + + super(Transport, self).__init__(*args, **kwargs) + + self.connection_errors = ( + virtual.Transport.connection_errors + ( + consul.ConsulException, consul.base.ConsulException + ) + ) + + self.channel_errors = ( + virtual.Transport.channel_errors + ( + consul.ConsulException, consul.base.ConsulException + ) + ) + + def verify_connection(self, connection): + port = connection.client.port or self.default_port + host = connection.client.hostname or DEFAULT_HOST + + LOGGER.debug('Verify Consul connection to %s:%d', host, port) + + try: + client = consul.Consul(host=host, port=int(port)) + client.agent.self() + return True + except ValueError: + pass + + return False + + def driver_version(self): + return consul.__version__ diff --git a/requirements/extras/consul.txt b/requirements/extras/consul.txt new file mode 100644 index 00000000..dd29fbef --- /dev/null +++ b/requirements/extras/consul.txt @@ -0,0 +1 @@ +python-consul>=0.6.0 diff --git a/setup.py b/setup.py index 0d522e11..410348e1 100644 --- a/setup.py +++ b/setup.py @@ -165,5 +165,6 @@ setup( 'pyro': extras('pyro.txt'), 'slmq': extras('slmq.txt'), 'qpid': extras('qpid.txt'), + 'consul': extras('consul.txt'), }, )