Set redelivered property for Celery with Redis (#1484)

* Set redelivered property for Celery with Redis

Fixed setting `redelivered` value for Celery when Redis broker is used.

* Add `test_do_restore_message_celery` test

* Fix long line
This commit is contained in:
Denis Kubashevskiy 2022-01-28 17:16:22 +03:00 committed by GitHub
parent 5bed2a8f98
commit 3ec6dc0fd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 0 deletions

View File

@ -717,6 +717,7 @@ class Channel(virtual.Channel):
try:
try:
payload['headers']['redelivered'] = True
payload['properties']['delivery_info']['redelivered'] = True
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):

View File

@ -1,3 +1,5 @@
import base64
import copy
import socket
import types
from collections import defaultdict
@ -399,6 +401,67 @@ class test_Channel:
)
crit.assert_called()
def test_do_restore_message_celery(self):
# Payload value from real Celery project
payload = {
"body": base64.b64encode(dumps([
[],
{},
{
"callbacks": None,
"errbacks": None,
"chain": None,
"chord": None,
},
]).encode()).decode(),
"content-encoding": "utf-8",
"content-type": "application/json",
"headers": {
"lang": "py",
"task": "common.tasks.test_task",
"id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
"shadow": None,
"eta": None,
"expires": None,
"group": None,
"group_index": None,
"retries": 0,
"timelimit": [None, None],
"root_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
"parent_id": None,
"argsrepr": "()",
"kwargsrepr": "{}",
"origin": "gen3437@Desktop",
"ignore_result": False,
},
"properties": {
"correlation_id": "980ad2bf-104c-4ce0-8643-67d1947173f6",
"reply_to": "512f2489-ca40-3585-bc10-9b801a981782",
"delivery_mode": 2,
"delivery_info": {
"exchange": "",
"routing_key": "celery",
},
"priority": 0,
"body_encoding": "base64",
"delivery_tag": "badb725e-9c3e-45be-b0a4-07e44630519f",
},
}
result_payload = copy.deepcopy(payload)
result_payload['headers']['redelivered'] = True
result_payload['properties']['delivery_info']['redelivered'] = True
queue = 'celery'
client = Mock(name='client')
lookup = self.channel._lookup = Mock(name='_lookup')
lookup.return_value = [queue]
self.channel._do_restore_message(
payload, 'exchange', 'routing_key', client,
)
client.rpush.assert_called_with(queue, dumps(result_payload))
def test_restore_no_messages(self):
message = Mock(name='message')