From c18e9626e1dec56fa58cfe384403b25f34fe4473 Mon Sep 17 00:00:00 2001 From: flolas Date: Fri, 1 Sep 2023 08:44:44 -0400 Subject: [PATCH] raise access denied error when ack --- kombu/transport/SQS.py | 14 ++++++++++++- t/unit/transport/test_SQS.py | 40 ++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 8546fb0b..8485ce78 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -176,6 +176,14 @@ class InvalidQueueException(Exception): """Predefined queues are being used and configuration is not valid.""" +class AccessDeniedQueueException(Exception): + """Raised when access to the AWS queue is denied. + + This may occur if the permissions are not correctly set or the + credentials are invalid. + """ + + class QoS(virtual.QoS): """Quality of Service guarantees implementation for SQS.""" @@ -633,7 +641,11 @@ class Channel(virtual.Channel): QueueUrl=message['sqs_queue'], ReceiptHandle=sqs_message['ReceiptHandle'] ) - except ClientError: + except ClientError as exception: + if exception.response['Error']['Code'] == 'AccessDenied': + raise AccessDeniedQueueException( + exception.response["Error"]["Message"] + ) super().basic_reject(delivery_tag) else: super().basic_ack(delivery_tag) diff --git a/t/unit/transport/test_SQS.py b/t/unit/transport/test_SQS.py index 50cec89c..1663b656 100644 --- a/t/unit/transport/test_SQS.py +++ b/t/unit/transport/test_SQS.py @@ -646,6 +646,46 @@ class test_Channel: basic_reject_mock.assert_called_with(2) assert not basic_ack_mock.called + @patch('kombu.transport.virtual.base.Channel.basic_ack') + @patch('kombu.transport.virtual.base.Channel.basic_reject') + def test_basic_ack_access_denied(self, basic_reject_mock, basic_ack_mock): + """Test that basic_ack raises AccessDeniedQueueException when + access is denied""" + message = { + 'sqs_message': { + 'ReceiptHandle': '2' + }, + 'sqs_queue': 'testing_queue' + } + error_response = { + 'Error': { + 'Code': 'AccessDenied', + 'Message': """An error occurred (AccessDenied) when calling the + DeleteMessage operation.""" + } + } + operation_name = 'DeleteMessage' + + mock_messages = Mock() + mock_messages.delivery_info = message + self.channel.qos.append(mock_messages, 2) + self.channel.sqs().delete_message = Mock() + self.channel.sqs().delete_message.side_effect = ClientError( + error_response=error_response, + operation_name=operation_name + ) + + # Expecting the custom AccessDeniedQueueException to be raised + with pytest.raises(SQS.AccessDeniedQueueException): + self.channel.basic_ack(2) + + self.sqs_conn_mock.delete_message.assert_called_with( + QueueUrl=message['sqs_queue'], + ReceiptHandle=message['sqs_message']['ReceiptHandle'] + ) + assert not basic_reject_mock.called + assert not basic_ack_mock.called + def test_reject_when_no_predefined_queues(self): connection = Connection(transport=SQS.Transport, transport_options={}) channel = connection.channel()