Added confirm_timeout argument. (#2167)

This commit is contained in:
Omer Katz 2024-10-26 19:25:25 +03:00 committed by GitHub
parent 1220144b64
commit 087527f209
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 14 additions and 4 deletions

View File

@ -124,6 +124,7 @@ class Producer:
content_type=None, content_encoding=None, serializer=None,
headers=None, compression=None, exchange=None, retry=False,
retry_policy=None, declare=None, expiration=None, timeout=None,
confirm_timeout=None,
**properties):
"""Publish message to the specified exchange.
@ -155,6 +156,8 @@ class Producer:
Default is no expiration.
timeout (float): Set timeout to wait maximum timeout second
for message to publish.
confirm_timeout (float): Set confirm timeout to wait maximum timeout second
for message to confirm publishing if the channel is set to confirm publish mode.
**properties (Any): Additional message properties, see AMQP spec.
"""
_publish = self._publish
@ -187,13 +190,12 @@ class Producer:
return _publish(
body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory, immediate,
exchange_name, declare, timeout, retry, retry_policy
exchange_name, declare, timeout, confirm_timeout, retry, retry_policy
)
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare, timeout=None,
retry=False, retry_policy=None):
immediate, exchange, declare, timeout=None, confirm_timeout=None, retry=False, retry_policy=None):
retry_policy = {} if retry_policy is None else retry_policy
channel = self.channel
message = channel.prepare_message(
@ -213,7 +215,7 @@ class Producer:
message,
exchange=exchange, routing_key=routing_key,
mandatory=mandatory, immediate=immediate,
timeout=timeout
timeout=timeout, confirm_timeout=confirm_timeout
)
def _get_channel(self):

View File

@ -170,6 +170,14 @@ class test_Producer:
timeout = p._channel.basic_publish.call_args[1]['timeout']
assert timeout == 1
def test_publish_with_confirm_timeout(self):
p = self.connection.Producer()
p.channel = Mock()
p.channel.connection.client.declared_entities = set()
p.publish('test_timeout', exchange=Exchange('foo'), confirm_timeout=1)
confirm_timeout = p._channel.basic_publish.call_args[1]['confirm_timeout']
assert confirm_timeout == 1
@patch('kombu.messaging.maybe_declare')
def test_publish_maybe_declare_with_retry_policy(self, maybe_declare):
p = self.connection.Producer(exchange=Exchange('foo'))