216 lines
6.6 KiB
Python
216 lines
6.6 KiB
Python
from __future__ import absolute_import, print_function, division
|
|
|
|
import re
|
|
|
|
import six
|
|
from netlib import multidict
|
|
from netlib import strutils
|
|
|
|
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
|
|
|
|
if six.PY2: # pragma: no cover
|
|
def _native(x):
|
|
return x
|
|
|
|
def _always_bytes(x):
|
|
return x
|
|
else:
|
|
# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
|
|
def _native(x):
|
|
return x.decode("utf-8", "surrogateescape")
|
|
|
|
def _always_bytes(x):
|
|
return strutils.always_bytes(x, "utf-8", "surrogateescape")
|
|
|
|
|
|
class Headers(multidict.MultiDict):
|
|
"""
|
|
Header class which allows both convenient access to individual headers as well as
|
|
direct access to the underlying raw data. Provides a full dictionary interface.
|
|
|
|
Example:
|
|
|
|
.. code-block:: python
|
|
|
|
# Create headers with keyword arguments
|
|
>>> h = Headers(host="example.com", content_type="application/xml")
|
|
|
|
# Headers mostly behave like a normal dict.
|
|
>>> h["Host"]
|
|
"example.com"
|
|
|
|
# HTTP Headers are case insensitive
|
|
>>> h["host"]
|
|
"example.com"
|
|
|
|
# Headers can also be created from a list of raw (header_name, header_value) byte tuples
|
|
>>> h = Headers([
|
|
(b"Host",b"example.com"),
|
|
(b"Accept",b"text/html"),
|
|
(b"accept",b"application/xml")
|
|
])
|
|
|
|
# Multiple headers are folded into a single header as per RFC7230
|
|
>>> h["Accept"]
|
|
"text/html, application/xml"
|
|
|
|
# Setting a header removes all existing headers with the same name.
|
|
>>> h["Accept"] = "application/text"
|
|
>>> h["Accept"]
|
|
"application/text"
|
|
|
|
# bytes(h) returns a HTTP1 header block.
|
|
>>> print(bytes(h))
|
|
Host: example.com
|
|
Accept: application/text
|
|
|
|
# For full control, the raw header fields can be accessed
|
|
>>> h.fields
|
|
|
|
Caveats:
|
|
For use with the "Set-Cookie" header, see :py:meth:`get_all`.
|
|
"""
|
|
|
|
def __init__(self, fields=(), **headers):
|
|
"""
|
|
Args:
|
|
fields: (optional) list of ``(name, value)`` header byte tuples,
|
|
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
|
|
**headers: Additional headers to set. Will overwrite existing values from `fields`.
|
|
For convenience, underscores in header names will be transformed to dashes -
|
|
this behaviour does not extend to other methods.
|
|
If ``**headers`` contains multiple keys that have equal ``.lower()`` s,
|
|
the behavior is undefined.
|
|
"""
|
|
super(Headers, self).__init__(fields)
|
|
|
|
for key, value in self.fields:
|
|
if not isinstance(key, bytes) or not isinstance(value, bytes):
|
|
raise TypeError("Header fields must be bytes.")
|
|
|
|
# content_type -> content-type
|
|
headers = {
|
|
_always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
|
|
for name, value in six.iteritems(headers)
|
|
}
|
|
self.update(headers)
|
|
|
|
@staticmethod
|
|
def _reduce_values(values):
|
|
# Headers can be folded
|
|
return ", ".join(values)
|
|
|
|
@staticmethod
|
|
def _kconv(key):
|
|
# Headers are case-insensitive
|
|
return key.lower()
|
|
|
|
def __bytes__(self):
|
|
if self.fields:
|
|
return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
|
|
else:
|
|
return b""
|
|
|
|
if six.PY2: # pragma: no cover
|
|
__str__ = __bytes__
|
|
|
|
def __delitem__(self, key):
|
|
key = _always_bytes(key)
|
|
super(Headers, self).__delitem__(key)
|
|
|
|
def __iter__(self):
|
|
for x in super(Headers, self).__iter__():
|
|
yield _native(x)
|
|
|
|
def get_all(self, name):
|
|
"""
|
|
Like :py:meth:`get`, but does not fold multiple headers into a single one.
|
|
This is useful for Set-Cookie headers, which do not support folding.
|
|
See also: https://tools.ietf.org/html/rfc7230#section-3.2.2
|
|
"""
|
|
name = _always_bytes(name)
|
|
return [
|
|
_native(x) for x in
|
|
super(Headers, self).get_all(name)
|
|
]
|
|
|
|
def set_all(self, name, values):
|
|
"""
|
|
Explicitly set multiple headers for the given key.
|
|
See: :py:meth:`get_all`
|
|
"""
|
|
name = _always_bytes(name)
|
|
values = [_always_bytes(x) for x in values]
|
|
return super(Headers, self).set_all(name, values)
|
|
|
|
def insert(self, index, key, value):
|
|
key = _always_bytes(key)
|
|
value = _always_bytes(value)
|
|
super(Headers, self).insert(index, key, value)
|
|
|
|
def items(self, multi=False):
|
|
if multi:
|
|
return (
|
|
(_native(k), _native(v))
|
|
for k, v in self.fields
|
|
)
|
|
else:
|
|
return super(Headers, self).items()
|
|
|
|
def replace(self, pattern, repl, flags=0):
|
|
"""
|
|
Replaces a regular expression pattern with repl in each "name: value"
|
|
header line.
|
|
|
|
Returns:
|
|
The number of replacements made.
|
|
"""
|
|
if isinstance(pattern, six.text_type):
|
|
pattern = strutils.escaped_str_to_bytes(pattern)
|
|
if isinstance(repl, six.text_type):
|
|
repl = strutils.escaped_str_to_bytes(repl)
|
|
pattern = re.compile(pattern, flags)
|
|
replacements = 0
|
|
|
|
fields = []
|
|
for name, value in self.fields:
|
|
line, n = pattern.subn(repl, name + b": " + value)
|
|
try:
|
|
name, value = line.split(b": ", 1)
|
|
except ValueError:
|
|
# We get a ValueError if the replacement removed the ": "
|
|
# There's not much we can do about this, so we just keep the header as-is.
|
|
pass
|
|
else:
|
|
replacements += n
|
|
fields.append([name, value])
|
|
self.fields = fields
|
|
return replacements
|
|
|
|
|
|
def parse_content_type(c):
|
|
"""
|
|
A simple parser for content-type values. Returns a (type, subtype,
|
|
parameters) tuple, where type and subtype are strings, and parameters
|
|
is a dict. If the string could not be parsed, return None.
|
|
|
|
E.g. the following string:
|
|
|
|
text/html; charset=UTF-8
|
|
|
|
Returns:
|
|
|
|
("text", "html", {"charset": "UTF-8"})
|
|
"""
|
|
parts = c.split(";", 1)
|
|
ts = parts[0].split("/", 1)
|
|
if len(ts) != 2:
|
|
return None
|
|
d = {}
|
|
if len(parts) == 2:
|
|
for i in parts[1].split(";"):
|
|
clause = i.split("=", 1)
|
|
if len(clause) == 2:
|
|
d[clause[0].strip()] = clause[1].strip()
|
|
return ts[0].lower(), ts[1].lower(), d
|