mirror of https://github.com/celery/kombu.git
Add Etcd Transport (#619)
This transport is highly influenced by the Consul Transport. Messages are kept in keys within Etcd through the python-etcd library.
This commit is contained in:
parent
c5886a33ef
commit
7fa3172ead
|
@ -0,0 +1,24 @@
|
|||
================================================
|
||||
Etcd Transport - ``kombu.transport.etcd``
|
||||
================================================
|
||||
|
||||
.. currentmodule:: kombu.transport.etcd
|
||||
|
||||
.. automodule:: kombu.transport.etcd
|
||||
|
||||
.. contents::
|
||||
:local:
|
||||
|
||||
Transport
|
||||
---------
|
||||
|
||||
.. autoclass:: Transport
|
||||
:members:
|
||||
:undoc-members:
|
||||
|
||||
Channel
|
||||
-------
|
||||
|
||||
.. autoclass:: Channel
|
||||
:members:
|
||||
:undoc-members:
|
|
@ -0,0 +1,67 @@
|
|||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import mock
|
||||
|
||||
from kombu.five import Empty
|
||||
|
||||
from kombu.transport.etcd import Channel, Transport
|
||||
|
||||
from kombu.tests.case import Case, Mock, skip
|
||||
|
||||
|
||||
@skip.unless_module('etcd')
|
||||
class test_Etcd(Case):
|
||||
|
||||
def setup(self):
|
||||
self.connection = Mock()
|
||||
self.connection.client.transport_options = {}
|
||||
self.connection.client.port = 2739
|
||||
self.client = self.patch('etcd.Client').return_value
|
||||
self.channel = Channel(connection=self.connection)
|
||||
|
||||
def test_driver_version(self):
|
||||
self.assertTrue(Transport(self.connection.client).driver_version())
|
||||
|
||||
def test_failed_get(self):
|
||||
self.channel._acquire_lock = Mock(return_value=False)
|
||||
self.channel.client.read.side_effect = IndexError
|
||||
with mock.patch('etcd.Lock'):
|
||||
with self.assertRaises(Empty):
|
||||
self.channel._get('empty')()
|
||||
|
||||
def test_test_purge(self):
|
||||
with mock.patch('etcd.Lock'):
|
||||
self.client.delete = Mock(return_value=True)
|
||||
self.assertTrue(self.channel._purge('foo'))
|
||||
|
||||
def test_key_prefix(self):
|
||||
key = self.channel._key_prefix('myqueue')
|
||||
self.assertEqual(key, 'kombu/myqueue')
|
||||
|
||||
def test_create_delete_queue(self):
|
||||
queue = 'mynewqueue'
|
||||
|
||||
with mock.patch('etcd.Lock'):
|
||||
self.client.write.return_value = self.patch('etcd.EtcdResult')
|
||||
self.assertTrue(self.channel._new_queue(queue))
|
||||
|
||||
self.client.delete.return_value = self.patch('etcd.EtcdResult')
|
||||
self.channel._delete(queue)
|
||||
|
||||
def test_size(self):
|
||||
with mock.patch('etcd.Lock'):
|
||||
self.client.read.return_value = self.patch(
|
||||
'etcd.EtcdResult', _children=[{}, {}])
|
||||
self.assertEqual(self.channel._size('q'), 2)
|
||||
|
||||
def test_get(self):
|
||||
with mock.patch('etcd.Lock'):
|
||||
self.client.read.return_value = self.patch(
|
||||
'etcd.EtcdResult',
|
||||
_children=[{'key': 'myqueue', 'modifyIndex': 1, 'value': '1'}])
|
||||
self.assertIsNotNone(self.channel._get('myqueue'))
|
||||
|
||||
def test_put(self):
|
||||
with mock.patch('etcd.Lock'):
|
||||
self.client.write.return_value = self.patch('etcd.EtcdResult')
|
||||
self.assertIsNone(self.channel._put('myqueue', 'mydata'))
|
|
@ -31,7 +31,8 @@ TRANSPORT_ALIASES = {
|
|||
'filesystem': 'kombu.transport.filesystem:Transport',
|
||||
'qpid': 'kombu.transport.qpid:Transport',
|
||||
'sentinel': 'kombu.transport.redis:SentinelTransport',
|
||||
'consul': 'kombu.transport.consul:Transport'
|
||||
'consul': 'kombu.transport.consul:Transport',
|
||||
'etcd': 'kombu.transport.etcd:Transport',
|
||||
}
|
||||
|
||||
_transport_cache = {}
|
||||
|
|
|
@ -0,0 +1,293 @@
|
|||
"""
|
||||
Etcd Transport.
|
||||
|
||||
It uses Etcd as a store to transport messages in Queues
|
||||
|
||||
It uses python-etcd for talking to Etcd's HTTP API
|
||||
"""
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import os
|
||||
import socket
|
||||
|
||||
from collections import defaultdict
|
||||
from contextlib import contextmanager
|
||||
|
||||
from kombu.exceptions import ChannelError
|
||||
from kombu.five import Empty
|
||||
from kombu.log import get_logger
|
||||
from kombu.utils.json import loads, dumps
|
||||
from kombu.utils.objects import cached_property
|
||||
|
||||
from . import virtual
|
||||
|
||||
try:
|
||||
import etcd
|
||||
except ImportError:
|
||||
etcd = None
|
||||
|
||||
logger = get_logger('kombu.transport.etcd')
|
||||
|
||||
DEFAULT_PORT = 2379
|
||||
DEFAULT_HOST = 'localhost'
|
||||
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
"""
|
||||
Etcd Channel class which talks to the Etcd.
|
||||
"""
|
||||
|
||||
prefix = 'kombu'
|
||||
index = None
|
||||
timeout = 10
|
||||
session_ttl = 30
|
||||
lock_ttl = 10
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
Creates a new instance of the etcd.Channel.
|
||||
"""
|
||||
if etcd is None:
|
||||
raise ImportError('Missing python-etcd library')
|
||||
|
||||
super(Channel, self).__init__(*args, **kwargs)
|
||||
|
||||
port = self.connection.client.port or self.connection.default_port
|
||||
host = self.connection.client.hostname or DEFAULT_HOST
|
||||
|
||||
logger.debug('Host: %s Port: %s Timeout: %s', host, port, self.timeout)
|
||||
|
||||
self.queues = defaultdict(dict)
|
||||
|
||||
self.client = etcd.Client(host=host, port=int(port))
|
||||
|
||||
def _key_prefix(self, queue):
|
||||
"""
|
||||
Creates and returns the `queue` with the proper prefix.
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
"""
|
||||
return '{0}/{1}'.format(self.prefix, queue)
|
||||
|
||||
@contextmanager
|
||||
def _queue_lock(self, queue):
|
||||
"""Try to acquire a lock on the Queue.
|
||||
|
||||
It does so by creating a object called 'lock' which is locked by the
|
||||
current session..
|
||||
|
||||
This way other nodes are not able to write to the lock object which
|
||||
means that they have to wait before the lock is released.
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
"""
|
||||
lock = etcd.Lock(self.client, queue)
|
||||
lock._uuid = self.lock_value
|
||||
logger.debug('Acquiring lock {0}'.format(lock.name))
|
||||
lock.acquire(blocking=True, lock_ttl=self.lock_ttl)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
logger.debug('Releasing lock {0}'.format(lock.name))
|
||||
lock.release()
|
||||
|
||||
def _new_queue(self, queue, **_):
|
||||
"""
|
||||
Creates a new `queue` if the `queue` doesn't already exist.
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
"""
|
||||
self.queues[queue] = queue
|
||||
with self._queue_lock(queue):
|
||||
try:
|
||||
return self.client.write(
|
||||
key=self._key_prefix(queue), dir=True, value=None)
|
||||
except etcd.EtcdNotFile:
|
||||
logger.debug('Queue "{0}" already exists'.format(queue))
|
||||
return self.client.read(key=self._key_prefix(queue))
|
||||
|
||||
def _has_queue(self, queue, **kwargs):
|
||||
"""
|
||||
Verify that queue exists.
|
||||
|
||||
Returns:
|
||||
bool: Should return :const:`True` if the queue exists
|
||||
or :const:`False` otherwise.
|
||||
"""
|
||||
try:
|
||||
self.client.read(self._key_prefix(queue))
|
||||
return True
|
||||
except etcd.EtcdKeyNotFound:
|
||||
return False
|
||||
|
||||
def _delete(self, queue, *args, **_):
|
||||
"""
|
||||
Deletes a `queue`.
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
"""
|
||||
self.queues.pop(queue, None)
|
||||
self._purge(queue)
|
||||
|
||||
def _put(self, queue, payload, **_):
|
||||
"""
|
||||
Put `message` onto `queue`.
|
||||
|
||||
This simply writes a key to the Etcd store
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
payload (dict): Message data which will be dumped to etcd.
|
||||
"""
|
||||
with self._queue_lock(queue):
|
||||
key = self._key_prefix(queue)
|
||||
if not self.client.write(
|
||||
key=key,
|
||||
value=dumps(payload),
|
||||
append=True):
|
||||
raise ChannelError('Cannot add key {0!r} to etcd'.format(key))
|
||||
|
||||
def _get(self, queue, timeout=None):
|
||||
"""
|
||||
Get the first available message from the queue.
|
||||
|
||||
Before it does so it acquires a lock on the store so
|
||||
only one node reads at the same time. This is for read consistency
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
timeout (int): Optional seconds to wait for a response.
|
||||
"""
|
||||
with self._queue_lock(queue):
|
||||
key = self._key_prefix(queue)
|
||||
logger.debug('Fetching key %s with index %s', key, self.index)
|
||||
|
||||
try:
|
||||
result = self.client.read(
|
||||
key=key, recursive=True,
|
||||
index=self.index, timeout=self.timeout)
|
||||
|
||||
if result is None:
|
||||
raise Empty()
|
||||
|
||||
item = result._children[-1]
|
||||
logger.debug('Removing key {0}'.format(item['key']))
|
||||
|
||||
msg_content = loads(item['value'])
|
||||
self.client.delete(key=item['key'])
|
||||
return msg_content
|
||||
except (TypeError, IndexError, etcd.EtcdError) as error:
|
||||
logger.debug('_get failed: {0}:{1}'.format(type(error), error))
|
||||
|
||||
raise Empty()
|
||||
|
||||
def _purge(self, queue):
|
||||
"""
|
||||
Removes all `message`s from a `queue`.
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
"""
|
||||
with self._queue_lock(queue):
|
||||
key = self._key_prefix(queue)
|
||||
logger.debug('Purging queue at key {0}'.format(key))
|
||||
return self.client.delete(key=key, recursive=True)
|
||||
|
||||
def _size(self, queue):
|
||||
"""
|
||||
Returns the size of the `queue`.
|
||||
|
||||
Arguments:
|
||||
queue (str): The name of the queue.
|
||||
"""
|
||||
with self._queue_lock(queue):
|
||||
size = 0
|
||||
try:
|
||||
key = self._key_prefix(queue)
|
||||
logger.debug('Fetching key recursively %s with index %s',
|
||||
key, self.index)
|
||||
result = self.client.read(
|
||||
key=key, recursive=True,
|
||||
index=self.index)
|
||||
size = len(result._children)
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
logger.debug('Found %s keys under %s with index %s',
|
||||
size, key, self.index)
|
||||
return size
|
||||
|
||||
@cached_property
|
||||
def lock_value(self):
|
||||
return '{0}.{1}'.format(socket.gethostname(), os.getpid())
|
||||
|
||||
|
||||
class Transport(virtual.Transport):
|
||||
"""
|
||||
Etcd storage Transport for Kombu.
|
||||
"""
|
||||
|
||||
Channel = Channel
|
||||
|
||||
default_port = DEFAULT_PORT
|
||||
driver_type = 'etcd'
|
||||
driver_name = 'python-etcd'
|
||||
polling_interval = 3
|
||||
|
||||
implements = virtual.Transport.implements.extend(
|
||||
exchange_type=frozenset(['direct']))
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
Creates a new instance of etcd.Transport.
|
||||
"""
|
||||
if etcd is None:
|
||||
raise ImportError('Missing python-etcd library')
|
||||
|
||||
super(Transport, self).__init__(*args, **kwargs)
|
||||
|
||||
self.connection_errors = (
|
||||
virtual.Transport.connection_errors + (etcd.EtcdError, )
|
||||
)
|
||||
|
||||
self.channel_errors = (
|
||||
virtual.Transport.channel_errors + (etcd.EtcdError, )
|
||||
)
|
||||
|
||||
def verify_connection(self, connection):
|
||||
"""
|
||||
Verifies the connection works.
|
||||
"""
|
||||
port = connection.client.port or self.default_port
|
||||
host = connection.client.hostname or DEFAULT_HOST
|
||||
|
||||
logger.debug('Verify Etcd connection to %s:%s', host, port)
|
||||
|
||||
try:
|
||||
etcd.Client(host=host, port=int(port))
|
||||
return True
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
def driver_version(self):
|
||||
"""
|
||||
Returns the version of the etcd library.
|
||||
|
||||
.. note::
|
||||
|
||||
python-etcd has no __version__. This is a workaround.
|
||||
"""
|
||||
try:
|
||||
import pip.commands.freeze
|
||||
for x in pip.commands.freeze.freeze():
|
||||
if x.startswith('python-etcd'):
|
||||
return x.split('==')[1]
|
||||
except (ImportError, IndexError):
|
||||
logger.warn('Unable to find the python-etcd version.')
|
||||
return 'Unknown'
|
|
@ -0,0 +1 @@
|
|||
python-etcd>=0.4.3
|
Loading…
Reference in New Issue