mirror of https://github.com/celery/kombu.git
Bugfix: Support Python 3.4, decode from base64 when converting messag… (#714)
* Bugfix: Support Python 3.4, decode from base64 when converting message to Python * Use str_to_bytes
This commit is contained in:
parent
129a9e4ed0
commit
6c3bb07e19
|
@ -195,9 +195,6 @@ class AsyncAWSQueryConnection(AsyncConnection):
|
|||
signer.sign(operation, request, signing_type=signing_type)
|
||||
prepared_request = request.prepare()
|
||||
|
||||
# print(prepared_request.url)
|
||||
# print(prepared_request.headers)
|
||||
# print(prepared_request.body)
|
||||
return self._mexe(prepared_request, callback=callback)
|
||||
|
||||
def get_list(self, operation, params, markers, path='/', parent=None, verb='POST', callback=None): # noqa
|
||||
|
|
|
@ -2,9 +2,11 @@
|
|||
"""Amazon SQS message implementation."""
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from kombu.message import Message
|
||||
import base64
|
||||
|
||||
from kombu.message import Message
|
||||
from kombu.utils.encoding import str_to_bytes
|
||||
|
||||
|
||||
class BaseAsyncMessage(Message):
|
||||
"""Base class for messages received on async client."""
|
||||
|
@ -19,7 +21,7 @@ class AsyncMessage(BaseAsyncMessage):
|
|||
|
||||
def encode(self, value):
|
||||
"""Encode/decode the value using Base64 encoding."""
|
||||
return base64.b64encode(value).decode('utf-8')
|
||||
return base64.b64encode(str_to_bytes(value)).decode()
|
||||
|
||||
def __getitem__(self, item):
|
||||
"""Support Boto3-style access on a message."""
|
||||
|
|
|
@ -37,6 +37,7 @@ SQS Features supported by this transport:
|
|||
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import base64
|
||||
import socket
|
||||
import string
|
||||
import uuid
|
||||
|
@ -211,7 +212,8 @@ class Channel(virtual.Channel):
|
|||
self.sqs.send_message(**kwargs)
|
||||
|
||||
def _message_to_python(self, message, queue_name, queue):
|
||||
payload = loads(bytes_to_str(message['Body']))
|
||||
body = base64.b64decode(message['Body'].encode())
|
||||
payload = loads(bytes_to_str(body))
|
||||
if queue_name in self._noack_queues:
|
||||
queue = self._new_queue(queue_name)
|
||||
self.asynsqs.delete_message(queue, message['ReceiptHandle'])
|
||||
|
@ -224,7 +226,7 @@ class Channel(virtual.Channel):
|
|||
delivery_info = {}
|
||||
properties = {'delivery_info': delivery_info}
|
||||
payload.update({
|
||||
'body': bytes_to_str(message['Body']),
|
||||
'body': bytes_to_str(body),
|
||||
'properties': properties,
|
||||
})
|
||||
# set delivery tag to SQS receipt handle
|
||||
|
|
|
@ -7,7 +7,6 @@ from kombu.tests.case import skip
|
|||
|
||||
@skip.unless_environ('AWS_ACCESS_KEY_ID')
|
||||
@skip.unless_environ('AWS_SECRET_ACCESS_KEY')
|
||||
@skip.unless_module('boto')
|
||||
@skip.unless_module('boto3')
|
||||
class test_SQS(transport.TransportCase):
|
||||
transport = 'SQS'
|
||||
|
|
Loading…
Reference in New Issue