diff --git a/kombu/backends/base.py b/kombu/backends/base.py index 89c1d6f3..29ce67a6 100644 --- a/kombu/backends/base.py +++ b/kombu/backends/base.py @@ -4,6 +4,7 @@ Backend base classes. """ from kombu import serialization +from kombu.compression import decompress from kombu.exceptions import MessageStateError ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"]) @@ -25,11 +26,15 @@ class BaseMessage(object): self.content_type = content_type self.content_encoding = content_encoding self.delivery_info = delivery_info - self.headers = headers - self.properties = properties + self.headers = headers or {} + self.properties = properties or {} self._decoded_cache = None self._state = "RECEIVED" + compression = self.headers.get("compression") + if compression: + self.body = decompress(self.body, compression) + def decode(self): """Deserialize the message body, returning the original python structure sent by the publisher.""" diff --git a/kombu/compression.py b/kombu/compression.py new file mode 100644 index 00000000..ebe9ee96 --- /dev/null +++ b/kombu/compression.py @@ -0,0 +1,35 @@ +_aliases = {} +_encoders = {} +_decoders = {} + + +def register(encoder, decoder, content_type, aliases=[]): + _encoders[content_type] = encoder + _decoders[content_type] = decoder + _aliases.update((alias, content_type) for alias in aliases) + + +def get_encoder(t): + t = _aliases.get(t, t) + return _encoders[t], t + + +def get_decoder(t): + return _decoders[_aliases.get(t, t)] + + +def compress(body, content_type): + encoder, content_type = get_encoder(content_type) + return encoder(body), content_type + + +def decompress(body, content_type): + return get_decoder(content_type)(body) + + +register(lambda x: x.encode("zlib"), + lambda x: x.decode("zlib"), + "application/x-gzip", aliases=["gzip", "zlib"]) +register(lambda x: x.encode("bz2"), + lambda x: x.decode("bz2"), + "application/x-bz2", aliases=["bzip2", "bzip"]) diff --git a/kombu/messaging.py b/kombu/messaging.py index 6e07066f..b1651c6c 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -1,6 +1,7 @@ from itertools import count from kombu import serialization +from kombu.compression import compress from kombu.entity import Exchange, Binding from kombu.utils import maybe_list @@ -66,7 +67,8 @@ class Producer(object): self.exchange.declare() def _prepare(self, body, serializer=None, - content_type=None, content_encoding=None): + content_type=None, content_encoding=None, compression=None, + headers=None): # No content_type? Then we're serializing the data internally. if not content_type: serializer = serializer or self.serializer @@ -81,15 +83,19 @@ class Producer(object): body = body.encode(content_encoding) # If they passed in a string, we can't know anything - # about it. So assume it's binary data. + # about it. So assume it's binary data. elif not content_encoding: content_encoding = 'binary' + if compression: + body, headers["compression"] = compress(body, compression) + return body, content_type, content_encoding def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, - content_encoding=None, serializer=None, headers=None): + content_encoding=None, serializer=None, headers=None, + compression=None): """Publish message to the specified exchange. :param body: Message body. @@ -105,11 +111,13 @@ class Producer(object): with the message body. """ + headers = headers or {} if routing_key is None: routing_key = self.routing_key body, content_type, content_encoding = self._prepare( - body, content_type, content_encoding) + body, serializer, content_type, content_encoding, + compression, headers) message = self.exchange.Message(body, delivery_mode, priority,