mirror of https://github.com/celery/kombu.git
add managed identity support to azure storage queue (#1631)
* add managed identity support to azure storage queue * flake8 fixes
This commit is contained in:
parent
7787696737
commit
4fac6bc84c
|
@ -57,7 +57,7 @@ from __future__ import annotations
|
|||
|
||||
import string
|
||||
from queue import Empty
|
||||
from typing import Any, Dict, Set
|
||||
from typing import Any
|
||||
|
||||
import azure.core.exceptions
|
||||
import azure.servicebus.exceptions
|
||||
|
|
|
@ -15,15 +15,34 @@ Features
|
|||
Connection String
|
||||
=================
|
||||
|
||||
Connection string has the following format:
|
||||
Connection string has the following formats:
|
||||
|
||||
.. code-block::
|
||||
|
||||
azurestoragequeues://STORAGE_ACCOUNT_ACCESS_KEY@STORAGE_ACCOUNT_URL
|
||||
azurestoragequeues://SAS_TOKEN@STORAGE_ACCOUNT_URL
|
||||
azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
|
||||
azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
|
||||
azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
|
||||
azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>
|
||||
|
||||
Note that if the access key for the storage account contains a slash, it will
|
||||
have to be regenerated before it can be used in the connection URL.
|
||||
Note that if the access key for the storage account contains a forward slash
|
||||
(``/``), it will have to be regenerated before it can be used in the connection
|
||||
URL.
|
||||
|
||||
.. code-block::
|
||||
|
||||
azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
|
||||
azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>
|
||||
|
||||
If you wish to use an `Azure Managed Identity` you may use the
|
||||
``DefaultAzureCredential`` format of the connection string which will use
|
||||
``DefaultAzureCredential`` class in the azure-identity package. You may want to
|
||||
read the `azure-identity documentation` for more information on how the
|
||||
``DefaultAzureCredential`` works.
|
||||
|
||||
.. _azure-identity documentation:
|
||||
https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python
|
||||
.. _Azure Managed Identity:
|
||||
https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview
|
||||
|
||||
Transport Options
|
||||
=================
|
||||
|
@ -49,6 +68,13 @@ try:
|
|||
except ImportError: # pragma: no cover
|
||||
QueueServiceClient = None
|
||||
|
||||
try:
|
||||
from azure.identity import (DefaultAzureCredential,
|
||||
ManagedIdentityCredential)
|
||||
except ImportError:
|
||||
DefaultAzureCredential = None
|
||||
ManagedIdentityCredential = None
|
||||
|
||||
# Azure storage queues allow only alphanumeric and dashes
|
||||
# so, replace everything with a dash
|
||||
CHARS_REPLACE_TABLE = {
|
||||
|
@ -180,8 +206,10 @@ class Transport(virtual.Transport):
|
|||
@staticmethod
|
||||
def parse_uri(uri: str) -> tuple[str | dict, str]:
|
||||
# URL like:
|
||||
# azurestoragequeues://STORAGE_ACCOUNT_ACCESS_KEY@STORAGE_ACCOUNT_URL
|
||||
# azurestoragequeues://SAS_TOKEN@STORAGE_ACCOUNT_URL
|
||||
# azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
|
||||
# azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
|
||||
# azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
|
||||
# azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>
|
||||
|
||||
# urllib parse does not work as the sas key could contain a slash
|
||||
# e.g.: azurestoragequeues://some/key@someurl
|
||||
|
@ -192,8 +220,20 @@ class Transport(virtual.Transport):
|
|||
# > 'some/key', 'url'
|
||||
credential, url = uri.rsplit('@', 1)
|
||||
|
||||
# parse credential as a dict if Azurite is being used
|
||||
if "devstoreaccount1" in url and ".core.windows.net" not in url:
|
||||
if "DefaultAzureCredential".lower() == credential.lower():
|
||||
if DefaultAzureCredential is None:
|
||||
raise ImportError('Azure Storage Queues transport with a '
|
||||
'DefaultAzureCredential requires the '
|
||||
'azure-identity library')
|
||||
credential = DefaultAzureCredential()
|
||||
elif "ManagedIdentityCredential".lower() == credential.lower():
|
||||
if ManagedIdentityCredential is None:
|
||||
raise ImportError('Azure Storage Queues transport with a '
|
||||
'ManagedIdentityCredential requires the '
|
||||
'azure-identity library')
|
||||
credential = ManagedIdentityCredential()
|
||||
elif "devstoreaccount1" in url and ".core.windows.net" not in url:
|
||||
# parse credential as a dict if Azurite is being used
|
||||
credential = {
|
||||
"account_name": "devstoreaccount1",
|
||||
"account_key": credential,
|
||||
|
@ -204,7 +244,10 @@ class Transport(virtual.Transport):
|
|||
except Exception:
|
||||
raise ValueError(
|
||||
'Need a URI like '
|
||||
'azurestoragequeues://{SAS or access key}@{URL}'
|
||||
'azurestoragequeues://{SAS or access key}@{URL}, '
|
||||
'azurestoragequeues://DefaultAzureCredential@{URL}, '
|
||||
', or '
|
||||
'azurestoragequeues://ManagedIdentityCredential@{URL}'
|
||||
)
|
||||
|
||||
return credential, url
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
azure-storage-queue>=12.2.0
|
||||
azure-identity>=1.12.0
|
||||
|
|
|
@ -3,6 +3,7 @@ from __future__ import annotations
|
|||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
|
||||
|
||||
from kombu import Connection
|
||||
|
||||
|
@ -13,6 +14,8 @@ URL_NOCREDS = 'azurestoragequeues://'
|
|||
URL_CREDS = 'azurestoragequeues://sas/key%@https://STORAGE_ACCOUNT_NAME.queue.core.windows.net/' # noqa
|
||||
AZURITE_CREDS = 'azurestoragequeues://Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==@http://localhost:10001/devstoreaccount1' # noqa
|
||||
AZURITE_CREDS_DOCKER_COMPOSE = 'azurestoragequeues://Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==@http://azurite:10001/devstoreaccount1' # noqa
|
||||
DEFAULT_AZURE_URL_CREDS = 'azurestoragequeues://DefaultAzureCredential@https://STORAGE_ACCOUNT_NAME.queue.core.windows.net/' # noqa
|
||||
MANAGED_IDENTITY_URL_CREDS = 'azurestoragequeues://ManagedIdentityCredential@https://STORAGE_ACCOUNT_NAME.queue.core.windows.net/' # noqa
|
||||
|
||||
|
||||
def test_queue_service_nocredentials():
|
||||
|
@ -52,3 +55,31 @@ def test_queue_service_works_for_azurite(creds, hostname):
|
|||
'account_key': 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==' # noqa
|
||||
}
|
||||
assert channel._url == f'http://{hostname}:10001/devstoreaccount1' # noqa
|
||||
|
||||
|
||||
def test_queue_service_works_for_default_azure_credentials():
|
||||
conn = Connection(
|
||||
DEFAULT_AZURE_URL_CREDS, transport=azurestoragequeues.Transport
|
||||
)
|
||||
with patch("kombu.transport.azurestoragequeues.QueueServiceClient"):
|
||||
channel = conn.channel()
|
||||
|
||||
assert isinstance(channel._credential, DefaultAzureCredential)
|
||||
assert (
|
||||
channel._url
|
||||
== "https://STORAGE_ACCOUNT_NAME.queue.core.windows.net/"
|
||||
)
|
||||
|
||||
|
||||
def test_queue_service_works_for_managed_identity_credentials():
|
||||
conn = Connection(
|
||||
MANAGED_IDENTITY_URL_CREDS, transport=azurestoragequeues.Transport
|
||||
)
|
||||
with patch("kombu.transport.azurestoragequeues.QueueServiceClient"):
|
||||
channel = conn.channel()
|
||||
|
||||
assert isinstance(channel._credential, ManagedIdentityCredential)
|
||||
assert (
|
||||
channel._url
|
||||
== "https://STORAGE_ACCOUNT_NAME.queue.core.windows.net/"
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue