* [#4235] Automatic view based on should_render method instead of content_types property * [#4235] Update CHENGELOG * [#4235] Fix linter warnings * Add an explicit test for the new forward-compatible behaviour * wip * contentviews: introduce render_priority (2/2) * coverage++, lint! * minor fixes Co-authored-by: Maximilian Hils <git@maximilianhils.com>
This commit is contained in:
parent
a11cfd45c4
commit
f51019cb63
|
@ -41,6 +41,8 @@ If you depend on these features, please raise your voice in
|
|||
* mitmproxy's command line interface now supports Windows (@mhils)
|
||||
* The `clientconnect`, `clientdisconnect`, `serverconnect`, `serverdisconnect`, and `log`
|
||||
events have been replaced with new events, see addon documentation for details (@mhils)
|
||||
* Contentviews now implement `render_priority` instead of `should_render`, allowing more specialization (@mhils)
|
||||
* Automatic JSON view mode when `+json` suffix in content type (@kam800)
|
||||
* Use pyca/cryptography to generate certificates, not pyOpenSSL (@mhils)
|
||||
* Remove the legacy protocol stack (@Kriechi)
|
||||
* Remove all deprecated pathod and pathoc tools and modules (@Kriechi)
|
||||
|
|
|
@ -5,16 +5,32 @@ This example shows how one can add a custom contentview to mitmproxy,
|
|||
which is used to pretty-print HTTP bodies for example.
|
||||
The content view API is explained in the mitmproxy.contentviews module.
|
||||
"""
|
||||
from mitmproxy import contentviews
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy import contentviews, flow
|
||||
from mitmproxy.net import http
|
||||
|
||||
|
||||
class ViewSwapCase(contentviews.View):
|
||||
name = "swapcase"
|
||||
content_types = ["text/plain"]
|
||||
|
||||
def __call__(self, data, **metadata) -> contentviews.TViewResult:
|
||||
return "case-swapped text", contentviews.format_text(data.swapcase())
|
||||
|
||||
def render_priority(
|
||||
self,
|
||||
data: bytes,
|
||||
*,
|
||||
content_type: Optional[str] = None,
|
||||
flow: Optional[flow.Flow] = None,
|
||||
http_message: Optional[http.Message] = None,
|
||||
**unknown_metadata,
|
||||
) -> float:
|
||||
if content_type == "text/plain":
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
|
||||
view = ViewSwapCase()
|
||||
|
||||
|
|
|
@ -1,19 +1,20 @@
|
|||
import itertools
|
||||
import shutil
|
||||
import sys
|
||||
from typing import Optional, TextIO
|
||||
from typing import Optional, TextIO, Union
|
||||
|
||||
import click
|
||||
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import http
|
||||
from mitmproxy.net import http as net_http
|
||||
from mitmproxy.tcp import TCPFlow, TCPMessage
|
||||
from mitmproxy.utils import human
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.websocket import WebSocketFlow, WebSocketMessage
|
||||
|
||||
|
||||
def indent(n: int, text: str) -> str:
|
||||
|
@ -94,7 +95,11 @@ class Dumper:
|
|||
self.echo(click.style("--- HTTP Trailers", fg="magenta"), ident=4)
|
||||
self._echo_headers(trailers)
|
||||
|
||||
def _echo_message(self, message, flow: flow.Flow):
|
||||
def _echo_message(
|
||||
self,
|
||||
message: Union[net_http.Message, TCPMessage, WebSocketMessage],
|
||||
flow: Union[http.HTTPFlow, TCPFlow, WebSocketFlow]
|
||||
):
|
||||
_, lines, error = contentviews.get_message_content_view(
|
||||
ctx.options.dumper_default_contentview,
|
||||
message,
|
||||
|
@ -165,8 +170,8 @@ class Dumper:
|
|||
|
||||
http_version = ""
|
||||
if (
|
||||
flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0")
|
||||
or flow.request.http_version != getattr(flow.response, "http_version", "HTTP/1.1")
|
||||
not (flow.request.is_http10 or flow.request.is_http11)
|
||||
or flow.request.http_version != getattr(flow.response, "http_version", "HTTP/1.1")
|
||||
):
|
||||
# Hide version for h1 <-> h1 connections.
|
||||
http_version = " " + flow.request.http_version
|
||||
|
@ -215,8 +220,8 @@ class Dumper:
|
|||
|
||||
http_version = ""
|
||||
if (
|
||||
flow.response.http_version not in ("HTTP/1.1", "HTTP/1.0")
|
||||
or flow.request.http_version != flow.response.http_version
|
||||
not (flow.response.is_http10 or flow.response.is_http11)
|
||||
or flow.request.http_version != flow.response.http_version
|
||||
):
|
||||
# Hide version for h1 <-> h1 connections.
|
||||
http_version = f"{flow.response.http_version} "
|
||||
|
@ -226,7 +231,8 @@ class Dumper:
|
|||
# This aligns the HTTP response code with the HTTP request method:
|
||||
# 127.0.0.1:59519: GET http://example.com/
|
||||
# << 304 Not Modified 0b
|
||||
pad = max(0, len(human.format_address(flow.client_conn.peername)) - (2 + len(http_version) + len(replay_str)))
|
||||
pad = max(0,
|
||||
len(human.format_address(flow.client_conn.peername)) - (2 + len(http_version) + len(replay_str)))
|
||||
arrows = " " * pad + arrows
|
||||
|
||||
self.echo(f"{replay}{arrows} {http_version}{code} {reason} {size}")
|
||||
|
|
|
@ -3,21 +3,19 @@ Mitmproxy Content Views
|
|||
=======================
|
||||
|
||||
mitmproxy includes a set of content views which can be used to
|
||||
format/decode/highlight data. While they are currently used for HTTP message
|
||||
bodies only, the may be used in other contexts in the future, e.g. to decode
|
||||
protobuf messages sent as WebSocket frames.
|
||||
format/decode/highlight data. While they are mostly used for HTTP message
|
||||
bodies, the may be used in other contexts, e.g. to decode WebSocket messages.
|
||||
|
||||
Thus, the View API is very minimalistic. The only arguments are `data` and
|
||||
`**metadata`, where `data` is the actual content (as bytes). The contents on
|
||||
metadata depend on the protocol in use. For HTTP, the message headers and
|
||||
message trailers are passed as the ``headers`` and ``trailers`` keyword
|
||||
argument. For HTTP requests, the query parameters are passed as the ``query``
|
||||
keyword argument.
|
||||
metadata depend on the protocol in use. Known attributes can be found in
|
||||
`base.View`.
|
||||
"""
|
||||
import traceback
|
||||
from typing import Dict, Optional # noqa
|
||||
from typing import List # noqa
|
||||
from typing import List, Union
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.net import http
|
||||
from mitmproxy.utils import strutils
|
||||
from . import (
|
||||
|
@ -25,9 +23,11 @@ from . import (
|
|||
urlencoded, multipart, image, query, protobuf, msgpack
|
||||
)
|
||||
from .base import View, KEY_MAX, format_text, format_dict, TViewResult
|
||||
from ..http import HTTPFlow
|
||||
from ..tcp import TCPMessage, TCPFlow
|
||||
from ..websocket import WebSocketMessage, WebSocketFlow
|
||||
|
||||
views: List[View] = []
|
||||
content_types_map: Dict[str, List[View]] = {}
|
||||
|
||||
|
||||
def get(name: str) -> Optional[View]:
|
||||
|
@ -45,19 +45,8 @@ def add(view: View) -> None:
|
|||
|
||||
views.append(view)
|
||||
|
||||
for ct in view.content_types:
|
||||
l = content_types_map.setdefault(ct, [])
|
||||
l.append(view)
|
||||
|
||||
|
||||
def remove(view: View) -> None:
|
||||
for ct in view.content_types:
|
||||
l = content_types_map.setdefault(ct, [])
|
||||
l.remove(view)
|
||||
|
||||
if not len(l):
|
||||
del content_types_map[ct]
|
||||
|
||||
views.remove(view)
|
||||
|
||||
|
||||
|
@ -75,16 +64,24 @@ def safe_to_print(lines, encoding="utf8"):
|
|||
yield clean_line
|
||||
|
||||
|
||||
def get_message_content_view(viewname, message, flow):
|
||||
def get_message_content_view(
|
||||
viewname: str,
|
||||
message: Union[http.Message, TCPMessage, WebSocketMessage],
|
||||
flow: Union[HTTPFlow, TCPFlow, WebSocketFlow],
|
||||
):
|
||||
"""
|
||||
Like get_content_view, but also handles message encoding.
|
||||
"""
|
||||
viewmode = get(viewname)
|
||||
if not viewmode:
|
||||
viewmode = get("auto")
|
||||
assert viewmode
|
||||
|
||||
content: Optional[bytes]
|
||||
try:
|
||||
content = message.content
|
||||
content = message.content # type: ignore
|
||||
except ValueError:
|
||||
assert isinstance(message, http.Message)
|
||||
content = message.raw_content
|
||||
enc = "[cannot decode]"
|
||||
else:
|
||||
|
@ -93,22 +90,24 @@ def get_message_content_view(viewname, message, flow):
|
|||
message.headers.get("content-encoding")
|
||||
)
|
||||
else:
|
||||
enc = None
|
||||
enc = ""
|
||||
|
||||
if content is None:
|
||||
return "", iter([[("error", "content missing")]]), None
|
||||
|
||||
metadata = {}
|
||||
if isinstance(message, http.Request):
|
||||
metadata["query"] = message.query
|
||||
content_type = None
|
||||
http_message = None
|
||||
if isinstance(message, http.Message):
|
||||
metadata["headers"] = message.headers
|
||||
metadata["trailers"] = message.trailers
|
||||
metadata["message"] = message
|
||||
metadata["flow"] = flow
|
||||
http_message = message
|
||||
if ctype := message.headers.get("content-type"):
|
||||
if ct := http.parse_content_type(ctype):
|
||||
content_type = f"{ct[0]}/{ct[1]}"
|
||||
|
||||
description, lines, error = get_content_view(
|
||||
viewmode, content, **metadata
|
||||
viewmode, content,
|
||||
content_type=content_type,
|
||||
flow=flow,
|
||||
http_message=http_message,
|
||||
)
|
||||
|
||||
if enc:
|
||||
|
@ -117,7 +116,11 @@ def get_message_content_view(viewname, message, flow):
|
|||
return description, lines, error
|
||||
|
||||
|
||||
def get_tcp_content_view(viewname: str, data: bytes):
|
||||
def get_tcp_content_view(
|
||||
viewname: str,
|
||||
data: bytes,
|
||||
flow: TCPFlow,
|
||||
):
|
||||
viewmode = get(viewname)
|
||||
if not viewmode:
|
||||
viewmode = get("auto")
|
||||
|
@ -125,12 +128,19 @@ def get_tcp_content_view(viewname: str, data: bytes):
|
|||
# https://github.com/mitmproxy/mitmproxy/pull/3970#issuecomment-623024447
|
||||
assert viewmode
|
||||
|
||||
description, lines, error = get_content_view(viewmode, data)
|
||||
description, lines, error = get_content_view(viewmode, data, flow=flow)
|
||||
|
||||
return description, lines, error
|
||||
|
||||
|
||||
def get_content_view(viewmode: View, data: bytes, **metadata):
|
||||
def get_content_view(
|
||||
viewmode: View,
|
||||
data: bytes,
|
||||
*,
|
||||
content_type: Optional[str] = None,
|
||||
flow: Optional[flow.Flow] = None,
|
||||
http_message: Optional[http.Message] = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
viewmode: the view to use.
|
||||
|
@ -143,9 +153,11 @@ def get_content_view(viewmode: View, data: bytes, **metadata):
|
|||
In contrast to calling the views directly, text is always safe-to-print unicode.
|
||||
"""
|
||||
try:
|
||||
ret = viewmode(data, **metadata)
|
||||
ret = viewmode(data, content_type=content_type, flow=flow, http_message=http_message)
|
||||
if ret is None:
|
||||
ret = "Couldn't parse: falling back to Raw", get("Raw")(data, **metadata)[1]
|
||||
ret = "Couldn't parse: falling back to Raw", get("Raw")(
|
||||
data, content_type=content_type, flow=flow, http_message=http_message
|
||||
)[1]
|
||||
desc, content = ret
|
||||
error = None
|
||||
# Third-party viewers can fail in unexpected ways...
|
||||
|
@ -153,11 +165,8 @@ def get_content_view(viewmode: View, data: bytes, **metadata):
|
|||
desc = "Couldn't parse: falling back to Raw"
|
||||
raw = get("Raw")
|
||||
assert raw
|
||||
content = raw(data, **metadata)[1]
|
||||
error = "{} Content viewer failed: \n{}".format(
|
||||
getattr(viewmode, "name"),
|
||||
traceback.format_exc()
|
||||
)
|
||||
content = raw(data, content_type=content_type, flow=flow, http_message=http_message)[1]
|
||||
error = f"{getattr(viewmode, 'name')} content viewer failed: \n{traceback.format_exc()}"
|
||||
|
||||
return desc, safe_to_print(content), error
|
||||
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
from mitmproxy import contentviews
|
||||
from mitmproxy.net import http
|
||||
from mitmproxy.utils import strutils
|
||||
from . import base
|
||||
|
||||
|
||||
|
@ -8,21 +6,15 @@ class ViewAuto(base.View):
|
|||
name = "Auto"
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
headers = metadata.get("headers", {})
|
||||
ctype = headers.get("content-type")
|
||||
if data and ctype:
|
||||
ct = http.parse_content_type(ctype) if ctype else None
|
||||
ct = "{}/{}".format(ct[0], ct[1])
|
||||
if ct in contentviews.content_types_map:
|
||||
return contentviews.content_types_map[ct][0](data, **metadata)
|
||||
elif strutils.is_xml(data):
|
||||
return contentviews.get("XML/HTML")(data, **metadata)
|
||||
elif ct.startswith("image/"):
|
||||
return contentviews.get("Image")(data, **metadata)
|
||||
if metadata.get("query"):
|
||||
return contentviews.get("Query")(data, **metadata)
|
||||
if data and strutils.is_mostly_bin(data):
|
||||
return contentviews.get("Hex")(data)
|
||||
if not data:
|
||||
# TODO: The auto view has little justification now that views implement render_priority,
|
||||
# but we keep it around for now to not touch more parts.
|
||||
priority, view = max(
|
||||
(v.render_priority(data, **metadata), v)
|
||||
for v in contentviews.views
|
||||
)
|
||||
if priority == 0 and not data:
|
||||
return "No content", []
|
||||
return contentviews.get("Raw")(data)
|
||||
return view(data, **metadata)
|
||||
|
||||
def render_priority(self, data: bytes, **metadata) -> float:
|
||||
return -1 # don't recurse.
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
# Default view cutoff *in lines*
|
||||
import typing
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.net import http
|
||||
|
||||
KEY_MAX = 30
|
||||
|
||||
|
@ -8,37 +12,62 @@ TViewLine = typing.List[typing.Tuple[str, TTextType]]
|
|||
TViewResult = typing.Tuple[str, typing.Iterator[TViewLine]]
|
||||
|
||||
|
||||
class View:
|
||||
class View(ABC):
|
||||
name: typing.ClassVar[str]
|
||||
content_types: typing.ClassVar[typing.List[str]] = []
|
||||
|
||||
def __call__(self, data: bytes, **metadata) -> TViewResult:
|
||||
@abstractmethod
|
||||
def __call__(
|
||||
self,
|
||||
data: bytes,
|
||||
*,
|
||||
content_type: typing.Optional[str] = None,
|
||||
flow: typing.Optional[flow.Flow] = None,
|
||||
http_message: typing.Optional[http.Message] = None,
|
||||
**unknown_metadata,
|
||||
) -> TViewResult:
|
||||
"""
|
||||
Transform raw data into human-readable output.
|
||||
|
||||
Args:
|
||||
data: the data to decode/format.
|
||||
metadata: optional keyword-only arguments for metadata. Implementations must not
|
||||
rely on a given argument being present.
|
||||
Returns a (description, content generator) tuple.
|
||||
The content generator yields lists of (style, text) tuples, where each list represents
|
||||
a single line. ``text`` is a unfiltered string which may need to be escaped,
|
||||
depending on the used output. For example, it may contain terminal control sequences
|
||||
or unfiltered HTML.
|
||||
|
||||
Returns:
|
||||
A (description, content generator) tuple.
|
||||
Except for `data`, implementations must not rely on any given argument to be present.
|
||||
To ensure compatibility with future mitmproxy versions, unknown keyword arguments should be ignored.
|
||||
|
||||
The content generator yields lists of (style, text) tuples, where each list represents
|
||||
a single line. ``text`` is a unfiltered byte string which may need to be escaped,
|
||||
depending on the used output.
|
||||
|
||||
Caveats:
|
||||
The content generator must not yield tuples of tuples,
|
||||
because urwid cannot process that. You have to yield a *list* of tuples per line.
|
||||
The content generator must not yield tuples of tuples, because urwid cannot process that.
|
||||
You have to yield a *list* of tuples per line.
|
||||
"""
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
def render_priority(
|
||||
self,
|
||||
data: bytes,
|
||||
*,
|
||||
content_type: typing.Optional[str] = None,
|
||||
flow: typing.Optional[flow.Flow] = None,
|
||||
http_message: typing.Optional[http.Message] = None,
|
||||
**unknown_metadata,
|
||||
) -> float:
|
||||
"""
|
||||
Return the priority of this view for rendering `data`.
|
||||
If no particular view is chosen by the user, the view with the highest priority is selected.
|
||||
|
||||
Except for `data`, implementations must not rely on any given argument to be present.
|
||||
To ensure compatibility with future mitmproxy versions, unknown keyword arguments should be ignored.
|
||||
"""
|
||||
return 0
|
||||
|
||||
def __lt__(self, other):
|
||||
assert isinstance(other, View)
|
||||
return self.name.__lt__(other.name)
|
||||
|
||||
|
||||
def format_pairs(
|
||||
items: typing.Iterable[typing.Tuple[TTextType, TTextType]]
|
||||
items: typing.Iterable[typing.Tuple[TTextType, TTextType]]
|
||||
) -> typing.Iterator[TViewLine]:
|
||||
|
||||
"""
|
||||
Helper function that accepts a list of (k,v) pairs into a list of
|
||||
[
|
||||
|
@ -67,7 +96,7 @@ def format_pairs(
|
|||
|
||||
|
||||
def format_dict(
|
||||
d: typing.Mapping[TTextType, TTextType]
|
||||
d: typing.Mapping[TTextType, TTextType]
|
||||
) -> typing.Iterator[TViewLine]:
|
||||
"""
|
||||
Helper function that transforms the given dictionary into a list of
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import re
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy.contentviews import base
|
||||
from mitmproxy.utils import strutils
|
||||
|
@ -50,15 +51,15 @@ def beautify(data: str, indent: str = " "):
|
|||
|
||||
class ViewCSS(base.View):
|
||||
name = "CSS"
|
||||
content_types = [
|
||||
"text/css"
|
||||
]
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
data = data.decode("utf8", "surrogateescape")
|
||||
beautified = beautify(data)
|
||||
return "CSS", base.format_text(beautified)
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
return float(content_type == "text/css")
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
with open("../tools/web/static/vendor.css") as f:
|
||||
|
|
|
@ -16,3 +16,6 @@ class ViewHex(base.View):
|
|||
|
||||
def __call__(self, data, **metadata):
|
||||
return "Hex", self._format(data)
|
||||
|
||||
def render_priority(self, data: bytes, **metadata) -> float:
|
||||
return 0.2 * strutils.is_mostly_bin(data)
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import imghdr
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy.contentviews import base
|
||||
from mitmproxy.coretypes import multidict
|
||||
|
@ -16,16 +17,6 @@ imghdr.tests.append(test_ico)
|
|||
class ViewImage(base.View):
|
||||
name = "Image"
|
||||
|
||||
# there is also a fallback in the auto view for image/*.
|
||||
content_types = [
|
||||
"image/png",
|
||||
"image/jpeg",
|
||||
"image/gif",
|
||||
"image/vnd.microsoft.icon",
|
||||
"image/x-icon",
|
||||
"image/webp",
|
||||
]
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
image_type = imghdr.what('', h=data)
|
||||
if image_type == 'png':
|
||||
|
@ -45,3 +36,10 @@ class ViewImage(base.View):
|
|||
else:
|
||||
view_name = "Unknown Image"
|
||||
return view_name, base.format_dict(multidict.MultiDict(image_metadata))
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
return float(bool(
|
||||
content_type
|
||||
and content_type.startswith("image/")
|
||||
and content_type != "image/svg+xml"
|
||||
))
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import io
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.contentviews import base
|
||||
|
@ -46,13 +47,16 @@ def beautify(data):
|
|||
|
||||
class ViewJavaScript(base.View):
|
||||
name = "JavaScript"
|
||||
content_types = [
|
||||
__content_types = (
|
||||
"application/x-javascript",
|
||||
"application/javascript",
|
||||
"text/javascript"
|
||||
]
|
||||
)
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
data = data.decode("utf-8", "replace")
|
||||
res = beautify(data)
|
||||
return "JavaScript", base.format_text(res)
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
return float(content_type in self.__content_types)
|
||||
|
|
|
@ -38,13 +38,18 @@ def format_json(data: typing.Any) -> typing.Iterator[base.TViewLine]:
|
|||
|
||||
class ViewJSON(base.View):
|
||||
name = "JSON"
|
||||
content_types = [
|
||||
"application/json",
|
||||
"application/json-rpc",
|
||||
"application/vnd.api+json"
|
||||
]
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
data = parse_json(data)
|
||||
if data is not PARSE_ERROR:
|
||||
return "JSON", format_json(data)
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: typing.Optional[str] = None, **metadata) -> float:
|
||||
if content_type in (
|
||||
"application/json",
|
||||
"application/json-rpc",
|
||||
):
|
||||
return 1
|
||||
if content_type and content_type.startswith("application/") and content_type.endswith("+json"):
|
||||
return 1
|
||||
return 0
|
||||
|
|
|
@ -39,12 +39,15 @@ def format_msgpack(data):
|
|||
|
||||
class ViewMsgPack(base.View):
|
||||
name = "MsgPack"
|
||||
content_types = [
|
||||
__content_types = (
|
||||
"application/msgpack",
|
||||
"application/x-msgpack",
|
||||
]
|
||||
)
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
data = parse_msgpack(data)
|
||||
if data is not PARSE_ERROR:
|
||||
return "MsgPack", format_msgpack(data)
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: typing.Optional[str] = None, **metadata) -> float:
|
||||
return float(content_type in self.__content_types)
|
||||
|
|
|
@ -1,19 +1,24 @@
|
|||
from mitmproxy.net import http
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy.coretypes import multidict
|
||||
from mitmproxy.net import http
|
||||
from . import base
|
||||
|
||||
|
||||
class ViewMultipart(base.View):
|
||||
name = "Multipart Form"
|
||||
content_types = ["multipart/form-data"]
|
||||
|
||||
@staticmethod
|
||||
def _format(v):
|
||||
yield [("highlight", "Form data:\n")]
|
||||
yield from base.format_dict(multidict.MultiDict(v))
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
headers = metadata.get("headers", {})
|
||||
v = http.multipart.decode(headers, data)
|
||||
def __call__(self, data: bytes, content_type: Optional[str] = None, **metadata):
|
||||
if content_type is None:
|
||||
return
|
||||
v = http.multipart.decode(content_type, data)
|
||||
if v:
|
||||
return "Multipart form", self._format(v)
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
return float(content_type == "multipart/form-data")
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import io
|
||||
from typing import Optional
|
||||
|
||||
from kaitaistruct import KaitaiStream
|
||||
from . import base
|
||||
|
@ -66,7 +67,7 @@ class ViewProtobuf(base.View):
|
|||
"""
|
||||
|
||||
name = "Protocol Buffer"
|
||||
content_types = [
|
||||
__content_types = [
|
||||
"application/x-protobuf",
|
||||
"application/x-protobuffer",
|
||||
]
|
||||
|
@ -77,3 +78,6 @@ class ViewProtobuf(base.View):
|
|||
raise ValueError("Failed to parse input.")
|
||||
|
||||
return "Protobuf", base.format_text(decoded)
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
return float(content_type in self.__content_types)
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
from typing import List # noqa
|
||||
from typing import Optional
|
||||
|
||||
from . import base
|
||||
from ..net import http
|
||||
|
||||
|
||||
class ViewQuery(base.View):
|
||||
name = "Query"
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
query = metadata.get("query")
|
||||
def __call__(self, data: bytes, http_message: Optional[http.Message] = None, **metadata):
|
||||
query = getattr(http_message, "query", None)
|
||||
if query:
|
||||
return "Query", base.format_pairs(query.items(multi=True))
|
||||
else:
|
||||
return "Query", base.format_text("")
|
||||
|
||||
def render_priority(self, data: bytes, *, http_message: Optional[http.Message] = None, **metadata) -> float:
|
||||
return 0.3 * float(bool(getattr(http_message, "query", False)))
|
||||
|
|
|
@ -9,3 +9,6 @@ class ViewRaw(base.View):
|
|||
|
||||
def __call__(self, data, **metadata):
|
||||
return "Raw", base.format_text(strutils.bytes_to_escaped_str(data, True))
|
||||
|
||||
def render_priority(self, data: bytes, **metadata) -> float:
|
||||
return 0.1 * float(bool(data))
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
from typing import Optional
|
||||
|
||||
from mitmproxy.net.http import url
|
||||
from . import base
|
||||
|
||||
|
||||
class ViewURLEncoded(base.View):
|
||||
name = "URL-encoded"
|
||||
content_types = ["application/x-www-form-urlencoded"]
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
try:
|
||||
|
@ -13,3 +14,6 @@ class ViewURLEncoded(base.View):
|
|||
return None
|
||||
d = url.decode(data)
|
||||
return "URLEncoded form", base.format_pairs(d)
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
return float(content_type == "application/x-www-form-urlencoded")
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
from typing import Optional
|
||||
|
||||
from mitmproxy.contrib.wbxml import ASCommandResponse
|
||||
from . import base
|
||||
|
||||
|
||||
class ViewWBXML(base.View):
|
||||
name = "WBXML"
|
||||
content_types = [
|
||||
__content_types = (
|
||||
"application/vnd.wap.wbxml",
|
||||
"application/vnd.ms-sync.wbxml"
|
||||
]
|
||||
)
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
try:
|
||||
|
@ -17,3 +19,6 @@ class ViewWBXML(base.View):
|
|||
return "WBXML", base.format_text(parsedContent)
|
||||
except:
|
||||
return None
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
return float(content_type in self.__content_types)
|
||||
|
|
|
@ -4,7 +4,7 @@ import textwrap
|
|||
from typing import Iterable, Optional
|
||||
|
||||
from mitmproxy.contentviews import base
|
||||
from mitmproxy.utils import sliding_window
|
||||
from mitmproxy.utils import sliding_window, strutils
|
||||
|
||||
"""
|
||||
A custom XML/HTML prettifier. Compared to other prettifiers, its main features are:
|
||||
|
@ -214,7 +214,7 @@ def format_xml(tokens: Iterable[Token]) -> str:
|
|||
|
||||
class ViewXmlHtml(base.View):
|
||||
name = "XML/HTML"
|
||||
content_types = ["text/xml", "text/html"]
|
||||
__content_types = ("text/xml", "text/html")
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
# TODO:
|
||||
|
@ -233,3 +233,10 @@ class ViewXmlHtml(base.View):
|
|||
else:
|
||||
t = "XML"
|
||||
return t, pretty
|
||||
|
||||
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||
if content_type in self.__content_types:
|
||||
return 1
|
||||
elif strutils.is_xml(data):
|
||||
return 0.4
|
||||
return float(content_type in self.__content_types)
|
||||
|
|
|
@ -65,9 +65,9 @@ class HTTPFlow(flow.Flow):
|
|||
|
||||
|
||||
def make_error_response(
|
||||
status_code: int,
|
||||
message: str = "",
|
||||
headers: Optional[http.Headers] = None,
|
||||
status_code: int,
|
||||
message: str = "",
|
||||
headers: Optional[http.Headers] = None,
|
||||
) -> http.Response:
|
||||
body: bytes = """
|
||||
<html>
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
import re
|
||||
import mimetypes
|
||||
import re
|
||||
from typing import Tuple, List, Optional
|
||||
from urllib.parse import quote
|
||||
|
||||
from mitmproxy.net.http import headers
|
||||
|
||||
|
||||
def encode(head, l):
|
||||
|
||||
k = head.get("content-type")
|
||||
if k:
|
||||
k = headers.parse_content_type(k)
|
||||
|
@ -38,17 +39,16 @@ def encode(head, l):
|
|||
return temp
|
||||
|
||||
|
||||
def decode(hdrs, content):
|
||||
def decode(content_type: Optional[str], content: bytes) -> List[Tuple[bytes, bytes]]:
|
||||
"""
|
||||
Takes a multipart boundary encoded string and returns list of (key, value) tuples.
|
||||
"""
|
||||
v = hdrs.get("content-type")
|
||||
if v:
|
||||
v = headers.parse_content_type(v)
|
||||
if not v:
|
||||
if content_type:
|
||||
ct = headers.parse_content_type(content_type)
|
||||
if not ct:
|
||||
return []
|
||||
try:
|
||||
boundary = v[2]["boundary"].encode("ascii")
|
||||
boundary = ct[2]["boundary"].encode("ascii")
|
||||
except (KeyError, UnicodeError):
|
||||
return []
|
||||
|
||||
|
|
|
@ -449,7 +449,7 @@ class Request(message.Message):
|
|||
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
|
||||
if is_valid_content_type:
|
||||
try:
|
||||
return multipart.decode(self.headers, self.content)
|
||||
return multipart.decode(self.headers.get("content-type"), self.content)
|
||||
except ValueError:
|
||||
pass
|
||||
return ()
|
||||
|
|
|
@ -152,7 +152,7 @@ class FlowDetails(tabs.Tabs):
|
|||
|
||||
from_client = flow.messages[0].from_client
|
||||
for m in messages:
|
||||
_, lines, _ = contentviews.get_tcp_content_view(viewmode, m)
|
||||
_, lines, _ = contentviews.get_tcp_content_view(viewmode, m, flow)
|
||||
|
||||
for line in lines:
|
||||
if from_client:
|
||||
|
|
|
@ -5,7 +5,7 @@ import shutil
|
|||
import time
|
||||
import typing
|
||||
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import contentviews, http
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import io, flow
|
||||
|
@ -49,6 +49,7 @@ def save_flows(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
|
|||
|
||||
def save_flows_content(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
|
||||
for f in flows:
|
||||
assert isinstance(f, http.HTTPFlow)
|
||||
for m in ('request', 'response'):
|
||||
message = getattr(f, m)
|
||||
message_path = path / "flows" / f.id / m
|
||||
|
|
|
@ -156,7 +156,7 @@ def check():
|
|||
|
||||
# Check for underscores in the options. Options always follow '--'.
|
||||
for argument in args:
|
||||
underscoreParam = re.search('[-]{2}((.*?_)(.*?(\s|$)))+', argument)
|
||||
underscoreParam = re.search(r'[-]{2}((.*?_)(.*?(\s|$)))+', argument)
|
||||
if underscoreParam is not None:
|
||||
print("{} uses underscores, please use hyphens {}".format(
|
||||
argument,
|
||||
|
|
|
@ -133,7 +133,11 @@ def is_mostly_bin(s: bytes) -> bool:
|
|||
|
||||
|
||||
def is_xml(s: bytes) -> bool:
|
||||
return s.strip().startswith(b"<")
|
||||
for char in s:
|
||||
if char in (9, 10, 32): # is space?
|
||||
continue
|
||||
return char == 60 # is a "<"?
|
||||
return False
|
||||
|
||||
|
||||
def clean_hanging_newline(t):
|
||||
|
|
|
@ -15,3 +15,15 @@ def test_view_image(tdata):
|
|||
assert img.split(".")[-1].upper() in viewname
|
||||
|
||||
assert v(b"flibble") == ('Unknown Image', [[('header', 'Image Format: '), ('text', 'unknown')]])
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = image.ViewImage()
|
||||
assert v.render_priority(b"", content_type="image/png")
|
||||
assert v.render_priority(b"", content_type="image/jpeg")
|
||||
assert v.render_priority(b"", content_type="image/gif")
|
||||
assert v.render_priority(b"", content_type="image/vnd.microsoft.icon")
|
||||
assert v.render_priority(b"", content_type="image/x-icon")
|
||||
assert v.render_priority(b"", content_type="image/webp")
|
||||
assert v.render_priority(b"", content_type="image/future-unknown-format-42")
|
||||
assert not v.render_priority(b"", content_type="image/svg+xml")
|
||||
|
|
|
@ -3,14 +3,18 @@ from unittest import mock
|
|||
import pytest
|
||||
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
|
||||
|
||||
class TestContentView(contentviews.View):
|
||||
name = "test"
|
||||
content_types = ["test/123"]
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def should_render(self, content_type):
|
||||
return content_type == "test/123"
|
||||
|
||||
|
||||
def test_add_remove():
|
||||
|
@ -38,7 +42,7 @@ def test_get_content_view():
|
|||
desc, lines, err = contentviews.get_content_view(
|
||||
contentviews.get("Auto"),
|
||||
b"[1, 2, 3]",
|
||||
headers=Headers(content_type="application/json")
|
||||
content_type="application/json",
|
||||
)
|
||||
assert desc == "JSON"
|
||||
assert list(lines)
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from mitmproxy.contentviews import auto
|
||||
from mitmproxy.net import http
|
||||
from mitmproxy.coretypes import multidict
|
||||
from mitmproxy.test import tflow
|
||||
from . import full_eval
|
||||
|
||||
|
||||
|
@ -8,37 +7,42 @@ def test_view_auto():
|
|||
v = full_eval(auto.ViewAuto())
|
||||
f = v(
|
||||
b"foo",
|
||||
headers=http.Headers()
|
||||
)
|
||||
assert f[0] == "Raw"
|
||||
|
||||
f = v(
|
||||
b"<html></html>",
|
||||
headers=http.Headers(content_type="text/html")
|
||||
content_type="text/html",
|
||||
)
|
||||
assert f[0] == "HTML"
|
||||
|
||||
f = v(
|
||||
b"foo",
|
||||
headers=http.Headers(content_type="text/flibble")
|
||||
content_type="text/flibble",
|
||||
)
|
||||
assert f[0] == "Raw"
|
||||
|
||||
f = v(
|
||||
b"<xml></xml>",
|
||||
headers=http.Headers(content_type="text/flibble")
|
||||
content_type="text/flibble",
|
||||
)
|
||||
assert f[0].startswith("XML")
|
||||
|
||||
f = v(
|
||||
b"<svg></svg>",
|
||||
headers=http.Headers(content_type="image/svg+xml")
|
||||
content_type="image/svg+xml",
|
||||
)
|
||||
assert f[0].startswith("XML")
|
||||
|
||||
f = v(
|
||||
b"{}",
|
||||
content_type="application/acme+json",
|
||||
)
|
||||
assert f[0].startswith("JSON")
|
||||
|
||||
f = v(
|
||||
b"verybinary",
|
||||
headers=http.Headers(content_type="image/new-magic-image-format")
|
||||
content_type="image/new-magic-image-format",
|
||||
)
|
||||
assert f[0] == "Unknown Image"
|
||||
|
||||
|
@ -47,13 +51,14 @@ def test_view_auto():
|
|||
|
||||
f = v(
|
||||
b"",
|
||||
headers=http.Headers()
|
||||
)
|
||||
assert f[0] == "No content"
|
||||
|
||||
flow = tflow.tflow()
|
||||
flow.request.query = [("foo", "bar")]
|
||||
f = v(
|
||||
b"",
|
||||
headers=http.Headers(),
|
||||
query=multidict.MultiDict([("foo", "bar")]),
|
||||
flow=flow,
|
||||
http_message=flow.request,
|
||||
)
|
||||
assert f[0] == "Query"
|
||||
|
|
|
@ -37,3 +37,9 @@ def test_simple():
|
|||
assert v(b"console.log('not really css')") == (
|
||||
'CSS', [[('text', "console.log('not really css')")]]
|
||||
)
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = css.ViewCSS()
|
||||
assert v.render_priority(b"", content_type="text/css")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -5,3 +5,10 @@ from . import full_eval
|
|||
def test_view_hex():
|
||||
v = full_eval(hex.ViewHex())
|
||||
assert v(b"foo")
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = hex.ViewHex()
|
||||
assert not v.render_priority(b"ascii")
|
||||
assert v.render_priority(b"\xFF")
|
||||
assert not v.render_priority(b"")
|
||||
|
|
|
@ -27,3 +27,11 @@ def test_format_xml(filename, tdata):
|
|||
expected = f.read()
|
||||
js = javascript.beautify(input)
|
||||
assert js == expected
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = javascript.ViewJavaScript()
|
||||
assert v.render_priority(b"", content_type="application/x-javascript")
|
||||
assert v.render_priority(b"", content_type="application/javascript")
|
||||
assert v.render_priority(b"", content_type="text/javascript")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -41,3 +41,12 @@ def test_view_json():
|
|||
def test_view_json_doesnt_crash(data):
|
||||
v = full_eval(json.ViewJSON())
|
||||
v(data)
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = json.ViewJSON()
|
||||
assert v.render_priority(b"", content_type="application/json")
|
||||
assert v.render_priority(b"", content_type="application/json-rpc")
|
||||
assert v.render_priority(b"", content_type="application/vnd.api+json")
|
||||
assert v.render_priority(b"", content_type="application/acme+json")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -44,3 +44,10 @@ def test_view_msgpack():
|
|||
def test_view_msgpack_doesnt_crash(data):
|
||||
v = full_eval(msgpack.ViewMsgPack())
|
||||
v(data)
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = msgpack.ViewMsgPack()
|
||||
assert v.render_priority(b"", content_type="application/msgpack")
|
||||
assert v.render_priority(b"", content_type="application/x-msgpack")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
from mitmproxy.contentviews import multipart
|
||||
from mitmproxy.net import http
|
||||
from . import full_eval
|
||||
|
||||
|
||||
|
@ -12,14 +11,16 @@ Content-Disposition: form-data; name="submit-name"
|
|||
Larry
|
||||
--AaB03x
|
||||
""".strip()
|
||||
h = http.Headers(content_type="multipart/form-data; boundary=AaB03x")
|
||||
assert view(v, headers=h)
|
||||
assert view(v, content_type="multipart/form-data; boundary=AaB03x")
|
||||
|
||||
h = http.Headers()
|
||||
assert not view(v, headers=h)
|
||||
assert not view(v)
|
||||
|
||||
h = http.Headers(content_type="multipart/form-data")
|
||||
assert not view(v, headers=h)
|
||||
assert not view(v, content_type="multipart/form-data")
|
||||
|
||||
h = http.Headers(content_type="unparseable")
|
||||
assert not view(v, headers=h)
|
||||
assert not view(v, content_type="unparseable")
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = multipart.ViewMultipart()
|
||||
assert v.render_priority(b"", content_type="multipart/form-data")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -28,3 +28,10 @@ def test_format_pbuf(filename, tdata):
|
|||
expected = f.read()
|
||||
|
||||
assert protobuf.format_pbuf(input) == expected
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = protobuf.ViewProtobuf()
|
||||
assert v.render_priority(b"", content_type="application/x-protobuf")
|
||||
assert v.render_priority(b"", content_type="application/x-protobuffer")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -1,13 +1,23 @@
|
|||
from mitmproxy.contentviews import query
|
||||
from mitmproxy.coretypes import multidict
|
||||
from mitmproxy.test import tutils
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_query():
|
||||
d = ""
|
||||
v = full_eval(query.ViewQuery())
|
||||
f = v(d, query=multidict.MultiDict([("foo", "bar"), ("foo", "baz")]))
|
||||
req = tutils.treq()
|
||||
req.query = [("foo", "bar"), ("foo", "baz")]
|
||||
f = v(d, http_message=req)
|
||||
assert f[0] == "Query"
|
||||
assert f[1] == [[("header", "foo: "), ("text", "bar")], [("header", "foo: "), ("text", "baz")]]
|
||||
|
||||
assert v(d) == ("Query", [])
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
view = query.ViewQuery()
|
||||
req = tutils.treq()
|
||||
req.query = [("foo", "bar"), ("foo", "baz")]
|
||||
assert view.render_priority(b"", http_message=req)
|
||||
assert not view.render_priority(b"")
|
||||
|
|
|
@ -5,3 +5,9 @@ from . import full_eval
|
|||
def test_view_raw():
|
||||
v = full_eval(raw.ViewRaw())
|
||||
assert v(b"foo")
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = raw.ViewRaw()
|
||||
assert v.render_priority(b"anything")
|
||||
assert not v.render_priority(b"")
|
||||
|
|
|
@ -13,3 +13,9 @@ def test_view_urlencoded():
|
|||
assert v(d)
|
||||
|
||||
assert not v(b"\xFF\x00")
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = urlencoded.ViewURLEncoded()
|
||||
assert v.render_priority(b"", content_type="application/x-www-form-urlencoded")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -18,3 +18,10 @@ def test_wbxml(tdata):
|
|||
|
||||
p = wbxml.ASCommandResponse.ASCommandResponse(input)
|
||||
assert p.xmlString == expected
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = wbxml.ViewWBXML()
|
||||
assert v.render_priority(b"", content_type="application/vnd.wap.wbxml")
|
||||
assert v.render_priority(b"", content_type="application/vnd.ms-sync.wbxml")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
|
|
|
@ -34,3 +34,12 @@ def test_format_xml(filename, tdata):
|
|||
expected = f.read()
|
||||
tokens = xml_html.tokenize(input)
|
||||
assert xml_html.format_xml(tokens) == expected
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
v = xml_html.ViewXmlHtml()
|
||||
assert v.render_priority(b"", content_type="text/xml")
|
||||
assert v.render_priority(b"", content_type="text/xml")
|
||||
assert v.render_priority(b"", content_type="text/html")
|
||||
assert not v.render_priority(b"", content_type="text/plain")
|
||||
assert v.render_priority(b"<html/>")
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
import pytest
|
||||
|
||||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.net.http import multipart
|
||||
import pytest
|
||||
|
||||
|
||||
def test_decode():
|
||||
boundary = 'somefancyboundary'
|
||||
headers = Headers(
|
||||
content_type='multipart/form-data; boundary=' + boundary
|
||||
)
|
||||
content = (
|
||||
"--{0}\n"
|
||||
"Content-Disposition: form-data; name=\"field1\"\n\n"
|
||||
|
@ -17,24 +15,17 @@ def test_decode():
|
|||
"value2\n"
|
||||
"--{0}--".format(boundary).encode()
|
||||
)
|
||||
|
||||
form = multipart.decode(headers, content)
|
||||
form = multipart.decode(f'multipart/form-data; boundary={boundary}', content)
|
||||
|
||||
assert len(form) == 2
|
||||
assert form[0] == (b"field1", b"value1")
|
||||
assert form[1] == (b"field2", b"value2")
|
||||
|
||||
boundary = 'boundary茅莽'
|
||||
headers = Headers(
|
||||
content_type='multipart/form-data; boundary=' + boundary
|
||||
)
|
||||
result = multipart.decode(headers, content)
|
||||
result = multipart.decode(f'multipart/form-data; boundary={boundary}', content)
|
||||
assert result == []
|
||||
|
||||
headers = Headers(
|
||||
content_type=''
|
||||
)
|
||||
assert multipart.decode(headers, content) == []
|
||||
assert multipart.decode("", content) == []
|
||||
|
||||
|
||||
def test_encode():
|
||||
|
|
|
@ -83,6 +83,7 @@ def test_is_mostly_bin():
|
|||
|
||||
|
||||
def test_is_xml():
|
||||
assert not strutils.is_xml(b"")
|
||||
assert not strutils.is_xml(b"foo")
|
||||
assert strutils.is_xml(b"<foo")
|
||||
assert strutils.is_xml(b" \n<foo")
|
||||
|
|
Loading…
Reference in New Issue