Added kombu.messaging.Producer.compression attribute.

This commit is contained in:
Ask Solem 2010-08-01 17:08:21 +02:00
parent 3c96d0affe
commit ba262e6f70
1 changed files with 13 additions and 4 deletions

View File

@ -14,6 +14,8 @@ class Producer(object):
:keyword exchange: Exchange to publish to. :keyword exchange: Exchange to publish to.
:keyword routing_key: Default routing key. :keyword routing_key: Default routing key.
:keyword serializer: Default serializer. Default is ``"json"``. :keyword serializer: Default serializer. Default is ``"json"``.
:keyword compression: Default compression method. Default is no
compression.
:keyword auto_declare: Automatically declare the exchange :keyword auto_declare: Automatically declare the exchange
at instantiation. Default is ``True``. at instantiation. Default is ``True``.
@ -43,13 +45,15 @@ class Producer(object):
serializer = None serializer = None
auto_declare = True auto_declare = True
routing_key = "" routing_key = ""
compression = None
def __init__(self, channel, exchange=None, routing_key=None, def __init__(self, channel, exchange=None, routing_key=None,
serializer=None, auto_declare=None): serializer=None, auto_declare=None, compression=None):
self.channel = channel self.channel = channel
self.exchange = exchange or self.exchange self.exchange = exchange or self.exchange
self.routing_key = routing_key or self.routing_key self.routing_key = routing_key or self.routing_key
self.serializer = serializer or self.serializer self.serializer = serializer or self.serializer
self.compression = compression or self.compression
if auto_declare is not None: if auto_declare is not None:
self.auto_declare = auto_declare self.auto_declare = auto_declare
@ -70,6 +74,7 @@ class Producer(object):
def _prepare(self, body, serializer=None, def _prepare(self, body, serializer=None,
content_type=None, content_encoding=None, compression=None, content_type=None, content_encoding=None, compression=None,
headers=None): headers=None):
# No content_type? Then we're serializing the data internally. # No content_type? Then we're serializing the data internally.
if not content_type: if not content_type:
serializer = serializer or self.serializer serializer = serializer or self.serializer
@ -115,6 +120,8 @@ class Producer(object):
headers = headers or {} headers = headers or {}
if routing_key is None: if routing_key is None:
routing_key = self.routing_key routing_key = self.routing_key
if compression is None:
compression = self.compression
body, content_type, content_encoding = self._prepare( body, content_type, content_encoding = self._prepare(
body, serializer, content_type, content_encoding, body, serializer, content_type, content_encoding,
@ -211,23 +218,25 @@ class Consumer(object):
for queue in self.queues: for queue in self.queues:
queue.declare() queue.declare()
def consume(self, delivery_tag=None): def consume(self, delivery_tag=None, no_ack=None):
"""Register consumer on server. """Register consumer on server.
:keyword: delivery_tag: Unique delivery tag for this channel. :keyword: delivery_tag: Unique delivery tag for this channel.
If not specified a new tag will be automatically generated. If not specified a new tag will be automatically generated.
""" """
if no_ack is None:
no_ack = self.no_ack
if not self._consuming: if not self._consuming:
H, T = self.queues[:-1], self.queues[-1] H, T = self.queues[:-1], self.queues[-1]
for queue in H: for queue in H:
queue.consume(self._add_tag(queue, delivery_tag), queue.consume(self._add_tag(queue, delivery_tag),
self._receive_callback, self._receive_callback,
self.no_ack, no_ack=no_ack,
nowait=True) nowait=True)
T.consume(self._add_tag(T), T.consume(self._add_tag(T),
self._receive_callback, self._receive_callback,
self.no_ack, no_ack=no_ack,
nowait=False) nowait=False)
self._consuming = False self._consuming = False