diff --git a/kombu/transport/pypika.py b/kombu/transport/pypika.py index 71ad54d4..d4fe1d1c 100644 --- a/kombu/transport/pypika.py +++ b/kombu/transport/pypika.py @@ -46,6 +46,7 @@ class Channel(channel.Channel): def basic_get(self, queue, no_ack): method = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack) + # pika returns semi-predicates (GetEmpty/GetOk). if isinstance(method, Basic.GetEmpty): return return None, method, method._properties, method._body @@ -66,11 +67,16 @@ class Channel(channel.Channel): mandatory, immediate) finally: + # Pika does not automatically flush the outbound buffer + # TODO async: Needs to support `nowait`. self.handler.connection.flush_outbound() def basic_consume(self, queue, no_ack=False, consumer_tag=None, callback=None, nowait=False): + # Kombu callbacks only take a single `message` argument, + # but pika applies with 4 arguments, so need to wrap + # these into a single tuple. def _callback_decode(channel, method, header, body): return callback((channel, method, header, body)) @@ -81,7 +87,6 @@ class Channel(channel.Channel): def prepare_message(self, message_data, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): - """Encapsulate data into a AMQP message.""" properties = BasicProperties(priority=priority, content_type=content_type, content_encoding=content_encoding, @@ -90,7 +95,6 @@ class Channel(channel.Channel): return message_data, properties def message_to_python(self, raw_message): - """Convert encoded message body back to a Python value.""" return self.Message(channel=self, amqp_message=raw_message) def basic_ack(self, delivery_tag):