mirror of https://github.com/celery/kombu.git
fix: non kombu json message decoding in SQS transport (#1306)
* fix: non kombu json message decoding in SQS transport * fix: non kombu json message decoding in SQS transport - add tests Co-authored-by: Max Nikitenko <max.nikitenko@namecheap.com>
This commit is contained in:
parent
fccbb0d5d6
commit
2a704e3585
|
@ -370,11 +370,21 @@ class Channel(virtual.Channel):
|
||||||
else:
|
else:
|
||||||
c.send_message(**kwargs)
|
c.send_message(**kwargs)
|
||||||
|
|
||||||
def _message_to_python(self, message, queue_name, queue):
|
@staticmethod
|
||||||
|
def __b64_encoded(byte_string):
|
||||||
try:
|
try:
|
||||||
body = base64.b64decode(message['Body'].encode())
|
return base64.b64encode(base64.b64decode(byte_string)) == byte_string
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _message_to_python(self, message, queue_name, queue):
|
||||||
|
body = message['Body'].encode()
|
||||||
|
try:
|
||||||
|
if self.__b64_encoded(body):
|
||||||
|
body = base64.b64decode(body)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
body = message['Body'].encode()
|
pass
|
||||||
|
|
||||||
payload = loads(bytes_to_str(body))
|
payload = loads(bytes_to_str(body))
|
||||||
if queue_name in self._noack_queues:
|
if queue_name in self._noack_queues:
|
||||||
queue = self._new_queue(queue_name)
|
queue = self._new_queue(queue_name)
|
||||||
|
|
|
@ -4,8 +4,7 @@ NOTE: The SQSQueueMock and SQSConnectionMock classes originally come from
|
||||||
http://github.com/pcsforeducation/sqs-mock-python. They have been patched
|
http://github.com/pcsforeducation/sqs-mock-python. They have been patched
|
||||||
slightly.
|
slightly.
|
||||||
"""
|
"""
|
||||||
|
import base64
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
import random
|
import random
|
||||||
|
@ -330,6 +329,14 @@ class test_Channel:
|
||||||
with pytest.raises(Empty):
|
with pytest.raises(Empty):
|
||||||
self.channel._get_bulk(self.queue_name)
|
self.channel._get_bulk(self.queue_name)
|
||||||
|
|
||||||
|
def test_is_base64_encoded(self):
|
||||||
|
raw = b'{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77","task": "celery.task.PingTask",' \
|
||||||
|
b'"args": [],"kwargs": {},"retries": 0,"eta": "2009-11-17T12:30:56.527191"}'
|
||||||
|
b64_enc = base64.b64encode(raw)
|
||||||
|
assert self.channel._Channel__b64_encoded(b64_enc)
|
||||||
|
assert not self.channel._Channel__b64_encoded(raw)
|
||||||
|
assert not self.channel._Channel__b64_encoded(b"test123")
|
||||||
|
|
||||||
def test_messages_to_python(self):
|
def test_messages_to_python(self):
|
||||||
from kombu.asynchronous.aws.sqs.message import Message
|
from kombu.asynchronous.aws.sqs.message import Message
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue