From 32633554ac0a5f15f66c648b952d14cc1eab14b2 Mon Sep 17 00:00:00 2001 From: Clemens Wolff Date: Thu, 30 Aug 2018 12:34:52 -0500 Subject: [PATCH] Add transports based on Azure PaaS (#891) * Add transports based on Azure PaaS This pull request adds two new transport implementations: - `azurestoragequeues` is implemented on top of Azure Storage Queues [1]. This offers a simple but scalable and low-cost PaaS transport for Celery users in Azure. The transport is intended to be used in conjunction with the Azure Block Blob Storage backend [2]. - `azureservicebus` is implemented on top of Azure Service Bus [3] and offers PaaS support for more demanding Celery workloads in Azure. The transport is intended to be used in conjunction with the Azure CosmosDB backend [4]. This pull request was created together with @ankurokok, @dkisselev, @evandropaula, @martinpeck and @michaelperel. [1] https://azure.microsoft.com/en-us/services/storage/queues/ [2] https://github.com/celery/celery/pull/4685 [3] https://azure.microsoft.com/en-us/services/service-bus/ [4] https://github.com/celery/celery/pull/4720 * Exclude Azure transports from code coverage There is test coverage for the transports but the tests require Azure credentials to run (passed via environment variables) so codecov doesn't exercise them. * Remove env vars to configure transport * Remove abbreviations --- .coveragerc | 1 + docs/includes/introduction.txt | 5 +- docs/reference/index.rst | 2 + .../kombu.transport.azureservicebus.rst | 24 +++ .../kombu.transport.azurestoragequeues.rst | 24 +++ kombu/transport/__init__.py | 2 + kombu/transport/azureservicebus.py | 149 +++++++++++++++++ kombu/transport/azurestoragequeues.py | 150 ++++++++++++++++++ requirements/extras/azureservicebus.txt | 1 + requirements/extras/azurestoragequeues.txt | 1 + requirements/test-ci.txt | 2 + setup.py | 2 + t/integration/tests/test_azureservicebus.py | 12 ++ .../tests/test_azurestoragequeues.py | 16 ++ 14 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 docs/reference/kombu.transport.azureservicebus.rst create mode 100644 docs/reference/kombu.transport.azurestoragequeues.rst create mode 100644 kombu/transport/azureservicebus.py create mode 100644 kombu/transport/azurestoragequeues.py create mode 100644 requirements/extras/azureservicebus.txt create mode 100644 requirements/extras/azurestoragequeues.txt create mode 100644 t/integration/tests/test_azureservicebus.py create mode 100644 t/integration/tests/test_azurestoragequeues.py diff --git a/.coveragerc b/.coveragerc index 9a767c99..db561e4c 100644 --- a/.coveragerc +++ b/.coveragerc @@ -25,6 +25,7 @@ omit = *kombu/transport/zmq.py *kombu/transport/django.py *kombu/transport/pyro.py + *kombu/transport/azure* *kombu/transport/qpid* exclude_lines = pragma: no cover diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index 2cbee3c2..a4fc4f0a 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -35,7 +35,8 @@ Features * Virtual transports makes it really easy to add support for non-AMQP transports. There is already built-in support for `Redis`_, - `Amazon SQS`_, `ZooKeeper`_, `SoftLayer MQ`_ and `Pyro`_. + `Amazon SQS`_, `Azure Storage Queues`_, `Azure Service Bus`_, + `ZooKeeper`_, `SoftLayer MQ`_ and `Pyro`_. * In-memory transport for unit testing. @@ -62,6 +63,8 @@ and the `Wikipedia article about AMQP`_. .. _`qpid-python`: https://pypi.org/project/qpid-python/ .. _`Redis`: https://redis.io/ .. _`Amazon SQS`: https://aws.amazon.com/sqs/ +.. _`Azure Storage Queues`: https://azure.microsoft.com/en-us/services/storage/queues/ +.. _`Azure Service Bus`: https://azure.microsoft.com/en-us/services/service-bus/ .. _`Zookeeper`: https://zookeeper.apache.org/ .. _`Rabbits and warrens`: http://web.archive.org/web/20160323134044/http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/ .. _`amqplib`: http://barryp.org/software/py-amqplib/ diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 092429bc..471fe4f8 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -38,6 +38,8 @@ kombu.asynchronous.aws.sqs.message kombu.asynchronous.aws.sqs.queue kombu.transport + kombu.transport.azurestoragequeues + kombu.transport.azureservicebus kombu.transport.pyamqp kombu.transport.librabbitmq kombu.transport.qpid diff --git a/docs/reference/kombu.transport.azureservicebus.rst b/docs/reference/kombu.transport.azureservicebus.rst new file mode 100644 index 00000000..881de039 --- /dev/null +++ b/docs/reference/kombu.transport.azureservicebus.rst @@ -0,0 +1,24 @@ +================================================================== + Azure Service Bus Transport - ``kombu.transport.azureservicebus`` +================================================================== + +.. currentmodule:: kombu.transport.azureservicebus + +.. automodule:: kombu.transport.azureservicebus + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/docs/reference/kombu.transport.azurestoragequeues.rst b/docs/reference/kombu.transport.azurestoragequeues.rst new file mode 100644 index 00000000..3ee0ef6d --- /dev/null +++ b/docs/reference/kombu.transport.azurestoragequeues.rst @@ -0,0 +1,24 @@ +======================================================================== + Azure Storage Queues Transport - ``kombu.transport.azurestoragequeues`` +======================================================================== + +.. currentmodule:: kombu.transport.azurestoragequeues + +.. automodule:: kombu.transport.azurestoragequeues + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 554852b7..30b0a3d2 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -38,6 +38,8 @@ TRANSPORT_ALIASES = { 'sentinel': 'kombu.transport.redis:SentinelTransport', 'consul': 'kombu.transport.consul:Transport', 'etcd': 'kombu.transport.etcd:Transport', + 'azurestoragequeues': 'kombu.transport.azurestoragequeues:Transport', + 'azureservicebus': 'kombu.transport.azureservicebus:Transport' } _transport_cache = {} diff --git a/kombu/transport/azureservicebus.py b/kombu/transport/azureservicebus.py new file mode 100644 index 00000000..f022ebb4 --- /dev/null +++ b/kombu/transport/azureservicebus.py @@ -0,0 +1,149 @@ +"""Azure Service Bus Message Queue transport. + +The transport can be enabled by setting the CELERY_BROKER_URL to: + +``` +azureservicebus://{SAS policy name}:{SAS key}@{Service Bus Namespace} +``` + +Note that the Shared Access Policy used to connect to Azure Service Bus +requires Manage, Send and Listen claims since the broker will create new +queues and delete old queues as required. + +Note that if the SAS key for the Service Bus account contains a slash, it will +have to be regenerated before it can be used in the connection URL. + +More information about Azure Service Bus: +https://azure.microsoft.com/en-us/services/service-bus/ + +""" +from __future__ import absolute_import, unicode_literals + +import string + +from kombu.five import Empty, text_t +from kombu.utils.encoding import bytes_to_str, safe_str +from kombu.utils.json import loads, dumps +from kombu.utils.objects import cached_property + +from . import virtual + +try: + from azure.servicebus import ServiceBusService, Message, Queue +except ImportError: + ServiceBusService = Message = Queue = None + +# dots are replaced by dash, all other punctuation replaced by underscore. +CHARS_REPLACE_TABLE = { + ord(c): 0x5f for c in string.punctuation if c not in '_' +} + + +class Channel(virtual.Channel): + """Azure Service Bus channel.""" + + default_visibility_timeout = 1800 # 30 minutes. + domain_format = 'kombu%(vhost)s' + _queue_service = None + _queue_cache = {} + + def __init__(self, *args, **kwargs): + if ServiceBusService is None: + raise ImportError('Azure Service Bus transport requires the ' + 'azure-servicebus library') + + super(Channel, self).__init__(*args, **kwargs) + + for queue in self.queue_service.list_queues(): + self._queue_cache[queue] = queue + + def entity_name(self, name, table=CHARS_REPLACE_TABLE): + """Format AMQP queue name into a valid ServiceBus queue name.""" + return text_t(safe_str(name)).translate(table) + + def _new_queue(self, queue, **kwargs): + """Ensure a queue exists in ServiceBus.""" + queue = self.entity_name(self.queue_name_prefix + queue) + try: + return self._queue_cache[queue] + except KeyError: + self.queue_service.create_queue(queue, fail_on_exist=False) + q = self._queue_cache[queue] = self.queue_service.get_queue(queue) + return q + + def _delete(self, queue, *args, **kwargs): + """Delete queue by name.""" + queue_name = self.entity_name(queue) + self._queue_cache.pop(queue_name, None) + self.queue_service.delete_queue(queue_name) + super(Channel, self)._delete(queue_name) + + def _put(self, queue, message, **kwargs): + """Put message onto queue.""" + msg = Message(dumps(message)) + self.queue_service.send_queue_message(self.entity_name(queue), msg) + + def _get(self, queue, timeout=None): + """Try to retrieve a single message off ``queue``.""" + message = self.queue_service.receive_queue_message( + self.entity_name(queue), timeout=timeout, peek_lock=False) + + if message.body is None: + raise Empty() + + return loads(bytes_to_str(message.body)) + + def _size(self, queue): + """Return the number of messages in a queue.""" + return self._new_queue(queue).message_count + + def _purge(self, queue): + """Delete all current messages in a queue.""" + n = 0 + + while True: + message = self.queue_service.read_delete_queue_message( + self.entity_name(queue), timeout=0.1) + + if not message.body: + break + else: + n += 1 + + return n + + @property + def queue_service(self): + if self._queue_service is None: + self._queue_service = ServiceBusService( + service_namespace=self.conninfo.hostname, + shared_access_key_name=self.conninfo.userid, + shared_access_key_value=self.conninfo.password) + + return self._queue_service + + @property + def conninfo(self): + return self.connection.client + + @property + def transport_options(self): + return self.connection.client.transport_options + + @cached_property + def visibility_timeout(self): + return (self.transport_options.get('visibility_timeout') or + self.default_visibility_timeout) + + @cached_property + def queue_name_prefix(self): + return self.transport_options.get('queue_name_prefix', '') + + +class Transport(virtual.Transport): + """Azure Service Bus transport.""" + + Channel = Channel + + polling_interval = 1 + default_port = None diff --git a/kombu/transport/azurestoragequeues.py b/kombu/transport/azurestoragequeues.py new file mode 100644 index 00000000..f7918688 --- /dev/null +++ b/kombu/transport/azurestoragequeues.py @@ -0,0 +1,150 @@ +"""Azure Storage Queues transport. + +The transport can be enabled by setting the CELERY_BROKER_URL to: + +``` +azurestoragequeues://:{Storage Account Access Key}@{Storage Account Name} +``` + +Note that if the access key for the storage account contains a slash, it will +have to be regenerated before it can be used in the connection URL. + +More information about Azure Storage Queues: +https://azure.microsoft.com/en-us/services/storage/queues/ + +""" +from __future__ import absolute_import, unicode_literals + +import string + +from kombu.five import Empty, text_t +from kombu.utils.encoding import safe_str +from kombu.utils.json import loads, dumps +from kombu.utils.objects import cached_property + +from . import virtual + +try: + from azure.storage.queue import QueueService +except ImportError: # pragma: no cover + QueueService = None # noqa + +# Azure storage queues allow only alphanumeric and dashes +# so, replace everything with a dash +CHARS_REPLACE_TABLE = { + ord(c): 0x2d for c in string.punctuation +} + + +class Channel(virtual.Channel): + """Azure Storage Queues channel.""" + + domain_format = 'kombu%(vhost)s' + _queue_service = None + _queue_name_cache = {} + no_ack = True + _noack_queues = set() + + def __init__(self, *args, **kwargs): + if QueueService is None: + raise ImportError('Azure Storage Queues transport requires the ' + 'azure-storage-queue library') + + super(Channel, self).__init__(*args, **kwargs) + + for queue_name in self.queue_service.list_queues(): + self._queue_name_cache[queue_name] = queue_name + + def basic_consume(self, queue, no_ack, *args, **kwargs): + if no_ack: + self._noack_queues.add(queue) + + return super(Channel, self).basic_consume(queue, no_ack, + *args, **kwargs) + + def entity_name(self, name, table=CHARS_REPLACE_TABLE): + """Format AMQP queue name into a valid Azure Storage Queue name.""" + return text_t(safe_str(name)).translate(table) + + def _ensure_queue(self, queue): + """Ensure a queue exists.""" + queue = self.entity_name(self.queue_name_prefix + queue) + try: + return self._queue_name_cache[queue] + except KeyError: + self.queue_service.create_queue(queue, fail_on_exist=False) + q = self._queue_name_cache[queue] = queue + return q + + def _delete(self, queue, *args, **kwargs): + """Delete queue by name.""" + queue_name = self.entity_name(queue) + self._queue_name_cache.pop(queue_name, None) + self.queue_service.delete_queue(queue_name) + super(Channel, self)._delete(queue_name) + + def _put(self, queue, message, **kwargs): + """Put message onto queue.""" + q = self._ensure_queue(queue) + encoded_message = dumps(message) + self.queue_service.put_message(q, encoded_message) + + def _get(self, queue, timeout=None): + """Try to retrieve a single message off ``queue``.""" + q = self._ensure_queue(queue) + + messages = self.queue_service.get_messages(q, num_messages=1, + timeout=timeout) + if not messages: + raise Empty() + + message = messages[0] + raw_content = self.queue_service.decode_function(message.content) + content = loads(raw_content) + + self.queue_service.delete_message(q, message.id, message.pop_receipt) + + return content + + def _size(self, queue): + """Return the number of messages in a queue.""" + q = self._ensure_queue(queue) + metadata = self.queue_service.get_queue_metadata(q) + return metadata.approximate_message_count + + def _purge(self, queue): + """Delete all current messages in a queue.""" + q = self._ensure_queue(queue) + n = self._size(q) + self.queue_service.clear_messages(q) + return n + + @property + def queue_service(self): + if self._queue_service is None: + self._queue_service = QueueService( + account_name=self.conninfo.hostname, + account_key=self.conninfo.password) + + return self._queue_service + + @property + def conninfo(self): + return self.connection.client + + @property + def transport_options(self): + return self.connection.client.transport_options + + @cached_property + def queue_name_prefix(self): + return self.transport_options.get('queue_name_prefix', '') + + +class Transport(virtual.Transport): + """Azure Storage Queues transport.""" + + Channel = Channel + + polling_interval = 1 + default_port = None diff --git a/requirements/extras/azureservicebus.txt b/requirements/extras/azureservicebus.txt new file mode 100644 index 00000000..8f6f15ce --- /dev/null +++ b/requirements/extras/azureservicebus.txt @@ -0,0 +1 @@ +azure-servicebus>=0.21.1 diff --git a/requirements/extras/azurestoragequeues.txt b/requirements/extras/azurestoragequeues.txt new file mode 100644 index 00000000..2424ee7e --- /dev/null +++ b/requirements/extras/azurestoragequeues.txt @@ -0,0 +1 @@ +azure-storage-queue diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index 4bb72214..65b43895 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -3,6 +3,8 @@ codecov redis PyYAML msgpack-python>0.2.0 +-r extras/azureservicebus.txt +-r extras/azurestoragequeues.txt -r extras/sqs.txt -r extras/consul.txt -r extras/librabbitmq.txt diff --git a/setup.py b/setup.py index 77c35913..dac04d7e 100644 --- a/setup.py +++ b/setup.py @@ -130,6 +130,8 @@ setup( 'librabbitmq': extras('librabbitmq.txt'), 'pyro': extras('pyro.txt'), 'slmq': extras('slmq.txt'), + 'azurestoragequeues': extras('azurestoragequeues.txt'), + 'azureservicebus': extras('azureservicebus.txt'), 'qpid': extras('qpid.txt'), 'consul': extras('consul.txt'), }, diff --git a/t/integration/tests/test_azureservicebus.py b/t/integration/tests/test_azureservicebus.py new file mode 100644 index 00000000..98dc7340 --- /dev/null +++ b/t/integration/tests/test_azureservicebus.py @@ -0,0 +1,12 @@ +from __future__ import absolute_import, unicode_literals + +from t.integration import transport + +from case import skip + + +@skip.unless_module('azure.servicebus') +class test_azureservicebus(transport.TransportCase): + transport = 'azureservicebus' + prefix = 'azureservicebus' + message_size_limit = 32000 diff --git a/t/integration/tests/test_azurestoragequeues.py b/t/integration/tests/test_azurestoragequeues.py new file mode 100644 index 00000000..eb66c6be --- /dev/null +++ b/t/integration/tests/test_azurestoragequeues.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import, unicode_literals + +from t.integration import transport + +from case import skip + + +@skip.unless_module('azure.storage.queue') +class test_azurestoragequeues(transport.TransportCase): + transport = 'azurestoragequeues' + prefix = 'azurestoragequeues' + event_loop_max = 100 + message_size_limit = 32000 + reliable_purge = False + #: does not guarantee FIFO order, even in simple cases. + suppress_disorder_warning = True