diff --git a/kombu/messaging.py b/kombu/messaging.py index 43b9bde9..19324e8c 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -14,6 +14,8 @@ class Producer(object): :keyword exchange: Exchange to publish to. :keyword routing_key: Default routing key. :keyword serializer: Default serializer. Default is ``"json"``. + :keyword compression: Default compression method. Default is no + compression. :keyword auto_declare: Automatically declare the exchange at instantiation. Default is ``True``. @@ -43,13 +45,15 @@ class Producer(object): serializer = None auto_declare = True routing_key = "" + compression = None def __init__(self, channel, exchange=None, routing_key=None, - serializer=None, auto_declare=None): + serializer=None, auto_declare=None, compression=None): self.channel = channel self.exchange = exchange or self.exchange self.routing_key = routing_key or self.routing_key self.serializer = serializer or self.serializer + self.compression = compression or self.compression if auto_declare is not None: self.auto_declare = auto_declare @@ -70,6 +74,7 @@ class Producer(object): def _prepare(self, body, serializer=None, content_type=None, content_encoding=None, compression=None, headers=None): + # No content_type? Then we're serializing the data internally. if not content_type: serializer = serializer or self.serializer @@ -115,6 +120,8 @@ class Producer(object): headers = headers or {} if routing_key is None: routing_key = self.routing_key + if compression is None: + compression = self.compression body, content_type, content_encoding = self._prepare( body, serializer, content_type, content_encoding, @@ -211,23 +218,25 @@ class Consumer(object): for queue in self.queues: queue.declare() - def consume(self, delivery_tag=None): + def consume(self, delivery_tag=None, no_ack=None): """Register consumer on server. :keyword: delivery_tag: Unique delivery tag for this channel. If not specified a new tag will be automatically generated. """ + if no_ack is None: + no_ack = self.no_ack if not self._consuming: H, T = self.queues[:-1], self.queues[-1] for queue in H: queue.consume(self._add_tag(queue, delivery_tag), self._receive_callback, - self.no_ack, + no_ack=no_ack, nowait=True) T.consume(self._add_tag(T), self._receive_callback, - self.no_ack, + no_ack=no_ack, nowait=False) self._consuming = False