Apply retry policy to maybe_declare(). (#2174)

This commit is contained in:
Omer Katz 2024-10-24 19:15:15 +03:00 committed by GitHub
parent c48be8800d
commit 1686c35a06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 38 additions and 7 deletions

View File

@ -126,7 +126,7 @@ def _ensure_channel_is_bound(entity, channel):
raise ChannelError(
f"Cannot bind channel {channel} to entity {entity}")
entity = entity.bind(channel)
return entity
return entity
def _maybe_declare(entity, channel):
@ -159,7 +159,7 @@ def _maybe_declare(entity, channel):
def _imaybe_declare(entity, channel, **retry_policy):
_ensure_channel_is_bound(entity, channel)
entity = _ensure_channel_is_bound(entity, channel)
if not entity.channel.connection:
raise RecoverableConnectionError('channel disconnected')

View File

@ -187,12 +187,14 @@ class Producer:
return _publish(
body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory, immediate,
exchange_name, declare, timeout
exchange_name, declare, timeout, retry, retry_policy
)
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare, timeout=None):
immediate, exchange, declare, timeout=None,
retry=False, retry_policy=None):
retry_policy = {} if retry_policy is None else retry_policy
channel = self.channel
message = channel.prepare_message(
body, priority, content_type,
@ -200,7 +202,8 @@ class Producer:
)
if declare:
maybe_declare = self.maybe_declare
[maybe_declare(entity) for entity in declare]
for entity in declare:
maybe_declare(entity, retry=retry, **retry_policy)
# handle autogenerated queue names for reply_to
reply_to = properties.get('reply_to')

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import pickle
import sys
from collections import defaultdict
from unittest.mock import Mock, patch
from unittest.mock import ANY, Mock, patch
import pytest
@ -170,6 +170,34 @@ class test_Producer:
timeout = p._channel.basic_publish.call_args[1]['timeout']
assert timeout == 1
@patch('kombu.messaging.maybe_declare')
def test_publish_maybe_declare_with_retry_policy(self, maybe_declare):
p = self.connection.Producer(exchange=Exchange('foo'))
p.channel = Mock()
expected_retry_policy = {
"max_retries": 20,
"interval_start": 1,
"interval_step": 2,
"interval_max": 30,
"retry_errors": (OperationalError,)
}
p.publish('test_maybe_declare', exchange=Exchange('foo'), retry=True, retry_policy=expected_retry_policy)
maybe_declare.assert_called_once_with(ANY, ANY, True, **expected_retry_policy)
@patch('kombu.common._imaybe_declare')
def test_publish_maybe_declare_with_retry_policy_ensure_connection(self, _imaybe_declare):
p = self.connection.Producer(exchange=Exchange('foo'))
p.channel = Mock()
expected_retry_policy = {
"max_retries": 20,
"interval_start": 1,
"interval_step": 2,
"interval_max": 30,
"retry_errors": (OperationalError,)
}
p.publish('test_maybe_declare', exchange=Exchange('foo'), retry=True, retry_policy=expected_retry_policy)
_imaybe_declare.assert_called_once_with(ANY, ANY, **expected_retry_policy)
def test_publish_with_reply_to(self):
p = self.connection.Producer()
p.channel = Mock()
@ -200,7 +228,7 @@ class test_Producer:
p.connection.ensure = Mock()
ex = Exchange('foo')
p._publish('hello', 0, '', '', {}, {}, 'rk', 0, 0, ex, declare=[ex])
p.maybe_declare.assert_called_with(ex)
p.maybe_declare.assert_called_with(ex, retry=False)
def test_revive_when_channel_is_connection(self):
p = self.connection.Producer()