mirror of https://github.com/celery/kombu.git
SQS back-off policy (#1301)
* sqs retry policy * add test * method definition * add validation * rename policy * add kombu doc * improve docstring * add test and doc * test fix * test fixes * add doc * Update docs/reference/kombu.transport.SQS.rst Co-authored-by: Omer Katz <omer.drow@gmail.com> * Update docs/reference/kombu.transport.SQS.rst Co-authored-by: Omer Katz <omer.drow@gmail.com> * Update kombu/transport/SQS.py Co-authored-by: Omer Katz <omer.drow@gmail.com> * Update kombu/transport/SQS.py Co-authored-by: Omer Katz <omer.drow@gmail.com> * Update kombu/transport/SQS.py Co-authored-by: Omer Katz <omer.drow@gmail.com> * review improvements * improvements * add improvements * rename Co-authored-by: galcohen <gal.cohen@autodesk.com> Co-authored-by: Omer Katz <omer.drow@gmail.com> Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
This commit is contained in:
parent
3e5263c481
commit
2e9856023c
|
@ -22,3 +22,43 @@
|
|||
.. autoclass:: Channel
|
||||
:members:
|
||||
:undoc-members:
|
||||
|
||||
Back-off policy
|
||||
------------------------
|
||||
Back-off policy is using SQS visibility timeout mechanism altering the time difference between task retries.
|
||||
The number of retries is managed by SQS (specifically by the ``ApproximateReceiveCount`` message attribute) and no further action is required by the user.
|
||||
|
||||
Configuring the queues and backoff policy::
|
||||
|
||||
broker_transport_options = {
|
||||
'predefined_queues': {
|
||||
'my-q': {
|
||||
'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
|
||||
'access_key_id': 'xxx',
|
||||
'secret_access_key': 'xxx',
|
||||
'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
|
||||
'backoff_tasks': ['svc.tasks.tasks.task1']
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
``backoff_policy`` dictionary where key is number of retries, and value is delay seconds between retries (i.e
|
||||
SQS visibility timeout)
|
||||
``backoff_tasks`` list of task names to apply the above policy
|
||||
|
||||
The above policy:
|
||||
|
||||
+-----------------------------------------+--------------------------------------------+
|
||||
| **Attempt** | **Delay** |
|
||||
+-----------------------------------------+--------------------------------------------+
|
||||
| ``2nd attempt`` | 20 seconds |
|
||||
+-----------------------------------------+--------------------------------------------+
|
||||
| ``3rd attempt`` | 40 seconds |
|
||||
+-----------------------------------------+--------------------------------------------+
|
||||
| ``4th attempt`` | 80 seconds |
|
||||
+-----------------------------------------+--------------------------------------------+
|
||||
| ``5th attempt`` | 320 seconds |
|
||||
+-----------------------------------------+--------------------------------------------+
|
||||
| ``6th attempt`` | 640 seconds |
|
||||
+-----------------------------------------+--------------------------------------------+
|
||||
|
|
|
@ -58,7 +58,7 @@ class AsyncSQSConnection(AsyncAWSQueryConnection):
|
|||
|
||||
def receive_message(self, queue, queue_url,
|
||||
number_messages=1, visibility_timeout=None,
|
||||
attributes=None, wait_time_seconds=None,
|
||||
attributes=('ApproximateReceiveCount',), wait_time_seconds=None,
|
||||
callback=None):
|
||||
params = {'MaxNumberOfMessages': number_messages}
|
||||
if visibility_timeout:
|
||||
|
|
|
@ -56,14 +56,21 @@ exist in AWS) you can tell this transport about them as follows:
|
|||
'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
|
||||
'access_key_id': 'a',
|
||||
'secret_access_key': 'b',
|
||||
'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
|
||||
'backoff_tasks': ['svc.tasks.tasks.task1'] # optional
|
||||
},
|
||||
'queue-2': {
|
||||
'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
|
||||
'access_key_id': 'c',
|
||||
'secret_access_key': 'd',
|
||||
'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, # optional
|
||||
'backoff_tasks': ['svc.tasks.tasks.task2'] # optional
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
backoff_policy & backoff_tasks are optional arguments. These arguments automatically change the message visibility timeout, in order to have different times between specific task
|
||||
retries. This would apply after task failure.
|
||||
|
||||
If you authenticate using Okta_ (e.g. calling |gac|_), you can also specify
|
||||
a 'session_token' to connect to a queue. Note that those tokens have a
|
||||
|
@ -148,6 +155,48 @@ class UndefinedQueueException(Exception):
|
|||
"""Predefined queues are being used and an undefined queue was used."""
|
||||
|
||||
|
||||
class QoS(virtual.QoS):
|
||||
def reject(self, delivery_tag, requeue=False):
|
||||
super().reject(delivery_tag, requeue=requeue)
|
||||
routing_key, message, backoff_tasks, backoff_policy = self._extract_backoff_policy_configuration_and_message(delivery_tag)
|
||||
if routing_key and message and backoff_tasks and backoff_policy:
|
||||
self.apply_backoff_policy(routing_key, delivery_tag, backoff_policy, backoff_tasks)
|
||||
|
||||
def _extract_backoff_policy_configuration_and_message(self, delivery_tag):
|
||||
try:
|
||||
message = self._delivered[delivery_tag]
|
||||
routing_key = message.delivery_info['routing_key']
|
||||
except KeyError:
|
||||
return None, None, None, None
|
||||
if not routing_key or not message:
|
||||
return None, None, None, None
|
||||
queue_config = self.channel.predefined_queues.get(routing_key, {})
|
||||
backoff_tasks = queue_config.get('backoff_tasks')
|
||||
backoff_policy = queue_config.get('backoff_policy')
|
||||
return routing_key, message, backoff_tasks, backoff_policy
|
||||
|
||||
def apply_backoff_policy(self, routing_key, delivery_tag, backoff_policy, backoff_tasks):
|
||||
queue_url = self.channel._queue_cache[routing_key]
|
||||
task_name, number_of_retries = self.extract_task_name_and_number_of_retries(delivery_tag)
|
||||
if not task_name or not number_of_retries:
|
||||
return None
|
||||
policy_value = backoff_policy.get(number_of_retries)
|
||||
if task_name in backoff_tasks and policy_value is not None:
|
||||
c = self.channel.sqs(routing_key)
|
||||
c.change_message_visibility(
|
||||
QueueUrl=queue_url,
|
||||
ReceiptHandle=delivery_tag,
|
||||
VisibilityTimeout=policy_value
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def extract_task_name_and_number_of_retries(message):
|
||||
message_headers = message.headers
|
||||
task_name = message_headers['task']
|
||||
number_of_retries = int(message.properties['delivery_info']['sqs_message']['Attributes']['ApproximateReceiveCount'])
|
||||
return task_name, number_of_retries
|
||||
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
"""SQS Channel."""
|
||||
|
||||
|
@ -161,6 +210,7 @@ class Channel(virtual.Channel):
|
|||
_predefined_queue_clients = {} # A client for each predefined queue
|
||||
_queue_cache = {}
|
||||
_noack_queues = set()
|
||||
QoS = QoS
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if boto3 is None:
|
||||
|
|
|
@ -30,6 +30,8 @@ example_predefined_queues = {
|
|||
'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-1',
|
||||
'access_key_id': 'a',
|
||||
'secret_access_key': 'b',
|
||||
'backoff_tasks': ['svc.tasks.tasks.task1'],
|
||||
'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}
|
||||
},
|
||||
'queue-2': {
|
||||
'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-2',
|
||||
|
@ -648,3 +650,58 @@ class test_Channel:
|
|||
channel.connection._deliver.assert_called()
|
||||
|
||||
assert len(channel.sqs(queue_name)._queues[queue_name].messages) == 0
|
||||
|
||||
def test_predefined_queues_backoff_policy(self):
|
||||
connection = Connection(transport=SQS.Transport, transport_options={
|
||||
'predefined_queues': example_predefined_queues,
|
||||
})
|
||||
channel = connection.channel()
|
||||
|
||||
def apply_backoff_policy(queue_name, delivery_tag, retry_policy, backoff_tasks):
|
||||
return None
|
||||
|
||||
mock_apply_policy = Mock(side_effect=apply_backoff_policy)
|
||||
channel.qos.apply_backoff_policy = mock_apply_policy
|
||||
queue_name = "queue-1"
|
||||
|
||||
exchange = Exchange('test_SQS', type='direct')
|
||||
queue = Queue(queue_name, exchange, queue_name)
|
||||
queue(channel).declare()
|
||||
|
||||
message_mock = Mock()
|
||||
message_mock.delivery_info = {'routing_key': queue_name}
|
||||
channel.qos._delivered['test_message_id'] = message_mock
|
||||
channel.qos.reject('test_message_id')
|
||||
mock_apply_policy.assert_called_once_with('queue-1', 'test_message_id',
|
||||
{1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640}, ['svc.tasks.tasks.task1'])
|
||||
|
||||
def test_predefined_queues_change_visibility_timeout(self):
|
||||
connection = Connection(transport=SQS.Transport, transport_options={
|
||||
'predefined_queues': example_predefined_queues,
|
||||
})
|
||||
channel = connection.channel()
|
||||
|
||||
def extract_task_name_and_number_of_retries(delivery_tag):
|
||||
return 'svc.tasks.tasks.task1', 2
|
||||
|
||||
mock_extract_task_name_and_number_of_retries = Mock(side_effect=extract_task_name_and_number_of_retries)
|
||||
channel.qos.extract_task_name_and_number_of_retries = mock_extract_task_name_and_number_of_retries
|
||||
|
||||
queue_name = "queue-1"
|
||||
|
||||
exchange = Exchange('test_SQS', type='direct')
|
||||
queue = Queue(queue_name, exchange, queue_name)
|
||||
queue(channel).declare()
|
||||
|
||||
message_mock = Mock()
|
||||
message_mock.delivery_info = {'routing_key': queue_name}
|
||||
channel.qos._delivered['test_message_id'] = message_mock
|
||||
|
||||
channel.sqs = Mock()
|
||||
sqs_queue_mock = Mock()
|
||||
channel.sqs.return_value = sqs_queue_mock
|
||||
channel.qos.reject('test_message_id')
|
||||
|
||||
sqs_queue_mock.change_message_visibility.assert_called_once_with(QueueUrl='https://sqs.us-east-1.amazonaws.com/xxx/queue-1',
|
||||
ReceiptHandle='test_message_id',
|
||||
VisibilityTimeout=20)
|
||||
|
|
Loading…
Reference in New Issue