Added support for automatic compression/decompression of messages.

This is done by adding the "compression" header to messages.
Currently supported compression formats are: zlib and bzip2.
Custom compression formats can be added using kombu.compression.register::

    >>> compression.register(encoder, decoder, content_type, aliases={})

e.g.::

    >>> compression.register(lambda x: x.encode("zlib"),
    ...                      lambda x: x.decode("zlib"),
    ...                      "application/x-zlib",
    ...                      aliases=["zlib", "gzip"])

To enable compression you use the ``compression`` argument to
:meth:`Producer.publish`. When these messages are then consumed by
kombu they will be automatically decompressed, other clients need to
decompress them manually by looking at the ``compression`` header.

Example using zlib compression:

    >>> producer.publish(message, serializer="json", compression="zlib")

Example using bzip2 compression:

    >>> producer.publish(message, serializer="json", compression="bz2")
This commit is contained in:
Ask Solem 2010-07-31 14:26:55 +02:00
parent c0b540af69
commit c5cb3a9c59
3 changed files with 54 additions and 6 deletions

View File

@ -4,6 +4,7 @@ Backend base classes.
""" """
from kombu import serialization from kombu import serialization
from kombu.compression import decompress
from kombu.exceptions import MessageStateError from kombu.exceptions import MessageStateError
ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"]) ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"])
@ -25,11 +26,15 @@ class BaseMessage(object):
self.content_type = content_type self.content_type = content_type
self.content_encoding = content_encoding self.content_encoding = content_encoding
self.delivery_info = delivery_info self.delivery_info = delivery_info
self.headers = headers self.headers = headers or {}
self.properties = properties self.properties = properties or {}
self._decoded_cache = None self._decoded_cache = None
self._state = "RECEIVED" self._state = "RECEIVED"
compression = self.headers.get("compression")
if compression:
self.body = decompress(self.body, compression)
def decode(self): def decode(self):
"""Deserialize the message body, returning the original """Deserialize the message body, returning the original
python structure sent by the publisher.""" python structure sent by the publisher."""

35
kombu/compression.py Normal file
View File

@ -0,0 +1,35 @@
_aliases = {}
_encoders = {}
_decoders = {}
def register(encoder, decoder, content_type, aliases=[]):
_encoders[content_type] = encoder
_decoders[content_type] = decoder
_aliases.update((alias, content_type) for alias in aliases)
def get_encoder(t):
t = _aliases.get(t, t)
return _encoders[t], t
def get_decoder(t):
return _decoders[_aliases.get(t, t)]
def compress(body, content_type):
encoder, content_type = get_encoder(content_type)
return encoder(body), content_type
def decompress(body, content_type):
return get_decoder(content_type)(body)
register(lambda x: x.encode("zlib"),
lambda x: x.decode("zlib"),
"application/x-gzip", aliases=["gzip", "zlib"])
register(lambda x: x.encode("bz2"),
lambda x: x.decode("bz2"),
"application/x-bz2", aliases=["bzip2", "bzip"])

View File

@ -1,6 +1,7 @@
from itertools import count from itertools import count
from kombu import serialization from kombu import serialization
from kombu.compression import compress
from kombu.entity import Exchange, Binding from kombu.entity import Exchange, Binding
from kombu.utils import maybe_list from kombu.utils import maybe_list
@ -66,7 +67,8 @@ class Producer(object):
self.exchange.declare() self.exchange.declare()
def _prepare(self, body, serializer=None, def _prepare(self, body, serializer=None,
content_type=None, content_encoding=None): content_type=None, content_encoding=None, compression=None,
headers=None):
# No content_type? Then we're serializing the data internally. # No content_type? Then we're serializing the data internally.
if not content_type: if not content_type:
serializer = serializer or self.serializer serializer = serializer or self.serializer
@ -81,15 +83,19 @@ class Producer(object):
body = body.encode(content_encoding) body = body.encode(content_encoding)
# If they passed in a string, we can't know anything # If they passed in a string, we can't know anything
# about it. So assume it's binary data. # about it. So assume it's binary data.
elif not content_encoding: elif not content_encoding:
content_encoding = 'binary' content_encoding = 'binary'
if compression:
body, headers["compression"] = compress(body, compression)
return body, content_type, content_encoding return body, content_type, content_encoding
def publish(self, body, routing_key=None, delivery_mode=None, def publish(self, body, routing_key=None, delivery_mode=None,
mandatory=False, immediate=False, priority=0, content_type=None, mandatory=False, immediate=False, priority=0, content_type=None,
content_encoding=None, serializer=None, headers=None): content_encoding=None, serializer=None, headers=None,
compression=None):
"""Publish message to the specified exchange. """Publish message to the specified exchange.
:param body: Message body. :param body: Message body.
@ -105,11 +111,13 @@ class Producer(object):
with the message body. with the message body.
""" """
headers = headers or {}
if routing_key is None: if routing_key is None:
routing_key = self.routing_key routing_key = self.routing_key
body, content_type, content_encoding = self._prepare( body, content_type, content_encoding = self._prepare(
body, content_type, content_encoding) body, serializer, content_type, content_encoding,
compression, headers)
message = self.exchange.Message(body, message = self.exchange.Message(body,
delivery_mode, delivery_mode,
priority, priority,