diff --git a/kombu/messaging.py b/kombu/messaging.py index ffd63190..dcbc0c22 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -124,6 +124,7 @@ class Producer: content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, + confirm_timeout=None, **properties): """Publish message to the specified exchange. @@ -155,6 +156,8 @@ class Producer: Default is no expiration. timeout (float): Set timeout to wait maximum timeout second for message to publish. + confirm_timeout (float): Set confirm timeout to wait maximum timeout second + for message to confirm publishing if the channel is set to confirm publish mode. **properties (Any): Additional message properties, see AMQP spec. """ _publish = self._publish @@ -187,13 +190,12 @@ class Producer: return _publish( body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, - exchange_name, declare, timeout, retry, retry_policy + exchange_name, declare, timeout, confirm_timeout, retry, retry_policy ) def _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, - immediate, exchange, declare, timeout=None, - retry=False, retry_policy=None): + immediate, exchange, declare, timeout=None, confirm_timeout=None, retry=False, retry_policy=None): retry_policy = {} if retry_policy is None else retry_policy channel = self.channel message = channel.prepare_message( @@ -213,7 +215,7 @@ class Producer: message, exchange=exchange, routing_key=routing_key, mandatory=mandatory, immediate=immediate, - timeout=timeout + timeout=timeout, confirm_timeout=confirm_timeout ) def _get_channel(self): diff --git a/t/unit/test_messaging.py b/t/unit/test_messaging.py index 9c8e22d6..b480ffb7 100644 --- a/t/unit/test_messaging.py +++ b/t/unit/test_messaging.py @@ -170,6 +170,14 @@ class test_Producer: timeout = p._channel.basic_publish.call_args[1]['timeout'] assert timeout == 1 + def test_publish_with_confirm_timeout(self): + p = self.connection.Producer() + p.channel = Mock() + p.channel.connection.client.declared_entities = set() + p.publish('test_timeout', exchange=Exchange('foo'), confirm_timeout=1) + confirm_timeout = p._channel.basic_publish.call_args[1]['confirm_timeout'] + assert confirm_timeout == 1 + @patch('kombu.messaging.maybe_declare') def test_publish_maybe_declare_with_retry_policy(self, maybe_declare): p = self.connection.Producer(exchange=Exchange('foo'))