diff --git a/kombu/async/aws/connection.py b/kombu/async/aws/connection.py index 303f3577..ce6515bf 100644 --- a/kombu/async/aws/connection.py +++ b/kombu/async/aws/connection.py @@ -195,9 +195,6 @@ class AsyncAWSQueryConnection(AsyncConnection): signer.sign(operation, request, signing_type=signing_type) prepared_request = request.prepare() - # print(prepared_request.url) - # print(prepared_request.headers) - # print(prepared_request.body) return self._mexe(prepared_request, callback=callback) def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa diff --git a/kombu/async/aws/sqs/message.py b/kombu/async/aws/sqs/message.py index b4b28331..28ca6db6 100644 --- a/kombu/async/aws/sqs/message.py +++ b/kombu/async/aws/sqs/message.py @@ -2,9 +2,11 @@ """Amazon SQS message implementation.""" from __future__ import absolute_import, unicode_literals -from kombu.message import Message import base64 +from kombu.message import Message +from kombu.utils.encoding import str_to_bytes + class BaseAsyncMessage(Message): """Base class for messages received on async client.""" @@ -19,7 +21,7 @@ class AsyncMessage(BaseAsyncMessage): def encode(self, value): """Encode/decode the value using Base64 encoding.""" - return base64.b64encode(value).decode('utf-8') + return base64.b64encode(str_to_bytes(value)).decode() def __getitem__(self, item): """Support Boto3-style access on a message.""" diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 59cfdc87..91d8a106 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -37,6 +37,7 @@ SQS Features supported by this transport: from __future__ import absolute_import, unicode_literals +import base64 import socket import string import uuid @@ -211,7 +212,8 @@ class Channel(virtual.Channel): self.sqs.send_message(**kwargs) def _message_to_python(self, message, queue_name, queue): - payload = loads(bytes_to_str(message['Body'])) + body = base64.b64decode(message['Body'].encode()) + payload = loads(bytes_to_str(body)) if queue_name in self._noack_queues: queue = self._new_queue(queue_name) self.asynsqs.delete_message(queue, message['ReceiptHandle']) @@ -224,7 +226,7 @@ class Channel(virtual.Channel): delivery_info = {} properties = {'delivery_info': delivery_info} payload.update({ - 'body': bytes_to_str(message['Body']), + 'body': bytes_to_str(body), 'properties': properties, }) # set delivery tag to SQS receipt handle diff --git a/t/integration/tests/test_SQS.py b/t/integration/tests/test_SQS.py index 1c52d648..4949bcad 100644 --- a/t/integration/tests/test_SQS.py +++ b/t/integration/tests/test_SQS.py @@ -7,7 +7,6 @@ from kombu.tests.case import skip @skip.unless_environ('AWS_ACCESS_KEY_ID') @skip.unless_environ('AWS_SECRET_ACCESS_KEY') -@skip.unless_module('boto') @skip.unless_module('boto3') class test_SQS(transport.TransportCase): transport = 'SQS'