move Headers/Message/Request/Response into mitmproxy.http

This commit is contained in:
Maximilian Hils 2021-02-04 02:14:27 +01:00
parent a69ef1ec95
commit 9409bf0368
48 changed files with 1207 additions and 1245 deletions

View File

@ -8,7 +8,7 @@ The content view API is explained in the mitmproxy.contentviews module.
from typing import Optional
from mitmproxy import contentviews, flow
from mitmproxy.net import http
from mitmproxy import http
class ViewSwapCase(contentviews.View):

View File

@ -4,7 +4,7 @@ from mitmproxy import http
def request(flow: http.HTTPFlow) -> None:
if flow.request.pretty_url == "http://example.com/path":
flow.response = http.HTTPResponse.make(
flow.response = http.Response.make(
200, # (optional) status code
b"Hello World", # (optional) content
{"Content-Type": "text/html"} # (optional) headers

View File

@ -8,7 +8,7 @@ body.
"""
from mitmproxy import http
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
def request(flow: http.HTTPFlow):

View File

@ -117,7 +117,7 @@ async def serve(app, flow: http.HTTPFlow):
async def send(event):
if event["type"] == "http.response.start":
flow.response = http.HTTPResponse.make(event["status"], b"", event.get("headers", []))
flow.response = http.Response.make(event["status"], b"", event.get("headers", []))
flow.response.decode()
elif event["type"] == "http.response.body":
flow.response.content += event.get("body", b"")
@ -133,7 +133,7 @@ async def serve(app, flow: http.HTTPFlow):
raise RuntimeError(f"no response sent.")
except Exception:
ctx.log.error(f"Error in asgi app:\n{traceback.format_exc(limit=-5)}")
flow.response = http.HTTPResponse.make(500, b"ASGI Error.")
flow.response = http.Response.make(500, b"ASGI Error.")
finally:
flow.reply.commit()
done.set()

View File

@ -10,7 +10,6 @@ from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import http
from mitmproxy.net import http as net_http
from mitmproxy.tcp import TCPFlow, TCPMessage
from mitmproxy.utils import human
from mitmproxy.utils import strutils
@ -79,7 +78,7 @@ class Dumper:
if self.errfp:
self.errfp.flush()
def _echo_headers(self, headers: net_http.Headers):
def _echo_headers(self, headers: http.Headers):
for k, v in headers.fields:
k = strutils.bytes_to_escaped_str(k)
v = strutils.bytes_to_escaped_str(v)
@ -89,7 +88,7 @@ class Dumper:
)
self.echo(out, ident=4)
def _echo_trailers(self, trailers: Optional[net_http.Headers]):
def _echo_trailers(self, trailers: Optional[http.Headers]):
if not trailers:
return
self.echo(click.style("--- HTTP Trailers", fg="magenta"), ident=4)
@ -97,7 +96,7 @@ class Dumper:
def _echo_message(
self,
message: Union[net_http.Message, TCPMessage, WebSocketMessage],
message: Union[http.Message, TCPMessage, WebSocketMessage],
flow: Union[http.HTTPFlow, TCPFlow, WebSocketFlow]
):
_, lines, error = contentviews.get_message_content_view(
@ -205,7 +204,7 @@ class Dumper:
if not flow.response.is_http2:
reason = flow.response.reason
else:
reason = net_http.status_codes.RESPONSES.get(flow.response.status_code, "")
reason = http.status_codes.RESPONSES.get(flow.response.status_code, "")
reason = click.style(
strutils.escape_control_characters(reason),
fg=code_color,

View File

@ -12,7 +12,7 @@ from mitmproxy.net.http.http1 import assemble
from mitmproxy.utils import strutils
def cleanup_request(f: flow.Flow) -> http.HTTPRequest:
def cleanup_request(f: flow.Flow) -> http.Request:
if not getattr(f, "request", None):
raise exceptions.CommandError("Can't export flow with no request.")
assert isinstance(f, http.HTTPFlow)
@ -21,7 +21,7 @@ def cleanup_request(f: flow.Flow) -> http.HTTPRequest:
return request
def pop_headers(request: http.HTTPRequest) -> http.HTTPRequest:
def pop_headers(request: http.Request) -> http.Request:
# Remove some headers that are redundant for curl/httpie export
request.headers.pop('content-length')
if request.headers.get("host", "") == request.host:
@ -31,7 +31,7 @@ def pop_headers(request: http.HTTPRequest) -> http.HTTPRequest:
return request
def cleanup_response(f: flow.Flow) -> http.HTTPResponse:
def cleanup_response(f: flow.Flow) -> http.Response:
if not getattr(f, "response", None):
raise exceptions.CommandError("Can't export flow with no response.")
assert isinstance(f, http.HTTPFlow)
@ -40,7 +40,7 @@ def cleanup_response(f: flow.Flow) -> http.HTTPResponse:
return response
def request_content_for_console(request: http.HTTPRequest) -> str:
def request_content_for_console(request: http.Request) -> str:
try:
text = request.get_text(strict=True)
assert text

View File

@ -139,7 +139,7 @@ class MapLocal:
ctx.log.warn(f"Could not read file: {e}")
continue
flow.response = http.HTTPResponse.make(
flow.response = http.Response.make(
200,
contents,
headers
@ -147,5 +147,5 @@ class MapLocal:
# only set flow.response once, for the first matching rule
return
if all_candidates:
flow.response = http.HTTPResponse.make(404)
flow.response = http.Response.make(404)
ctx.log.info(f"None of the local file candidates exist: {', '.join(str(x) for x in all_candidates)}")

View File

@ -3,7 +3,7 @@ import typing
from pathlib import Path
from mitmproxy import ctx, exceptions, flowfilter, http
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from mitmproxy.utils import strutils
from mitmproxy.utils.spec import parse_spec

View File

@ -81,16 +81,16 @@ class ProxyAuth:
else:
return 'Authorization'
def auth_required_response(self) -> http.HTTPResponse:
def auth_required_response(self) -> http.Response:
if self.is_proxy_auth():
return http.make_error_response(
status_codes.PROXY_AUTH_REQUIRED,
headers=mitmproxy.net.http.Headers(Proxy_Authenticate=f'Basic realm="{REALM}"'),
headers=mitmproxy.http.Headers(Proxy_Authenticate=f'Basic realm="{REALM}"'),
)
else:
return http.make_error_response(
status_codes.UNAUTHORIZED,
headers=mitmproxy.net.http.Headers(WWW_Authenticate=f'Basic realm="{REALM}"'),
headers=mitmproxy.http.Headers(WWW_Authenticate=f'Basic realm="{REALM}"'),
)
def check(self, f: http.HTTPFlow) -> Optional[Tuple[str, str]]:

View File

@ -457,7 +457,7 @@ class View(collections.abc.Sequence):
@command.command("view.flows.create")
def create(self, method: str, url: str) -> None:
try:
req = http.HTTPRequest.make(method.upper(), url)
req = http.Request.make(method.upper(), url)
except ValueError as e:
raise exceptions.CommandError("Invalid URL: %s" % e)

View File

@ -16,7 +16,7 @@ from typing import List, Union
from typing import Optional
from mitmproxy import flow
from mitmproxy.net import http
from mitmproxy import http
from mitmproxy.utils import strutils
from . import (
auto, raw, hex, json, xml_html, wbxml, javascript, css,

View File

@ -3,7 +3,7 @@ import typing
from abc import ABC, abstractmethod
from mitmproxy import flow
from mitmproxy.net import http
from mitmproxy import http
KEY_MAX = 30

View File

@ -1,7 +1,7 @@
from typing import Optional
from mitmproxy.coretypes import multidict
from mitmproxy.net import http
from mitmproxy.net.http import multipart
from . import base
@ -16,7 +16,7 @@ class ViewMultipart(base.View):
def __call__(self, data: bytes, content_type: Optional[str] = None, **metadata):
if content_type is None:
return
v = http.multipart.decode(content_type, data)
v = multipart.decode(content_type, data)
if v:
return "Multipart form", self._format(v)

View File

@ -1,7 +1,7 @@
from typing import Optional
from . import base
from ..net import http
from .. import http
class ViewQuery(base.View):

File diff suppressed because it is too large Load Diff

View File

@ -1,13 +0,0 @@
from mitmproxy.net.http.request import Request
from mitmproxy.net.http.response import Response
from mitmproxy.net.http.message import Message
from mitmproxy.net.http.headers import Headers, parse_content_type
from mitmproxy.net.http import http1, status_codes, multipart
__all__ = [
"Request",
"Response",
"Message",
"Headers", "parse_content_type",
"http1", "status_codes", "multipart",
]

View File

@ -1,153 +1,6 @@
import collections
from typing import Dict, Optional, Tuple
from mitmproxy.coretypes import multidict
from mitmproxy.utils import strutils
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
def _native(x):
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
return strutils.always_bytes(x, "utf-8", "surrogateescape")
class Headers(multidict.MultiDict):
"""
Header class which allows both convenient access to individual headers as well as
direct access to the underlying raw data. Provides a full dictionary interface.
Example:
.. code-block:: python
# Create headers with keyword arguments
>>> h = Headers(host="example.com", content_type="application/xml")
# Headers mostly behave like a normal dict.
>>> h["Host"]
"example.com"
# HTTP Headers are case insensitive
>>> h["host"]
"example.com"
# Headers can also be created from a list of raw (header_name, header_value) byte tuples
>>> h = Headers([
(b"Host",b"example.com"),
(b"Accept",b"text/html"),
(b"accept",b"application/xml")
])
# Multiple headers are folded into a single header as per RFC7230
>>> h["Accept"]
"text/html, application/xml"
# Setting a header removes all existing headers with the same name.
>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"
# bytes(h) returns a HTTP1 header block.
>>> print(bytes(h))
Host: example.com
Accept: application/text
# For full control, the raw header fields can be accessed
>>> h.fields
Caveats:
For use with the "Set-Cookie" header, see :py:meth:`get_all`.
"""
def __init__(self, fields=(), **headers):
"""
Args:
fields: (optional) list of ``(name, value)`` header byte tuples,
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
**headers: Additional headers to set. Will overwrite existing values from `fields`.
For convenience, underscores in header names will be transformed to dashes -
this behaviour does not extend to other methods.
If ``**headers`` contains multiple keys that have equal ``.lower()`` s,
the behavior is undefined.
"""
super().__init__(fields)
for key, value in self.fields:
if not isinstance(key, bytes) or not isinstance(value, bytes):
raise TypeError("Header fields must be bytes.")
# content_type -> content-type
headers = {
_always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
for name, value in headers.items()
}
self.update(headers)
@staticmethod
def _reduce_values(values):
# Headers can be folded
return ", ".join(values)
@staticmethod
def _kconv(key):
# Headers are case-insensitive
return key.lower()
def __bytes__(self):
if self.fields:
return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
else:
return b""
def __delitem__(self, key):
key = _always_bytes(key)
super().__delitem__(key)
def __iter__(self):
for x in super().__iter__():
yield _native(x)
def get_all(self, name):
"""
Like :py:meth:`get`, but does not fold multiple headers into a single one.
This is useful for Set-Cookie headers, which do not support folding.
See also: https://tools.ietf.org/html/rfc7230#section-3.2.2
"""
name = _always_bytes(name)
return [
_native(x) for x in
super().get_all(name)
]
def set_all(self, name, values):
"""
Explicitly set multiple headers for the given key.
See: :py:meth:`get_all`
"""
name = _always_bytes(name)
values = [_always_bytes(x) for x in values]
return super().set_all(name, values)
def insert(self, index, key, value):
key = _always_bytes(key)
value = _always_bytes(value)
super().insert(index, key, value)
def items(self, multi=False):
if multi:
return (
(_native(k), _native(v))
for k, v in self.fields
)
else:
return super().items()
def parse_content_type(c: str) -> Optional[Tuple[str, str, Dict[str, str]]]:
"""

View File

@ -2,7 +2,8 @@ import re
import time
from typing import List, Tuple, Iterable, Optional
from mitmproxy.net.http import request, response, headers, url
from mitmproxy.http import Request, Headers, Response
from mitmproxy.net.http import url
def get_header_tokens(headers, key):
@ -38,8 +39,8 @@ def connection_close(http_version, headers):
def expected_http_body_size(
request: request.Request,
response: Optional[response.Response] = None,
request: Request,
response: Optional[Response] = None,
expect_continue_as_0: bool = True
):
"""
@ -141,7 +142,7 @@ def _read_response_line(line: bytes) -> Tuple[bytes, int, bytes]:
return http_version, status_code, reason
def _read_headers(lines: Iterable[bytes]) -> headers.Headers:
def _read_headers(lines: Iterable[bytes]) -> Headers:
"""
Read a set of headers.
Stop once a blank line is reached.
@ -168,10 +169,10 @@ def _read_headers(lines: Iterable[bytes]) -> headers.Headers:
ret.append((name, value))
except ValueError:
raise ValueError(f"Invalid header line: {line!r}")
return headers.Headers(ret)
return Headers(ret)
def read_request_head(lines: List[bytes]) -> request.Request:
def read_request_head(lines: List[bytes]) -> Request:
"""
Parse an HTTP request head (request line + headers) from an iterable of lines
@ -187,7 +188,7 @@ def read_request_head(lines: List[bytes]) -> request.Request:
host, port, method, scheme, authority, path, http_version = _read_request_line(lines[0])
headers = _read_headers(lines[1:])
return request.Request(
return Request(
host=host,
port=port,
method=method,
@ -203,7 +204,7 @@ def read_request_head(lines: List[bytes]) -> request.Request:
)
def read_response_head(lines: List[bytes]) -> response.Response:
def read_response_head(lines: List[bytes]) -> Response:
"""
Parse an HTTP response head (response line + headers) from an iterable of lines
@ -219,7 +220,7 @@ def read_response_head(lines: List[bytes]) -> response.Response:
http_version, status_code, reason = _read_response_line(lines[0])
headers = _read_headers(lines[1:])
return response.Response(
return Response(
http_version=http_version,
status_code=status_code,
reason=reason,

View File

@ -1,281 +0,0 @@
import re
from dataclasses import dataclass, fields
from typing import Callable, Optional, Union, cast
from mitmproxy.coretypes import serializable
from mitmproxy.net.http import encoding
from mitmproxy.net.http.headers import Headers, assemble_content_type, parse_content_type
from mitmproxy.utils import strutils, typecheck
@dataclass
class MessageData(serializable.Serializable):
http_version: bytes
headers: Headers
content: Optional[bytes]
trailers: Optional[Headers]
timestamp_start: float
timestamp_end: Optional[float]
# noinspection PyUnreachableCode
if __debug__:
def __post_init__(self):
for field in fields(self):
val = getattr(self, field.name)
typecheck.check_option_type(field.name, val, field.type)
def set_state(self, state):
for k, v in state.items():
if k in ("headers", "trailers") and v is not None:
v = Headers.from_state(v)
setattr(self, k, v)
def get_state(self):
state = vars(self).copy()
state["headers"] = state["headers"].get_state()
if state["trailers"] is not None:
state["trailers"] = state["trailers"].get_state()
return state
@classmethod
def from_state(cls, state):
state["headers"] = Headers.from_state(state["headers"])
if state["trailers"] is not None:
state["trailers"] = Headers.from_state(state["trailers"])
return cls(**state)
class Message(serializable.Serializable):
@classmethod
def from_state(cls, state):
return cls(**state)
def get_state(self):
return self.data.get_state()
def set_state(self, state):
self.data.set_state(state)
data: MessageData
stream: Union[Callable, bool] = False
@property
def http_version(self) -> str:
"""
Version string, e.g. "HTTP/1.1"
"""
return self.data.http_version.decode("utf-8", "surrogateescape")
@http_version.setter
def http_version(self, http_version: Union[str, bytes]) -> None:
self.data.http_version = strutils.always_bytes(http_version, "utf-8", "surrogateescape")
@property
def is_http10(self) -> bool:
return self.data.http_version == b"HTTP/1.0"
@property
def is_http11(self) -> bool:
return self.data.http_version == b"HTTP/1.1"
@property
def is_http2(self) -> bool:
return self.data.http_version == b"HTTP/2.0"
@property
def headers(self) -> Headers:
"""
The HTTP headers.
"""
return self.data.headers
@headers.setter
def headers(self, h: Headers) -> None:
self.data.headers = h
@property
def trailers(self) -> Optional[Headers]:
"""
The HTTP trailers.
"""
return self.data.trailers
@trailers.setter
def trailers(self, h: Optional[Headers]) -> None:
self.data.trailers = h
@property
def raw_content(self) -> Optional[bytes]:
"""
The raw (potentially compressed) HTTP message body as bytes.
See also: :py:attr:`content`, :py:class:`text`
"""
return self.data.content
@raw_content.setter
def raw_content(self, content: Optional[bytes]) -> None:
self.data.content = content
def get_content(self, strict: bool = True) -> Optional[bytes]:
"""
The uncompressed HTTP message body as bytes.
Raises:
ValueError, when the HTTP content-encoding is invalid and strict is True.
See also: :py:class:`raw_content`, :py:attr:`text`
"""
if self.raw_content is None:
return None
ce = self.headers.get("content-encoding")
if ce:
try:
content = encoding.decode(self.raw_content, ce)
# A client may illegally specify a byte -> str encoding here (e.g. utf8)
if isinstance(content, str):
raise ValueError(f"Invalid Content-Encoding: {ce}")
return content
except ValueError:
if strict:
raise
return self.raw_content
else:
return self.raw_content
def set_content(self, value: Optional[bytes]) -> None:
if value is None:
self.raw_content = None
return
if not isinstance(value, bytes):
raise TypeError(
f"Message content must be bytes, not {type(value).__name__}. "
"Please use .text if you want to assign a str."
)
ce = self.headers.get("content-encoding")
try:
self.raw_content = encoding.encode(value, ce or "identity")
except ValueError:
# So we have an invalid content-encoding?
# Let's remove it!
del self.headers["content-encoding"]
self.raw_content = value
self.headers["content-length"] = str(len(self.raw_content))
content = property(get_content, set_content)
@property
def timestamp_start(self) -> float:
"""
First byte timestamp
"""
return self.data.timestamp_start
@timestamp_start.setter
def timestamp_start(self, timestamp_start: float) -> None:
self.data.timestamp_start = timestamp_start
@property
def timestamp_end(self) -> Optional[float]:
"""
Last byte timestamp
"""
return self.data.timestamp_end
@timestamp_end.setter
def timestamp_end(self, timestamp_end: Optional[float]):
self.data.timestamp_end = timestamp_end
def _get_content_type_charset(self) -> Optional[str]:
ct = parse_content_type(self.headers.get("content-type", ""))
if ct:
return ct[2].get("charset")
return None
def _guess_encoding(self, content: bytes = b"") -> str:
enc = self._get_content_type_charset()
if not enc:
if "json" in self.headers.get("content-type", ""):
enc = "utf8"
if not enc:
meta_charset = re.search(rb"""<meta[^>]+charset=['"]?([^'">]+)""", content)
if meta_charset:
enc = meta_charset.group(1).decode("ascii", "ignore")
if not enc:
if "text/css" in self.headers.get("content-type", ""):
# @charset rule must be the very first thing.
css_charset = re.match(rb"""@charset "([^"]+)";""", content)
if css_charset:
enc = css_charset.group(1).decode("ascii", "ignore")
if not enc:
enc = "latin-1"
# Use GB 18030 as the superset of GB2312 and GBK to fix common encoding problems on Chinese websites.
if enc.lower() in ("gb2312", "gbk"):
enc = "gb18030"
return enc
def get_text(self, strict: bool = True) -> Optional[str]:
"""
The uncompressed and decoded HTTP message body as text.
Raises:
ValueError, when either content-encoding or charset is invalid and strict is True.
See also: :py:attr:`content`, :py:class:`raw_content`
"""
content = self.get_content(strict)
if content is None:
return None
enc = self._guess_encoding(content)
try:
return cast(str, encoding.decode(content, enc))
except ValueError:
if strict:
raise
return content.decode("utf8", "surrogateescape")
def set_text(self, text: Optional[str]) -> None:
if text is None:
self.content = None
return
enc = self._guess_encoding()
try:
self.content = encoding.encode(text, enc)
except ValueError:
# Fall back to UTF-8 and update the content-type header.
ct = parse_content_type(self.headers.get("content-type", "")) or ("text", "plain", {})
ct[2]["charset"] = "utf-8"
self.headers["content-type"] = assemble_content_type(*ct)
enc = "utf8"
self.content = text.encode(enc, "surrogateescape")
text = property(get_text, set_text)
def decode(self, strict: bool = True) -> None:
"""
Decodes body based on the current Content-Encoding header, then
removes the header. If there is no Content-Encoding header, no
action is taken.
Raises:
ValueError, when the content-encoding is invalid and strict is True.
"""
decoded = self.get_content(strict)
self.headers.pop("content-encoding", None)
self.content = decoded
def encode(self, e: str) -> None:
"""
Encodes body with the encoding e, where e is "gzip", "deflate", "identity", "br", or "zstd".
Any existing content-encodings are overwritten,
the content is not decoded beforehand.
Raises:
ValueError, when the specified content-encoding is invalid.
"""
self.headers["content-encoding"] = e
self.content = self.raw_content
if "content-encoding" not in self.headers:
raise ValueError("Invalid content encoding {}".format(repr(e)))

View File

@ -1,477 +0,0 @@
import time
import urllib.parse
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple, Union
import mitmproxy.net.http.url
from mitmproxy.coretypes import multidict
from mitmproxy.net.http import cookies, multipart
from mitmproxy.net.http import message
from mitmproxy.net.http.headers import Headers
from mitmproxy.utils.strutils import always_bytes, always_str
@dataclass
class RequestData(message.MessageData):
host: str
port: int
method: bytes
scheme: bytes
authority: bytes
path: bytes
class Request(message.Message):
"""
An HTTP request.
"""
data: RequestData
def __init__(
self,
host: str,
port: int,
method: bytes,
scheme: bytes,
authority: bytes,
path: bytes,
http_version: bytes,
headers: Union[Headers, Tuple[Tuple[bytes, bytes], ...]],
content: Optional[bytes],
trailers: Union[None, Headers, Tuple[Tuple[bytes, bytes], ...]],
timestamp_start: float,
timestamp_end: Optional[float],
):
# auto-convert invalid types to retain compatibility with older code.
if isinstance(host, bytes):
host = host.decode("idna", "strict")
if isinstance(method, str):
method = method.encode("ascii", "strict")
if isinstance(scheme, str):
scheme = scheme.encode("ascii", "strict")
if isinstance(authority, str):
authority = authority.encode("ascii", "strict")
if isinstance(path, str):
path = path.encode("ascii", "strict")
if isinstance(http_version, str):
http_version = http_version.encode("ascii", "strict")
if isinstance(content, str):
raise ValueError(f"Content must be bytes, not {type(content).__name__}")
if not isinstance(headers, Headers):
headers = Headers(headers)
if trailers is not None and not isinstance(trailers, Headers):
trailers = Headers(trailers)
self.data = RequestData(
host=host,
port=port,
method=method,
scheme=scheme,
authority=authority,
path=path,
http_version=http_version,
headers=headers,
content=content,
trailers=trailers,
timestamp_start=timestamp_start,
timestamp_end=timestamp_end,
)
def __repr__(self) -> str:
if self.host and self.port:
hostport = f"{self.host}:{self.port}"
else:
hostport = ""
path = self.path or ""
return f"Request({self.method} {hostport}{path})"
@classmethod
def make(
cls,
method: str,
url: str,
content: Union[bytes, str] = "",
headers: Union[Headers, Dict[Union[str, bytes], Union[str, bytes]], Iterable[Tuple[bytes, bytes]]] = ()
) -> "Request":
"""
Simplified API for creating request objects.
"""
# Headers can be list or dict, we differentiate here.
if isinstance(headers, Headers):
pass
elif isinstance(headers, dict):
headers = Headers(
(always_bytes(k, "utf-8", "surrogateescape"),
always_bytes(v, "utf-8", "surrogateescape"))
for k, v in headers.items()
)
elif isinstance(headers, Iterable):
headers = Headers(headers)
else:
raise TypeError("Expected headers to be an iterable or dict, but is {}.".format(
type(headers).__name__
))
req = cls(
"",
0,
method.encode("utf-8", "surrogateescape"),
b"",
b"",
b"",
b"HTTP/1.1",
headers,
b"",
None,
time.time(),
time.time(),
)
req.url = url
# Assign this manually to update the content-length header.
if isinstance(content, bytes):
req.content = content
elif isinstance(content, str):
req.text = content
else:
raise TypeError(f"Expected content to be str or bytes, but is {type(content).__name__}.")
return req
@property
def first_line_format(self) -> str:
"""
HTTP request form as defined in `RFC7230 <https://tools.ietf.org/html/rfc7230#section-5.3>`_.
origin-form and asterisk-form are subsumed as "relative".
"""
if self.method == "CONNECT":
return "authority"
elif self.authority:
return "absolute"
else:
return "relative"
@property
def method(self) -> str:
"""
HTTP request method, e.g. "GET".
"""
return self.data.method.decode("utf-8", "surrogateescape").upper()
@method.setter
def method(self, val: Union[str, bytes]) -> None:
self.data.method = always_bytes(val, "utf-8", "surrogateescape")
@property
def scheme(self) -> str:
"""
HTTP request scheme, which should be "http" or "https".
"""
return self.data.scheme.decode("utf-8", "surrogateescape")
@scheme.setter
def scheme(self, val: Union[str, bytes]) -> None:
self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
@property
def authority(self) -> str:
"""
HTTP request authority.
For HTTP/1, this is the authority portion of the request target
(in either absolute-form or authority-form)
For HTTP/2, this is the :authority pseudo header.
"""
try:
return self.data.authority.decode("idna")
except UnicodeError:
return self.data.authority.decode("utf8", "surrogateescape")
@authority.setter
def authority(self, val: Union[str, bytes]) -> None:
if isinstance(val, str):
try:
val = val.encode("idna", "strict")
except UnicodeError:
val = val.encode("utf8", "surrogateescape") # type: ignore
self.data.authority = val
@property
def host(self) -> str:
"""
Target host. This may be parsed from the raw request
(e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
or inferred from the proxy mode (e.g. an IP in transparent mode).
Setting the host attribute also updates the host header and authority information, if present.
"""
return self.data.host
@host.setter
def host(self, val: Union[str, bytes]) -> None:
self.data.host = always_str(val, "idna", "strict")
# Update host header
if "Host" in self.data.headers:
self.data.headers["Host"] = val
# Update authority
if self.data.authority:
self.authority = mitmproxy.net.http.url.hostport(self.scheme, self.host, self.port)
@property
def host_header(self) -> Optional[str]:
"""
The request's host/authority header.
This property maps to either ``request.headers["Host"]`` or
``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
"""
if self.is_http2:
return self.authority or self.data.headers.get("Host", None)
else:
return self.data.headers.get("Host", None)
@host_header.setter
def host_header(self, val: Union[None, str, bytes]) -> None:
if val is None:
if self.is_http2:
self.data.authority = b""
self.headers.pop("Host", None)
else:
if self.is_http2:
self.authority = val # type: ignore
if not self.is_http2 or "Host" in self.headers:
# For h2, we only overwrite, but not create, as :authority is the h2 host header.
self.headers["Host"] = val
@property
def port(self) -> int:
"""
Target port
"""
return self.data.port
@port.setter
def port(self, port: int) -> None:
self.data.port = port
@property
def path(self) -> str:
"""
HTTP request path, e.g. "/index.html".
Usually starts with a slash, except for OPTIONS requests, which may just be "*".
"""
return self.data.path.decode("utf-8", "surrogateescape")
@path.setter
def path(self, val: Union[str, bytes]) -> None:
self.data.path = always_bytes(val, "utf-8", "surrogateescape")
@property
def url(self) -> str:
"""
The URL string, constructed from the request's URL components.
"""
if self.first_line_format == "authority":
return f"{self.host}:{self.port}"
return mitmproxy.net.http.url.unparse(self.scheme, self.host, self.port, self.path)
@url.setter
def url(self, val: Union[str, bytes]) -> None:
val = always_str(val, "utf-8", "surrogateescape")
self.scheme, self.host, self.port, self.path = mitmproxy.net.http.url.parse(val)
@property
def pretty_host(self) -> str:
"""
Similar to :py:attr:`host`, but using the host/:authority header as an additional (preferred) data source.
This is useful in transparent mode where :py:attr:`host` is only an IP address,
but may not reflect the actual destination as the Host header could be spoofed.
"""
authority = self.host_header
if authority:
return mitmproxy.net.http.url.parse_authority(authority, check=False)[0]
else:
return self.host
@property
def pretty_url(self) -> str:
"""
Like :py:attr:`url`, but using :py:attr:`pretty_host` instead of :py:attr:`host`.
"""
if self.first_line_format == "authority":
return self.authority
host_header = self.host_header
if not host_header:
return self.url
pretty_host, pretty_port = mitmproxy.net.http.url.parse_authority(host_header, check=False)
pretty_port = pretty_port or mitmproxy.net.http.url.default_port(self.scheme) or 443
return mitmproxy.net.http.url.unparse(self.scheme, pretty_host, pretty_port, self.path)
def _get_query(self):
query = urllib.parse.urlparse(self.url).query
return tuple(mitmproxy.net.http.url.decode(query))
def _set_query(self, query_data):
query = mitmproxy.net.http.url.encode(query_data)
_, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
@property
def query(self) -> multidict.MultiDictView:
"""
The request query string as an :py:class:`~mitmproxy.net.multidict.MultiDictView` object.
"""
return multidict.MultiDictView(
self._get_query,
self._set_query
)
@query.setter
def query(self, value):
self._set_query(value)
def _get_cookies(self):
h = self.headers.get_all("Cookie")
return tuple(cookies.parse_cookie_headers(h))
def _set_cookies(self, value):
self.headers["cookie"] = cookies.format_cookie_header(value)
@property
def cookies(self) -> multidict.MultiDictView:
"""
The request cookies.
An empty :py:class:`~mitmproxy.net.multidict.MultiDictView` object if the cookie monster ate them all.
"""
return multidict.MultiDictView(
self._get_cookies,
self._set_cookies
)
@cookies.setter
def cookies(self, value):
self._set_cookies(value)
@property
def path_components(self):
"""
The URL's path components as a tuple of strings.
Components are unquoted.
"""
path = urllib.parse.urlparse(self.url).path
# This needs to be a tuple so that it's immutable.
# Otherwise, this would fail silently:
# request.path_components.append("foo")
return tuple(mitmproxy.net.http.url.unquote(i) for i in path.split("/") if i)
@path_components.setter
def path_components(self, components):
components = map(lambda x: mitmproxy.net.http.url.quote(x, safe=""), components)
path = "/" + "/".join(components)
_, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
def anticache(self) -> None:
"""
Modifies this request to remove headers that might produce a cached
response. That is, we remove ETags and If-Modified-Since headers.
"""
delheaders = [
"if-modified-since",
"if-none-match",
]
for i in delheaders:
self.headers.pop(i, None)
def anticomp(self) -> None:
"""
Modifies this request to remove headers that will compress the
resource's data.
"""
self.headers["accept-encoding"] = "identity"
def constrain_encoding(self) -> None:
"""
Limits the permissible Accept-Encoding values, based on what we can
decode appropriately.
"""
accept_encoding = self.headers.get("accept-encoding")
if accept_encoding:
self.headers["accept-encoding"] = (
', '.join(
e
for e in {"gzip", "identity", "deflate", "br", "zstd"}
if e in accept_encoding
)
)
def _get_urlencoded_form(self):
is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower()
if is_valid_content_type:
return tuple(mitmproxy.net.http.url.decode(self.get_text(strict=False)))
return ()
def _set_urlencoded_form(self, form_data):
"""
Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
This will overwrite the existing content if there is one.
"""
self.headers["content-type"] = "application/x-www-form-urlencoded"
self.content = mitmproxy.net.http.url.encode(form_data, self.get_text(strict=False)).encode()
@property
def urlencoded_form(self):
"""
The URL-encoded form data as an :py:class:`~mitmproxy.net.multidict.MultiDictView` object.
An empty multidict.MultiDictView if the content-type indicates non-form data
or the content could not be parsed.
Starting with mitmproxy 1.0, key and value are strings.
"""
return multidict.MultiDictView(
self._get_urlencoded_form,
self._set_urlencoded_form
)
@urlencoded_form.setter
def urlencoded_form(self, value):
self._set_urlencoded_form(value)
def _get_multipart_form(self):
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
if is_valid_content_type:
try:
return multipart.decode(self.headers.get("content-type"), self.content)
except ValueError:
pass
return ()
def _set_multipart_form(self, value):
self.content = mitmproxy.net.http.multipart.encode(self.headers, value)
self.headers["content-type"] = "multipart/form-data"
@property
def multipart_form(self):
"""
The multipart form data as an :py:class:`~mitmproxy.net.multidict.MultiDictView` object.
An empty multidict.MultiDictView if the content-type indicates non-form data
or the content could not be parsed.
Key and value are bytes.
"""
return multidict.MultiDictView(
self._get_multipart_form,
self._set_multipart_form
)
@multipart_form.setter
def multipart_form(self, value):
self._set_multipart_form(value)

View File

@ -1,211 +0,0 @@
import time
from dataclasses import dataclass
from email.utils import formatdate, mktime_tz, parsedate_tz
from typing import Mapping
from typing import Iterable
from typing import Optional
from typing import Tuple
from typing import Union
from mitmproxy.coretypes import multidict
from mitmproxy.net.http import cookies, message
from mitmproxy.net.http import status_codes
from mitmproxy.net.http.headers import Headers
from mitmproxy.utils import human
from mitmproxy.utils import strutils
from mitmproxy.utils.strutils import always_bytes
@dataclass
class ResponseData(message.MessageData):
status_code: int
reason: bytes
class Response(message.Message):
"""
An HTTP response.
"""
data: ResponseData
def __init__(
self,
http_version: bytes,
status_code: int,
reason: bytes,
headers: Union[Headers, Tuple[Tuple[bytes, bytes], ...]],
content: Optional[bytes],
trailers: Union[None, Headers, Tuple[Tuple[bytes, bytes], ...]],
timestamp_start: float,
timestamp_end: Optional[float],
):
# auto-convert invalid types to retain compatibility with older code.
if isinstance(http_version, str):
http_version = http_version.encode("ascii", "strict")
if isinstance(reason, str):
reason = reason.encode("ascii", "strict")
if isinstance(content, str):
raise ValueError("Content must be bytes, not {}".format(type(content).__name__))
if not isinstance(headers, Headers):
headers = Headers(headers)
if trailers is not None and not isinstance(trailers, Headers):
trailers = Headers(trailers)
self.data = ResponseData(
http_version=http_version,
status_code=status_code,
reason=reason,
headers=headers,
content=content,
trailers=trailers,
timestamp_start=timestamp_start,
timestamp_end=timestamp_end,
)
def __repr__(self) -> str:
if self.raw_content:
ct = self.headers.get("content-type", "unknown content type")
size = human.pretty_size(len(self.raw_content))
details = f"{ct}, {size}"
else:
details = "no content"
return f"Response({self.status_code}, {details})"
@classmethod
def make(
cls,
status_code: int = 200,
content: Union[bytes, str] = b"",
headers: Union[Headers, Mapping[str, Union[str, bytes]], Iterable[Tuple[bytes, bytes]]] = ()
) -> "Response":
"""
Simplified API for creating response objects.
"""
if isinstance(headers, Headers):
headers = headers
elif isinstance(headers, dict):
headers = Headers(
(always_bytes(k, "utf-8", "surrogateescape"),
always_bytes(v, "utf-8", "surrogateescape"))
for k, v in headers.items()
)
elif isinstance(headers, Iterable):
headers = Headers(headers)
else:
raise TypeError("Expected headers to be an iterable or dict, but is {}.".format(
type(headers).__name__
))
resp = cls(
b"HTTP/1.1",
status_code,
status_codes.RESPONSES.get(status_code, "").encode(),
headers,
None,
None,
time.time(),
time.time(),
)
# Assign this manually to update the content-length header.
if isinstance(content, bytes):
resp.content = content
elif isinstance(content, str):
resp.text = content
else:
raise TypeError(f"Expected content to be str or bytes, but is {type(content).__name__}.")
return resp
@property
def status_code(self) -> int:
"""
HTTP Status Code, e.g. ``200``.
"""
return self.data.status_code
@status_code.setter
def status_code(self, status_code: int) -> None:
self.data.status_code = status_code
@property
def reason(self) -> str:
"""
HTTP Reason Phrase, e.g. "Not Found".
HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
"""
# Encoding: http://stackoverflow.com/a/16674906/934719
return self.data.reason.decode("ISO-8859-1")
@reason.setter
def reason(self, reason: Union[str, bytes]) -> None:
self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
def _get_cookies(self):
h = self.headers.get_all("set-cookie")
all_cookies = cookies.parse_set_cookie_headers(h)
return tuple(
(name, (value, attrs))
for name, value, attrs in all_cookies
)
def _set_cookies(self, value):
cookie_headers = []
for k, v in value:
header = cookies.format_set_cookie_header([(k, v[0], v[1])])
cookie_headers.append(header)
self.headers.set_all("set-cookie", cookie_headers)
@property
def cookies(self) -> multidict.MultiDictView:
"""
The response cookies. A possibly empty
:py:class:`~mitmproxy.net.multidict.MultiDictView`, where the keys are cookie
name strings, and values are (value, attr) tuples. Value is a string,
and attr is an MultiDictView containing cookie attributes. Within
attrs, unary attributes (e.g. HTTPOnly) are indicated by a Null value.
Caveats:
Updating the attr
"""
return multidict.MultiDictView(
self._get_cookies,
self._set_cookies
)
@cookies.setter
def cookies(self, value):
self._set_cookies(value)
def refresh(self, now=None):
"""
This fairly complex and heuristic function refreshes a server
response for replay.
- It adjusts date, expires and last-modified headers.
- It adjusts cookie expiration.
"""
if not now:
now = time.time()
delta = now - self.timestamp_start
refresh_headers = [
"date",
"expires",
"last-modified",
]
for i in refresh_headers:
if i in self.headers:
d = parsedate_tz(self.headers[i])
if d:
new = mktime_tz(d) + delta
self.headers[i] = formatdate(new, usegmt=True)
c = []
for set_cookie_header in self.headers.get_all("set-cookie"):
try:
refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
except ValueError:
refreshed = set_cookie_header
c.append(refreshed)
if c:
self.headers.set_all("set-cookie", c)

View File

@ -144,7 +144,7 @@ class HttpStream(layer.Layer):
self.flow.request = event.request
if err := validate_request(self.mode, self.flow.request):
self.flow.response = http.HTTPResponse.make(502, str(err))
self.flow.response = http.Response.make(502, str(err))
self.client_state = self.state_errored
return (yield from self.send_response())
@ -162,7 +162,7 @@ class HttpStream(layer.Layer):
try:
host, port = url.parse_authority(self.flow.request.host_header or "", check=True)
except ValueError:
self.flow.response = http.HTTPResponse.make(
self.flow.response = http.Response.make(
400,
"HTTP request has no host header, destination unknown."
)
@ -194,7 +194,7 @@ class HttpStream(layer.Layer):
return
if self.flow.request.headers.get("expect", "").lower() == "100-continue":
continue_response = http.HTTPResponse.make(100)
continue_response = http.Response.make(100)
continue_response.headers.clear()
yield SendHttp(ResponseHeaders(self.stream_id, continue_response), self.context.client)
self.flow.request.headers.pop("expect")
@ -427,7 +427,7 @@ class HttpStream(layer.Layer):
if not self.flow.response and self.context.options.connection_strategy == "eager":
err = yield commands.OpenConnection(self.context.server)
if err:
self.flow.response = http.HTTPResponse.make(
self.flow.response = http.Response.make(
502, f"Cannot connect to {human.format_address(self.context.server.address)}: {err}"
)
self.child_layer = layer.NextLayer(self.context)

View File

@ -8,7 +8,7 @@ from ._base import HttpEvent
@dataclass
class RequestHeaders(HttpEvent):
request: http.HTTPRequest
request: http.Request
end_stream: bool
"""
If True, we already know at this point that there is no message body. This is useful for HTTP/2, where it allows
@ -21,7 +21,7 @@ class RequestHeaders(HttpEvent):
@dataclass
class ResponseHeaders(HttpEvent):
response: http.HTTPResponse
response: http.Response
end_stream: bool = False

View File

@ -6,7 +6,6 @@ from h11._readers import ChunkedReader, ContentLengthReader, Http10Reader
from h11._receivebuffer import ReceiveBuffer
from mitmproxy import http
from mitmproxy.net import http as net_http
from mitmproxy.net.http import http1, status_codes
from mitmproxy.proxy import commands, events, layer
from mitmproxy.proxy.context import Connection, ConnectionState, Context
@ -22,8 +21,8 @@ TBodyReader = Union[ChunkedReader, Http10Reader, ContentLengthReader]
class Http1Connection(HttpConnection, metaclass=abc.ABCMeta):
stream_id: Optional[StreamId] = None
request: Optional[http.HTTPRequest] = None
response: Optional[http.HTTPResponse] = None
request: Optional[http.Request] = None
response: Optional[http.Response] = None
request_done: bool = False
response_done: bool = False
# this is a bit of a hack to make both mypy and PyCharm happy.
@ -345,7 +344,7 @@ class Http1Client(Http1Connection):
raise AssertionError(f"Unexpected event: {event}")
def should_make_pipe(request: net_http.Request, response: net_http.Response) -> bool:
def should_make_pipe(request: http.Request, response: http.Response) -> bool:
if response.status_code == 101:
return True
elif response.status_code == 200 and request.method.upper() == "CONNECT":

View File

@ -13,7 +13,6 @@ import h2.stream
import h2.utilities
from mitmproxy import http
from mitmproxy.net import http as net_http
from mitmproxy.net.http import url, status_codes
from mitmproxy.utils import human
from . import RequestData, RequestEndOfMessage, RequestHeaders, RequestProtocolError, ResponseData, \
@ -197,9 +196,9 @@ class Http2Connection(HttpConnection):
return False
def protocol_error(
self,
message: str,
error_code: int = h2.errors.ErrorCodes.PROTOCOL_ERROR,
self,
message: str,
error_code: int = h2.errors.ErrorCodes.PROTOCOL_ERROR,
) -> CommandGenerator[None]:
yield Log(f"{human.format_address(self.conn.peername)}: {message}")
self.h2_conn.close_connection(error_code, message.encode())
@ -272,7 +271,7 @@ class Http2Server(Http2Connection):
except ValueError as e:
yield from self.protocol_error(f"Invalid HTTP/2 request headers: {e}")
return True
request = http.HTTPRequest(
request = http.Request(
host=host,
port=port,
method=method,
@ -333,8 +332,8 @@ class Http2Client(Http2Connection):
ours = self.our_stream_id.get(event.stream_id, None)
if ours is None:
no_free_streams = (
self.h2_conn.open_outbound_streams >=
(self.provisional_max_concurrency or self.h2_conn.remote_settings.max_concurrent_streams)
self.h2_conn.open_outbound_streams >=
(self.provisional_max_concurrency or self.h2_conn.remote_settings.max_concurrent_streams)
)
if no_free_streams:
self.stream_queue[event.stream_id].append(event)
@ -350,10 +349,10 @@ class Http2Client(Http2Connection):
yield cmd
can_resume_queue = (
self.stream_queue and
self.h2_conn.open_outbound_streams < (
self.provisional_max_concurrency or self.h2_conn.remote_settings.max_concurrent_streams
)
self.stream_queue and
self.h2_conn.open_outbound_streams < (
self.provisional_max_concurrency or self.h2_conn.remote_settings.max_concurrent_streams
)
)
if can_resume_queue:
# popitem would be LIFO, but we want FIFO.
@ -402,7 +401,7 @@ class Http2Client(Http2Connection):
yield from self.protocol_error(f"Invalid HTTP/2 response headers: {e}")
return True
response = http.HTTPResponse(
response = http.Response(
http_version=b"HTTP/2.0",
status_code=status_code,
reason=b"",
@ -427,7 +426,7 @@ class Http2Client(Http2Connection):
return (yield from super().handle_h2_event(event))
def split_pseudo_headers(h2_headers: Sequence[Tuple[bytes, bytes]]) -> Tuple[Dict[bytes, bytes], net_http.Headers]:
def split_pseudo_headers(h2_headers: Sequence[Tuple[bytes, bytes]]) -> Tuple[Dict[bytes, bytes], http.Headers]:
pseudo_headers: Dict[bytes, bytes] = {}
i = 0
for (header, value) in h2_headers:
@ -440,14 +439,14 @@ def split_pseudo_headers(h2_headers: Sequence[Tuple[bytes, bytes]]) -> Tuple[Dic
# Pseudo-headers must be at the start, we are done here.
break
headers = net_http.Headers(h2_headers[i:])
headers = http.Headers(h2_headers[i:])
return pseudo_headers, headers
def parse_h2_request_headers(
h2_headers: Sequence[Tuple[bytes, bytes]]
) -> Tuple[str, int, bytes, bytes, bytes, bytes, net_http.Headers]:
h2_headers: Sequence[Tuple[bytes, bytes]]
) -> Tuple[str, int, bytes, bytes, bytes, bytes, http.Headers]:
"""Split HTTP/2 pseudo-headers from the actual headers and parse them."""
pseudo_headers, headers = split_pseudo_headers(h2_headers)
@ -473,7 +472,7 @@ def parse_h2_request_headers(
return host, port, method, scheme, authority, path, headers
def parse_h2_response_headers(h2_headers: Sequence[Tuple[bytes, bytes]]) -> Tuple[int, net_http.Headers]:
def parse_h2_response_headers(h2_headers: Sequence[Tuple[bytes, bytes]]) -> Tuple[int, http.Headers]:
"""Split HTTP/2 pseudo-headers from the actual headers and parse them."""
pseudo_headers, headers = split_pseudo_headers(h2_headers)

View File

@ -401,7 +401,7 @@ if __name__ == "__main__": # pragma: no cover
def request(flow: http.HTTPFlow):
if "cached" in flow.request.path:
flow.response = http.HTTPResponse.make(418, f"(cached) {flow.request.text}")
flow.response = http.Response.make(418, f"(cached) {flow.request.text}")
if "toggle-tls" in flow.request.path:
if flow.request.url.startswith("https://"):
flow.request.url = flow.request.url.replace("https://", "http://")

View File

@ -5,7 +5,7 @@ from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy.net import http as net_http
from mitmproxy.net.http import status_codes
from mitmproxy.proxy import context
from mitmproxy.test import tutils
from wsproto.frame_protocol import Opcode
@ -37,7 +37,7 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None,
if server_conn is True:
server_conn = tserver_conn()
if handshake_flow is True:
req = http.HTTPRequest(
req = http.Request(
"example.com",
80,
b"GET",
@ -45,7 +45,7 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None,
b"example.com",
b"/ws",
b"HTTP/1.1",
headers=net_http.Headers(
headers=http.Headers(
connection="upgrade",
upgrade="websocket",
sec_websocket_version="13",
@ -57,11 +57,11 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None,
timestamp_end=946681201,
)
resp = http.HTTPResponse(
resp = http.Response(
b"HTTP/1.1",
101,
reason=net_http.status_codes.RESPONSES.get(101),
headers=net_http.Headers(
reason=status_codes.RESPONSES.get(101),
headers=http.Headers(
connection='upgrade',
upgrade='websocket',
sec_websocket_accept=b'',
@ -99,8 +99,8 @@ def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None):
"""
@type client_conn: bool | None | mitmproxy.proxy.connection.ClientConnection
@type server_conn: bool | None | mitmproxy.proxy.connection.ServerConnection
@type req: bool | None | mitmproxy.proxy.protocol.http.HTTPRequest
@type resp: bool | None | mitmproxy.proxy.protocol.http.HTTPResponse
@type req: bool | None | mitmproxy.proxy.protocol.http.Request
@type resp: bool | None | mitmproxy.proxy.protocol.http.Response
@type err: bool | None | mitmproxy.proxy.protocol.primitives.Error
@return: mitmproxy.proxy.protocol.http.HTTPFlow
"""

View File

@ -1,4 +1,4 @@
from mitmproxy.net import http
from mitmproxy import http
def treq(**kwargs) -> http.Request:

View File

@ -419,7 +419,7 @@ class ConsoleAddon:
flow.response is None
)
if require_dummy_response:
flow.response = http.HTTPResponse.make()
flow.response = http.Response.make()
if flow_part == "cookies":
self.master.switch_view("edit_focus_cookies")
elif flow_part == "urlencoded form":

View File

@ -24,8 +24,8 @@ def flowdetails(state, flow: mitmproxy.flow.Flow):
sc = flow.server_conn
cc = flow.client_conn
req: typing.Optional[http.HTTPRequest]
resp: typing.Optional[http.HTTPResponse]
req: typing.Optional[http.Request]
resp: typing.Optional[http.Response]
if isinstance(flow, http.HTTPFlow):
req = flow.request
resp = flow.response

View File

@ -205,7 +205,7 @@ class FlowDetails(tabs.Tabs):
if error:
self.master.log.debug(error)
# Give hint that you have to tab for the response.
if description == "No content" and isinstance(message, http.HTTPRequest):
if description == "No content" and isinstance(message, http.Request):
description = "No request content"
# If the users has a wide terminal, he gets fewer lines; this should not be an issue.

View File

@ -2,7 +2,7 @@ import urwid
import typing
from mitmproxy import exceptions
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from mitmproxy.tools.console import layoutwidget
from mitmproxy.tools.console import signals
from mitmproxy.tools.console.grideditor import base

View File

@ -2,7 +2,7 @@ from mitmproxy import contentviews
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from ..mitmproxy import tservers

View File

@ -6,7 +6,7 @@ import pytest
from mitmproxy import exceptions
from mitmproxy.addons import dumper
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy.test import tutils

View File

@ -1,6 +1,6 @@
import pytest
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from mitmproxy.net.http.http1.assemble import (
assemble_request, assemble_request_head, assemble_response,
assemble_response_head, _assemble_request_line, _assemble_request_headers,

View File

@ -1,6 +1,6 @@
import pytest
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from mitmproxy.net.http.http1.read import (
read_request_head,
read_response_head, connection_close, expected_http_body_size,

View File

@ -1,8 +1,8 @@
import collections
import pytest
from mitmproxy.net.http.headers import Headers, parse_content_type, assemble_content_type
from mitmproxy.http import Headers
from mitmproxy.net.http.headers import parse_content_type, assemble_content_type
class TestHeaders:
def _2host(self):

View File

@ -1,7 +1,7 @@
import pytest
from mitmproxy.test import tutils
from mitmproxy.net import http
from mitmproxy import http
def _test_passthrough_attr(message, attr):

View File

@ -1,6 +1,6 @@
import pytest
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from mitmproxy.net.http import multipart

View File

@ -1,7 +1,7 @@
from unittest import mock
import pytest
from mitmproxy.net.http import Headers, Request
from mitmproxy.http import Headers, Request
from mitmproxy.test.tutils import treq
from .test_message import _test_decoded_attr, _test_passthrough_attr

View File

@ -3,8 +3,8 @@ import time
import pytest
from unittest import mock
from mitmproxy.net.http import Headers
from mitmproxy.net.http import Response
from mitmproxy.http import Headers
from mitmproxy.http import Response
from mitmproxy.net.http.cookies import CookieAttrs
from mitmproxy.test.tutils import tresp
from .test_message import _test_passthrough_attr

View File

@ -1,7 +1,7 @@
from unittest import mock
import pytest
from mitmproxy.net.http import encoding
from mitmproxy.net import encoding
@pytest.mark.parametrize("encoder", [

View File

@ -1,7 +1,7 @@
import pytest
from mitmproxy.flow import Error
from mitmproxy.http import HTTPFlow, HTTPResponse
from mitmproxy.http import HTTPFlow, Response
from mitmproxy.net.server_spec import ServerSpec
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy import layer
@ -205,7 +205,7 @@ def test_http_reply_from_proxy(tctx):
"""Test a response served by mitmproxy itself."""
def reply_from_proxy(flow: HTTPFlow):
flow.response = HTTPResponse.make(418)
flow.response = Response.make(418)
assert (
Playbook(http.HttpLayer(tctx, HTTPMode.regular), hooks=False)
@ -843,7 +843,7 @@ def test_kill_flow(tctx, when):
return assert_kill()
if when == "script-response-responseheaders":
assert (playbook
>> reply(side_effect=lambda f: setattr(f, "response", HTTPResponse.make()))
>> reply(side_effect=lambda f: setattr(f, "response", Response.make()))
<< http.HttpResponseHeadersHook(flow))
return assert_kill()
assert (playbook

View File

@ -1,6 +1,6 @@
import pytest
from mitmproxy.net import http
from mitmproxy import http
from mitmproxy.proxy.commands import SendData
from mitmproxy.proxy.events import DataReceived
from mitmproxy.proxy.layers.http import Http1Server, ReceiveHttp, RequestHeaders, RequestEndOfMessage, \

View File

@ -7,8 +7,8 @@ import pytest
from h2.errors import ErrorCodes
from mitmproxy.flow import Error
from mitmproxy.http import HTTPFlow
from mitmproxy.net.http import Headers, Request, status_codes
from mitmproxy.http import HTTPFlow, Headers, Request
from mitmproxy.net.http import status_codes
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy.commands import CloseConnection, OpenConnection, SendData
from mitmproxy.proxy.context import Context, Server

View File

@ -5,8 +5,7 @@ import pytest
import wsproto
import wsproto.events
from mitmproxy.http import HTTPFlow
from mitmproxy.net.http import Request, Response
from mitmproxy.http import HTTPFlow, Request, Response
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy.commands import SendData, CloseConnection, Log
from mitmproxy.proxy.context import ConnectionState

View File

@ -5,7 +5,7 @@ from mitmproxy import flow
from mitmproxy import flowfilter
from mitmproxy import http
from mitmproxy.exceptions import ControlException
from mitmproxy.net.http import Headers
from mitmproxy.http import Headers
from mitmproxy.test import tflow