Add transports based on Azure PaaS (#891)

* Add transports based on Azure PaaS

This pull request adds two new transport implementations:

- `azurestoragequeues` is implemented on top of Azure Storage
  Queues [1]. This offers a simple but scalable and low-cost PaaS
  transport for Celery users in Azure. The transport is intended to be
  used in conjunction with the Azure Block Blob Storage backend [2].

- `azureservicebus` is implemented on top of Azure Service Bus [3] and
  offers PaaS support for more demanding Celery workloads in Azure. The
  transport is intended to be used in conjunction with the Azure
  CosmosDB backend [4].

This pull request was created together with @ankurokok, @dkisselev,
@evandropaula, @martinpeck and @michaelperel.

[1] https://azure.microsoft.com/en-us/services/storage/queues/
[2] https://github.com/celery/celery/pull/4685
[3] https://azure.microsoft.com/en-us/services/service-bus/
[4] https://github.com/celery/celery/pull/4720

* Exclude Azure transports from code coverage

There is test coverage for the transports but the tests require Azure
credentials to run (passed via environment variables) so codecov doesn't
exercise them.

* Remove env vars to configure transport

* Remove abbreviations
This commit is contained in:
Clemens Wolff 2018-08-30 12:34:52 -05:00 committed by Asif Saifuddin Auvi
parent 200a60a228
commit 32633554ac
14 changed files with 390 additions and 1 deletions

View File

@ -25,6 +25,7 @@ omit =
*kombu/transport/zmq.py
*kombu/transport/django.py
*kombu/transport/pyro.py
*kombu/transport/azure*
*kombu/transport/qpid*
exclude_lines =
pragma: no cover

View File

@ -35,7 +35,8 @@ Features
* Virtual transports makes it really easy to add support for non-AMQP
transports. There is already built-in support for `Redis`_,
`Amazon SQS`_, `ZooKeeper`_, `SoftLayer MQ`_ and `Pyro`_.
`Amazon SQS`_, `Azure Storage Queues`_, `Azure Service Bus`_,
`ZooKeeper`_, `SoftLayer MQ`_ and `Pyro`_.
* In-memory transport for unit testing.
@ -62,6 +63,8 @@ and the `Wikipedia article about AMQP`_.
.. _`qpid-python`: https://pypi.org/project/qpid-python/
.. _`Redis`: https://redis.io/
.. _`Amazon SQS`: https://aws.amazon.com/sqs/
.. _`Azure Storage Queues`: https://azure.microsoft.com/en-us/services/storage/queues/
.. _`Azure Service Bus`: https://azure.microsoft.com/en-us/services/service-bus/
.. _`Zookeeper`: https://zookeeper.apache.org/
.. _`Rabbits and warrens`: http://web.archive.org/web/20160323134044/http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
.. _`amqplib`: http://barryp.org/software/py-amqplib/

View File

@ -38,6 +38,8 @@
kombu.asynchronous.aws.sqs.message
kombu.asynchronous.aws.sqs.queue
kombu.transport
kombu.transport.azurestoragequeues
kombu.transport.azureservicebus
kombu.transport.pyamqp
kombu.transport.librabbitmq
kombu.transport.qpid

View File

@ -0,0 +1,24 @@
==================================================================
Azure Service Bus Transport - ``kombu.transport.azureservicebus``
==================================================================
.. currentmodule:: kombu.transport.azureservicebus
.. automodule:: kombu.transport.azureservicebus
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -0,0 +1,24 @@
========================================================================
Azure Storage Queues Transport - ``kombu.transport.azurestoragequeues``
========================================================================
.. currentmodule:: kombu.transport.azurestoragequeues
.. automodule:: kombu.transport.azurestoragequeues
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:

View File

@ -38,6 +38,8 @@ TRANSPORT_ALIASES = {
'sentinel': 'kombu.transport.redis:SentinelTransport',
'consul': 'kombu.transport.consul:Transport',
'etcd': 'kombu.transport.etcd:Transport',
'azurestoragequeues': 'kombu.transport.azurestoragequeues:Transport',
'azureservicebus': 'kombu.transport.azureservicebus:Transport'
}
_transport_cache = {}

View File

@ -0,0 +1,149 @@
"""Azure Service Bus Message Queue transport.
The transport can be enabled by setting the CELERY_BROKER_URL to:
```
azureservicebus://{SAS policy name}:{SAS key}@{Service Bus Namespace}
```
Note that the Shared Access Policy used to connect to Azure Service Bus
requires Manage, Send and Listen claims since the broker will create new
queues and delete old queues as required.
Note that if the SAS key for the Service Bus account contains a slash, it will
have to be regenerated before it can be used in the connection URL.
More information about Azure Service Bus:
https://azure.microsoft.com/en-us/services/service-bus/
"""
from __future__ import absolute_import, unicode_literals
import string
from kombu.five import Empty, text_t
from kombu.utils.encoding import bytes_to_str, safe_str
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property
from . import virtual
try:
from azure.servicebus import ServiceBusService, Message, Queue
except ImportError:
ServiceBusService = Message = Queue = None
# dots are replaced by dash, all other punctuation replaced by underscore.
CHARS_REPLACE_TABLE = {
ord(c): 0x5f for c in string.punctuation if c not in '_'
}
class Channel(virtual.Channel):
"""Azure Service Bus channel."""
default_visibility_timeout = 1800 # 30 minutes.
domain_format = 'kombu%(vhost)s'
_queue_service = None
_queue_cache = {}
def __init__(self, *args, **kwargs):
if ServiceBusService is None:
raise ImportError('Azure Service Bus transport requires the '
'azure-servicebus library')
super(Channel, self).__init__(*args, **kwargs)
for queue in self.queue_service.list_queues():
self._queue_cache[queue] = queue
def entity_name(self, name, table=CHARS_REPLACE_TABLE):
"""Format AMQP queue name into a valid ServiceBus queue name."""
return text_t(safe_str(name)).translate(table)
def _new_queue(self, queue, **kwargs):
"""Ensure a queue exists in ServiceBus."""
queue = self.entity_name(self.queue_name_prefix + queue)
try:
return self._queue_cache[queue]
except KeyError:
self.queue_service.create_queue(queue, fail_on_exist=False)
q = self._queue_cache[queue] = self.queue_service.get_queue(queue)
return q
def _delete(self, queue, *args, **kwargs):
"""Delete queue by name."""
queue_name = self.entity_name(queue)
self._queue_cache.pop(queue_name, None)
self.queue_service.delete_queue(queue_name)
super(Channel, self)._delete(queue_name)
def _put(self, queue, message, **kwargs):
"""Put message onto queue."""
msg = Message(dumps(message))
self.queue_service.send_queue_message(self.entity_name(queue), msg)
def _get(self, queue, timeout=None):
"""Try to retrieve a single message off ``queue``."""
message = self.queue_service.receive_queue_message(
self.entity_name(queue), timeout=timeout, peek_lock=False)
if message.body is None:
raise Empty()
return loads(bytes_to_str(message.body))
def _size(self, queue):
"""Return the number of messages in a queue."""
return self._new_queue(queue).message_count
def _purge(self, queue):
"""Delete all current messages in a queue."""
n = 0
while True:
message = self.queue_service.read_delete_queue_message(
self.entity_name(queue), timeout=0.1)
if not message.body:
break
else:
n += 1
return n
@property
def queue_service(self):
if self._queue_service is None:
self._queue_service = ServiceBusService(
service_namespace=self.conninfo.hostname,
shared_access_key_name=self.conninfo.userid,
shared_access_key_value=self.conninfo.password)
return self._queue_service
@property
def conninfo(self):
return self.connection.client
@property
def transport_options(self):
return self.connection.client.transport_options
@cached_property
def visibility_timeout(self):
return (self.transport_options.get('visibility_timeout') or
self.default_visibility_timeout)
@cached_property
def queue_name_prefix(self):
return self.transport_options.get('queue_name_prefix', '')
class Transport(virtual.Transport):
"""Azure Service Bus transport."""
Channel = Channel
polling_interval = 1
default_port = None

View File

@ -0,0 +1,150 @@
"""Azure Storage Queues transport.
The transport can be enabled by setting the CELERY_BROKER_URL to:
```
azurestoragequeues://:{Storage Account Access Key}@{Storage Account Name}
```
Note that if the access key for the storage account contains a slash, it will
have to be regenerated before it can be used in the connection URL.
More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/
"""
from __future__ import absolute_import, unicode_literals
import string
from kombu.five import Empty, text_t
from kombu.utils.encoding import safe_str
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property
from . import virtual
try:
from azure.storage.queue import QueueService
except ImportError: # pragma: no cover
QueueService = None # noqa
# Azure storage queues allow only alphanumeric and dashes
# so, replace everything with a dash
CHARS_REPLACE_TABLE = {
ord(c): 0x2d for c in string.punctuation
}
class Channel(virtual.Channel):
"""Azure Storage Queues channel."""
domain_format = 'kombu%(vhost)s'
_queue_service = None
_queue_name_cache = {}
no_ack = True
_noack_queues = set()
def __init__(self, *args, **kwargs):
if QueueService is None:
raise ImportError('Azure Storage Queues transport requires the '
'azure-storage-queue library')
super(Channel, self).__init__(*args, **kwargs)
for queue_name in self.queue_service.list_queues():
self._queue_name_cache[queue_name] = queue_name
def basic_consume(self, queue, no_ack, *args, **kwargs):
if no_ack:
self._noack_queues.add(queue)
return super(Channel, self).basic_consume(queue, no_ack,
*args, **kwargs)
def entity_name(self, name, table=CHARS_REPLACE_TABLE):
"""Format AMQP queue name into a valid Azure Storage Queue name."""
return text_t(safe_str(name)).translate(table)
def _ensure_queue(self, queue):
"""Ensure a queue exists."""
queue = self.entity_name(self.queue_name_prefix + queue)
try:
return self._queue_name_cache[queue]
except KeyError:
self.queue_service.create_queue(queue, fail_on_exist=False)
q = self._queue_name_cache[queue] = queue
return q
def _delete(self, queue, *args, **kwargs):
"""Delete queue by name."""
queue_name = self.entity_name(queue)
self._queue_name_cache.pop(queue_name, None)
self.queue_service.delete_queue(queue_name)
super(Channel, self)._delete(queue_name)
def _put(self, queue, message, **kwargs):
"""Put message onto queue."""
q = self._ensure_queue(queue)
encoded_message = dumps(message)
self.queue_service.put_message(q, encoded_message)
def _get(self, queue, timeout=None):
"""Try to retrieve a single message off ``queue``."""
q = self._ensure_queue(queue)
messages = self.queue_service.get_messages(q, num_messages=1,
timeout=timeout)
if not messages:
raise Empty()
message = messages[0]
raw_content = self.queue_service.decode_function(message.content)
content = loads(raw_content)
self.queue_service.delete_message(q, message.id, message.pop_receipt)
return content
def _size(self, queue):
"""Return the number of messages in a queue."""
q = self._ensure_queue(queue)
metadata = self.queue_service.get_queue_metadata(q)
return metadata.approximate_message_count
def _purge(self, queue):
"""Delete all current messages in a queue."""
q = self._ensure_queue(queue)
n = self._size(q)
self.queue_service.clear_messages(q)
return n
@property
def queue_service(self):
if self._queue_service is None:
self._queue_service = QueueService(
account_name=self.conninfo.hostname,
account_key=self.conninfo.password)
return self._queue_service
@property
def conninfo(self):
return self.connection.client
@property
def transport_options(self):
return self.connection.client.transport_options
@cached_property
def queue_name_prefix(self):
return self.transport_options.get('queue_name_prefix', '')
class Transport(virtual.Transport):
"""Azure Storage Queues transport."""
Channel = Channel
polling_interval = 1
default_port = None

View File

@ -0,0 +1 @@
azure-servicebus>=0.21.1

View File

@ -0,0 +1 @@
azure-storage-queue

View File

@ -3,6 +3,8 @@ codecov
redis
PyYAML
msgpack-python>0.2.0
-r extras/azureservicebus.txt
-r extras/azurestoragequeues.txt
-r extras/sqs.txt
-r extras/consul.txt
-r extras/librabbitmq.txt

View File

@ -130,6 +130,8 @@ setup(
'librabbitmq': extras('librabbitmq.txt'),
'pyro': extras('pyro.txt'),
'slmq': extras('slmq.txt'),
'azurestoragequeues': extras('azurestoragequeues.txt'),
'azureservicebus': extras('azureservicebus.txt'),
'qpid': extras('qpid.txt'),
'consul': extras('consul.txt'),
},

View File

@ -0,0 +1,12 @@
from __future__ import absolute_import, unicode_literals
from t.integration import transport
from case import skip
@skip.unless_module('azure.servicebus')
class test_azureservicebus(transport.TransportCase):
transport = 'azureservicebus'
prefix = 'azureservicebus'
message_size_limit = 32000

View File

@ -0,0 +1,16 @@
from __future__ import absolute_import, unicode_literals
from t.integration import transport
from case import skip
@skip.unless_module('azure.storage.queue')
class test_azurestoragequeues(transport.TransportCase):
transport = 'azurestoragequeues'
prefix = 'azurestoragequeues'
event_loop_max = 100
message_size_limit = 32000
reliable_purge = False
#: does not guarantee FIFO order, even in simple cases.
suppress_disorder_warning = True