mirror of https://github.com/celery/kombu.git
raise access denied error when ack
This commit is contained in:
parent
eaaaab0dfd
commit
c18e9626e1
|
@ -176,6 +176,14 @@ class InvalidQueueException(Exception):
|
|||
"""Predefined queues are being used and configuration is not valid."""
|
||||
|
||||
|
||||
class AccessDeniedQueueException(Exception):
|
||||
"""Raised when access to the AWS queue is denied.
|
||||
|
||||
This may occur if the permissions are not correctly set or the
|
||||
credentials are invalid.
|
||||
"""
|
||||
|
||||
|
||||
class QoS(virtual.QoS):
|
||||
"""Quality of Service guarantees implementation for SQS."""
|
||||
|
||||
|
@ -633,7 +641,11 @@ class Channel(virtual.Channel):
|
|||
QueueUrl=message['sqs_queue'],
|
||||
ReceiptHandle=sqs_message['ReceiptHandle']
|
||||
)
|
||||
except ClientError:
|
||||
except ClientError as exception:
|
||||
if exception.response['Error']['Code'] == 'AccessDenied':
|
||||
raise AccessDeniedQueueException(
|
||||
exception.response["Error"]["Message"]
|
||||
)
|
||||
super().basic_reject(delivery_tag)
|
||||
else:
|
||||
super().basic_ack(delivery_tag)
|
||||
|
|
|
@ -646,6 +646,46 @@ class test_Channel:
|
|||
basic_reject_mock.assert_called_with(2)
|
||||
assert not basic_ack_mock.called
|
||||
|
||||
@patch('kombu.transport.virtual.base.Channel.basic_ack')
|
||||
@patch('kombu.transport.virtual.base.Channel.basic_reject')
|
||||
def test_basic_ack_access_denied(self, basic_reject_mock, basic_ack_mock):
|
||||
"""Test that basic_ack raises AccessDeniedQueueException when
|
||||
access is denied"""
|
||||
message = {
|
||||
'sqs_message': {
|
||||
'ReceiptHandle': '2'
|
||||
},
|
||||
'sqs_queue': 'testing_queue'
|
||||
}
|
||||
error_response = {
|
||||
'Error': {
|
||||
'Code': 'AccessDenied',
|
||||
'Message': """An error occurred (AccessDenied) when calling the
|
||||
DeleteMessage operation."""
|
||||
}
|
||||
}
|
||||
operation_name = 'DeleteMessage'
|
||||
|
||||
mock_messages = Mock()
|
||||
mock_messages.delivery_info = message
|
||||
self.channel.qos.append(mock_messages, 2)
|
||||
self.channel.sqs().delete_message = Mock()
|
||||
self.channel.sqs().delete_message.side_effect = ClientError(
|
||||
error_response=error_response,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
# Expecting the custom AccessDeniedQueueException to be raised
|
||||
with pytest.raises(SQS.AccessDeniedQueueException):
|
||||
self.channel.basic_ack(2)
|
||||
|
||||
self.sqs_conn_mock.delete_message.assert_called_with(
|
||||
QueueUrl=message['sqs_queue'],
|
||||
ReceiptHandle=message['sqs_message']['ReceiptHandle']
|
||||
)
|
||||
assert not basic_reject_mock.called
|
||||
assert not basic_ack_mock.called
|
||||
|
||||
def test_reject_when_no_predefined_queues(self):
|
||||
connection = Connection(transport=SQS.Transport, transport_options={})
|
||||
channel = connection.channel()
|
||||
|
|
Loading…
Reference in New Issue