From 2e9856023c06e8bd8866608c735705e2835aacc0 Mon Sep 17 00:00:00 2001 From: gal cohen Date: Tue, 2 Mar 2021 12:22:47 +0200 Subject: [PATCH] SQS back-off policy (#1301) * sqs retry policy * add test * method definition * add validation * rename policy * add kombu doc * improve docstring * add test and doc * test fix * test fixes * add doc * Update docs/reference/kombu.transport.SQS.rst Co-authored-by: Omer Katz * Update docs/reference/kombu.transport.SQS.rst Co-authored-by: Omer Katz * Update kombu/transport/SQS.py Co-authored-by: Omer Katz * Update kombu/transport/SQS.py Co-authored-by: Omer Katz * Update kombu/transport/SQS.py Co-authored-by: Omer Katz * review improvements * improvements * add improvements * rename Co-authored-by: galcohen Co-authored-by: Omer Katz Co-authored-by: Asif Saif Uddin --- docs/reference/kombu.transport.SQS.rst | 40 +++++++++++++++++ kombu/asynchronous/aws/sqs/connection.py | 2 +- kombu/transport/SQS.py | 50 +++++++++++++++++++++ t/unit/transport/test_SQS.py | 57 ++++++++++++++++++++++++ 4 files changed, 148 insertions(+), 1 deletion(-) diff --git a/docs/reference/kombu.transport.SQS.rst b/docs/reference/kombu.transport.SQS.rst index f9abcfae..87e11d28 100644 --- a/docs/reference/kombu.transport.SQS.rst +++ b/docs/reference/kombu.transport.SQS.rst @@ -22,3 +22,43 @@ .. autoclass:: Channel :members: :undoc-members: + +Back-off policy +------------------------ +The back-off policy uses the SQS visibility timeout mechanism to alter the time difference between task retries. +The number of retries is managed by SQS (specifically by the ``ApproximateReceiveCount`` message attribute) and no further action is required by the user.
+ +Configuring the queues and backoff policy:: + + broker_transport_options = { + 'predefined_queues': { + 'my-q': { + 'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q', + 'access_key_id': 'xxx', + 'secret_access_key': 'xxx', + 'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, + 'backoff_tasks': ['svc.tasks.tasks.task1'] + } + } + } + + +``backoff_policy`` is a dictionary where the key is the number of retries and the value is the delay in seconds between retries (i.e. the +SQS visibility timeout). +``backoff_tasks`` is a list of task names to which the above policy applies. + +The above policy: + ++-----------------------------------------+--------------------------------------------+ +| **Attempt** | **Delay** | ++-----------------------------------------+--------------------------------------------+ +| ``2nd attempt`` | 20 seconds | ++-----------------------------------------+--------------------------------------------+ +| ``3rd attempt`` | 40 seconds | ++-----------------------------------------+--------------------------------------------+ +| ``4th attempt`` | 80 seconds | ++-----------------------------------------+--------------------------------------------+ +| ``5th attempt`` | 320 seconds | ++-----------------------------------------+--------------------------------------------+ +| ``6th attempt`` | 640 seconds | ++-----------------------------------------+--------------------------------------------+ diff --git a/kombu/asynchronous/aws/sqs/connection.py b/kombu/asynchronous/aws/sqs/connection.py index 1235fe16..ecaac41f 100644 --- a/kombu/asynchronous/aws/sqs/connection.py +++ b/kombu/asynchronous/aws/sqs/connection.py @@ -58,7 +58,7 @@ class AsyncSQSConnection(AsyncAWSQueryConnection): def receive_message(self, queue, queue_url, number_messages=1, visibility_timeout=None, - attributes=None, wait_time_seconds=None, + attributes=('ApproximateReceiveCount',), wait_time_seconds=None, callback=None): params = {'MaxNumberOfMessages': number_messages} if visibility_timeout:
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 2a2b8ccb..d6c8b51f 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -56,14 +56,21 @@ exist in AWS) you can tell this transport about them as follows: 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa', 'access_key_id': 'a', 'secret_access_key': 'b', + 'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional + 'backoff_tasks': ['svc.tasks.tasks.task1'] # optional }, 'queue-2': { 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb', 'access_key_id': 'c', 'secret_access_key': 'd', + 'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional + 'backoff_tasks': ['svc.tasks.tasks.task2'] # optional }, } } + +backoff_policy & backoff_tasks are optional arguments. These arguments automatically change the message visibility timeout, in order to have different times between specific task +retries. This would apply after task failure. If you authenticate using Okta_ (e.g. calling |gac|_), you can also specify a 'session_token' to connect to a queue. 
Note that those tokens have a @@ -148,6 +155,48 @@ class UndefinedQueueException(Exception): """Predefined queues are being used and an undefined queue was used.""" +class QoS(virtual.QoS): + def reject(self, delivery_tag, requeue=False): + super().reject(delivery_tag, requeue=requeue) + routing_key, message, backoff_tasks, backoff_policy = self._extract_backoff_policy_configuration_and_message(delivery_tag) + if routing_key and message and backoff_tasks and backoff_policy: + self.apply_backoff_policy(routing_key, delivery_tag, backoff_policy, backoff_tasks) + + def _extract_backoff_policy_configuration_and_message(self, delivery_tag): + try: + message = self._delivered[delivery_tag] + routing_key = message.delivery_info['routing_key'] + except KeyError: + return None, None, None, None + if not routing_key or not message: + return None, None, None, None + queue_config = self.channel.predefined_queues.get(routing_key, {}) + backoff_tasks = queue_config.get('backoff_tasks') + backoff_policy = queue_config.get('backoff_policy') + return routing_key, message, backoff_tasks, backoff_policy + + def apply_backoff_policy(self, routing_key, delivery_tag, backoff_policy, backoff_tasks): + queue_url = self.channel._queue_cache[routing_key] + task_name, number_of_retries = self.extract_task_name_and_number_of_retries(self._delivered[delivery_tag]) + if not task_name or not number_of_retries: + return None + policy_value = backoff_policy.get(number_of_retries) + if task_name in backoff_tasks and policy_value is not None: + c = self.channel.sqs(routing_key) + c.change_message_visibility( + QueueUrl=queue_url, + ReceiptHandle=delivery_tag, + VisibilityTimeout=policy_value + ) + + @staticmethod + def extract_task_name_and_number_of_retries(message): + message_headers = message.headers + task_name = message_headers['task'] + number_of_retries = int(message.properties['delivery_info']['sqs_message']['Attributes']['ApproximateReceiveCount']) + return task_name, number_of_retries + + +class
Channel(virtual.Channel): """SQS Channel.""" @@ -161,6 +210,7 @@ class Channel(virtual.Channel): _predefined_queue_clients = {} # A client for each predefined queue _queue_cache = {} _noack_queues = set() + QoS = QoS def __init__(self, *args, **kwargs): if boto3 is None: diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 253f19af..12bc81e9 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -30,6 +30,8 @@ example_predefined_queues = { 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-1', 'access_key_id': 'a', 'secret_access_key': 'b', + 'backoff_tasks': ['svc.tasks.tasks.task1'], + 'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640} }, 'queue-2': { 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-2', @@ -648,3 +650,58 @@ class test_Channel: channel.connection._deliver.assert_called() assert len(channel.sqs(queue_name)._queues[queue_name].messages) == 0 + + def test_predefined_queues_backoff_policy(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + def apply_backoff_policy(queue_name, delivery_tag, retry_policy, backoff_tasks): + return None + + mock_apply_policy = Mock(side_effect=apply_backoff_policy) + channel.qos.apply_backoff_policy = mock_apply_policy + queue_name = "queue-1" + + exchange = Exchange('test_SQS', type='direct') + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + message_mock = Mock() + message_mock.delivery_info = {'routing_key': queue_name} + channel.qos._delivered['test_message_id'] = message_mock + channel.qos.reject('test_message_id') + mock_apply_policy.assert_called_once_with('queue-1', 'test_message_id', + {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, ['svc.tasks.tasks.task1']) + + def test_predefined_queues_change_visibility_timeout(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 
'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + def extract_task_name_and_number_of_retries(delivery_tag): + return 'svc.tasks.tasks.task1', 2 + + mock_extract_task_name_and_number_of_retries = Mock(side_effect=extract_task_name_and_number_of_retries) + channel.qos.extract_task_name_and_number_of_retries = mock_extract_task_name_and_number_of_retries + + queue_name = "queue-1" + + exchange = Exchange('test_SQS', type='direct') + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + message_mock = Mock() + message_mock.delivery_info = {'routing_key': queue_name} + channel.qos._delivered['test_message_id'] = message_mock + + channel.sqs = Mock() + sqs_queue_mock = Mock() + channel.sqs.return_value = sqs_queue_mock + channel.qos.reject('test_message_id') + + sqs_queue_mock.change_message_visibility.assert_called_once_with(QueueUrl='https://sqs.us-east-1.amazonaws.com/xxx/queue-1', + ReceiptHandle='test_message_id', + VisibilityTimeout=20)