161 lines
3.9 KiB
Python
161 lines
3.9 KiB
Python
"""
|
|
Utility functions for decoding response bodies.
|
|
"""
|
|
from __future__ import absolute_import
|
|
|
|
import codecs
|
|
import collections
|
|
from io import BytesIO
|
|
import gzip
|
|
import zlib
|
|
import brotli
|
|
|
|
from typing import Union # noqa
|
|
|
|
|
|
# We have a shared single-element cache for encoding and decoding.
|
|
# This is quite useful in practice, e.g.
|
|
# flow.request.content = flow.request.content.replace(b"foo", b"bar")
|
|
# does not require an .encode() call if content does not contain b"foo"
|
|
CachedDecode = collections.namedtuple("CachedDecode", "encoded encoding errors decoded")
|
|
_cache = CachedDecode(None, None, None, None)
|
|
|
|
|
|
def decode(encoded, encoding, errors='strict'):
|
|
# type: (Union[str, bytes], str, str) -> Union[str, bytes]
|
|
"""
|
|
Decode the given input object
|
|
|
|
Returns:
|
|
The decoded value
|
|
|
|
Raises:
|
|
ValueError, if decoding fails.
|
|
"""
|
|
global _cache
|
|
cached = (
|
|
isinstance(encoded, bytes) and
|
|
_cache.encoded == encoded and
|
|
_cache.encoding == encoding and
|
|
_cache.errors == errors
|
|
)
|
|
if cached:
|
|
return _cache.decoded
|
|
try:
|
|
try:
|
|
decoded = custom_decode[encoding](encoded)
|
|
except KeyError:
|
|
decoded = codecs.decode(encoded, encoding, errors)
|
|
if encoding in ("gzip", "deflate", "br"):
|
|
_cache = CachedDecode(encoded, encoding, errors, decoded)
|
|
return decoded
|
|
except Exception as e:
|
|
raise ValueError("{} when decoding {} with {}".format(
|
|
type(e).__name__,
|
|
repr(encoded)[:10],
|
|
repr(encoding),
|
|
))
|
|
|
|
|
|
def encode(decoded, encoding, errors='strict'):
|
|
# type: (Union[str, bytes], str, str) -> Union[str, bytes]
|
|
"""
|
|
Encode the given input object
|
|
|
|
Returns:
|
|
The encoded value
|
|
|
|
Raises:
|
|
ValueError, if encoding fails.
|
|
"""
|
|
global _cache
|
|
cached = (
|
|
isinstance(decoded, bytes) and
|
|
_cache.decoded == decoded and
|
|
_cache.encoding == encoding and
|
|
_cache.errors == errors
|
|
)
|
|
if cached:
|
|
return _cache.encoded
|
|
try:
|
|
try:
|
|
encoded = custom_encode[encoding](decoded)
|
|
except KeyError:
|
|
encoded = codecs.encode(decoded, encoding, errors)
|
|
if encoding in ("gzip", "deflate", "br"):
|
|
_cache = CachedDecode(encoded, encoding, errors, decoded)
|
|
return encoded
|
|
except Exception as e:
|
|
raise ValueError("{} when encoding {} with {}".format(
|
|
type(e).__name__,
|
|
repr(decoded)[:10],
|
|
repr(encoding),
|
|
))
|
|
|
|
|
|
def identity(content):
|
|
"""
|
|
Returns content unchanged. Identity is the default value of
|
|
Accept-Encoding headers.
|
|
"""
|
|
return content
|
|
|
|
|
|
def decode_gzip(content):
|
|
gfile = gzip.GzipFile(fileobj=BytesIO(content))
|
|
return gfile.read()
|
|
|
|
|
|
def encode_gzip(content):
|
|
s = BytesIO()
|
|
gf = gzip.GzipFile(fileobj=s, mode='wb')
|
|
gf.write(content)
|
|
gf.close()
|
|
return s.getvalue()
|
|
|
|
|
|
def decode_brotli(content):
|
|
return brotli.decompress(content)
|
|
|
|
|
|
def encode_brotli(content):
|
|
return brotli.compress(content)
|
|
|
|
|
|
def decode_deflate(content):
|
|
"""
|
|
Returns decompressed data for DEFLATE. Some servers may respond with
|
|
compressed data without a zlib header or checksum. An undocumented
|
|
feature of zlib permits the lenient decompression of data missing both
|
|
values.
|
|
|
|
http://bugs.python.org/issue5784
|
|
"""
|
|
try:
|
|
return zlib.decompress(content)
|
|
except zlib.error:
|
|
return zlib.decompress(content, -15)
|
|
|
|
|
|
def encode_deflate(content):
|
|
"""
|
|
Returns compressed content, always including zlib header and checksum.
|
|
"""
|
|
return zlib.compress(content)
|
|
|
|
|
|
custom_decode = {
|
|
"identity": identity,
|
|
"gzip": decode_gzip,
|
|
"deflate": decode_deflate,
|
|
"br": decode_brotli,
|
|
}
|
|
custom_encode = {
|
|
"identity": identity,
|
|
"gzip": encode_gzip,
|
|
"deflate": encode_deflate,
|
|
"br": encode_brotli,
|
|
}
|
|
|
|
__all__ = ["encode", "decode"]
|