Revamp options
- Options are now explicitly initialized with an add_option method - We have one canonical Options class - ditch dump.Options
This commit is contained in:
parent
e0644398b6
commit
67381ae550
|
@ -21,186 +21,94 @@ DEFAULT_CLIENT_CIPHERS = "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA
|
|||
|
||||
|
||||
class Options(optmanager.OptManager):
|
||||
def __init__(
|
||||
self,
|
||||
*, # all args are keyword-only.
|
||||
onboarding: bool = True,
|
||||
onboarding_host: str = APP_HOST,
|
||||
onboarding_port: int = APP_PORT,
|
||||
anticache: bool = False,
|
||||
anticomp: bool = False,
|
||||
client_replay: Sequence[str] = [],
|
||||
replay_kill_extra: bool = False,
|
||||
keepserving: bool = True,
|
||||
no_server: bool = False,
|
||||
server_replay_nopop: bool = False,
|
||||
refresh_server_playback: bool = True,
|
||||
rfile: Optional[str] = None,
|
||||
scripts: Sequence[str] = [],
|
||||
showhost: bool = False,
|
||||
replacements: Sequence[Union[Tuple[str, str, str], str]] = [],
|
||||
replacement_files: Sequence[Union[Tuple[str, str, str], str]] = [],
|
||||
server_replay_use_headers: Sequence[str] = [],
|
||||
setheaders: Sequence[Union[Tuple[str, str, str], str]] = [],
|
||||
server_replay: Sequence[str] = [],
|
||||
stickycookie: Optional[str] = None,
|
||||
stickyauth: Optional[str] = None,
|
||||
stream_large_bodies: Optional[int] = None,
|
||||
verbosity: int = 2,
|
||||
default_contentview: str = "auto",
|
||||
streamfile: Optional[str] = None,
|
||||
streamfile_append: bool = False,
|
||||
server_replay_ignore_content: bool = False,
|
||||
server_replay_ignore_params: Sequence[str] = [],
|
||||
server_replay_ignore_payload_params: Sequence[str] = [],
|
||||
server_replay_ignore_host: bool = False,
|
||||
|
||||
# Proxy options
|
||||
auth_nonanonymous: bool = False,
|
||||
auth_singleuser: Optional[str] = None,
|
||||
auth_htpasswd: Optional[str] = None,
|
||||
add_upstream_certs_to_client_chain: bool = False,
|
||||
body_size_limit: Optional[int] = None,
|
||||
cadir: str = CA_DIR,
|
||||
certs: Sequence[Tuple[str, str]] = [],
|
||||
ciphers_client: str=DEFAULT_CLIENT_CIPHERS,
|
||||
ciphers_server: Optional[str]=None,
|
||||
clientcerts: Optional[str] = None,
|
||||
ignore_hosts: Sequence[str] = [],
|
||||
listen_host: str = "",
|
||||
listen_port: int = LISTEN_PORT,
|
||||
upstream_bind_address: str = "",
|
||||
mode: str = "regular",
|
||||
no_upstream_cert: bool = False,
|
||||
keep_host_header: bool = False,
|
||||
|
||||
http2: bool = True,
|
||||
http2_priority: bool = False,
|
||||
websocket: bool = True,
|
||||
rawtcp: bool = False,
|
||||
|
||||
spoof_source_address: bool = False,
|
||||
upstream_server: Optional[str] = None,
|
||||
upstream_auth: Optional[str] = None,
|
||||
ssl_version_client: str = "secure",
|
||||
ssl_version_server: str = "secure",
|
||||
ssl_insecure: bool = False,
|
||||
ssl_verify_upstream_trusted_cadir: Optional[str] = None,
|
||||
ssl_verify_upstream_trusted_ca: Optional[str] = None,
|
||||
tcp_hosts: Sequence[str] = [],
|
||||
|
||||
intercept: Optional[str] = None,
|
||||
|
||||
# Console options
|
||||
console_eventlog: bool = False,
|
||||
console_focus_follow: bool = False,
|
||||
console_palette: Optional[str] = "dark",
|
||||
console_palette_transparent: bool = False,
|
||||
console_no_mouse: bool = False,
|
||||
console_order: Optional[str] = None,
|
||||
console_order_reversed: bool = False,
|
||||
|
||||
filter: Optional[str] = None,
|
||||
|
||||
# Web options
|
||||
web_open_browser: bool = True,
|
||||
web_debug: bool = False,
|
||||
web_port: int = 8081,
|
||||
web_iface: str = "127.0.0.1",
|
||||
|
||||
# Dump options
|
||||
filtstr: Optional[str] = None,
|
||||
flow_detail: int = 1
|
||||
) -> None:
|
||||
# We could replace all assignments with clever metaprogramming,
|
||||
# but type hints are a much more valueable asset.
|
||||
|
||||
self.onboarding = onboarding
|
||||
self.onboarding_host = onboarding_host
|
||||
self.onboarding_port = onboarding_port
|
||||
self.anticache = anticache
|
||||
self.anticomp = anticomp
|
||||
self.client_replay = client_replay
|
||||
self.keepserving = keepserving
|
||||
self.replay_kill_extra = replay_kill_extra
|
||||
self.no_server = no_server
|
||||
self.server_replay_nopop = server_replay_nopop
|
||||
self.refresh_server_playback = refresh_server_playback
|
||||
self.rfile = rfile
|
||||
self.scripts = scripts
|
||||
self.showhost = showhost
|
||||
self.replacements = replacements
|
||||
self.replacement_files = replacement_files
|
||||
self.server_replay_use_headers = server_replay_use_headers
|
||||
self.setheaders = setheaders
|
||||
self.server_replay = server_replay
|
||||
self.stickycookie = stickycookie
|
||||
self.stickyauth = stickyauth
|
||||
self.stream_large_bodies = stream_large_bodies
|
||||
self.verbosity = verbosity
|
||||
self.default_contentview = default_contentview
|
||||
self.streamfile = streamfile
|
||||
self.streamfile_append = streamfile_append
|
||||
self.server_replay_ignore_content = server_replay_ignore_content
|
||||
self.server_replay_ignore_params = server_replay_ignore_params
|
||||
self.server_replay_ignore_payload_params = server_replay_ignore_payload_params
|
||||
self.server_replay_ignore_host = server_replay_ignore_host
|
||||
|
||||
# Proxy options
|
||||
self.auth_nonanonymous = auth_nonanonymous
|
||||
self.auth_singleuser = auth_singleuser
|
||||
self.auth_htpasswd = auth_htpasswd
|
||||
self.add_upstream_certs_to_client_chain = add_upstream_certs_to_client_chain
|
||||
self.body_size_limit = body_size_limit
|
||||
self.cadir = cadir
|
||||
self.certs = certs
|
||||
self.ciphers_client = ciphers_client
|
||||
self.ciphers_server = ciphers_server
|
||||
self.clientcerts = clientcerts
|
||||
self.ignore_hosts = ignore_hosts
|
||||
self.listen_host = listen_host
|
||||
self.listen_port = listen_port
|
||||
self.upstream_bind_address = upstream_bind_address
|
||||
self.mode = mode
|
||||
self.no_upstream_cert = no_upstream_cert
|
||||
self.keep_host_header = keep_host_header
|
||||
|
||||
self.http2 = http2
|
||||
self.http2_priority = http2_priority
|
||||
self.websocket = websocket
|
||||
self.rawtcp = rawtcp
|
||||
|
||||
self.spoof_source_address = spoof_source_address
|
||||
self.upstream_server = upstream_server
|
||||
self.upstream_auth = upstream_auth
|
||||
self.ssl_version_client = ssl_version_client
|
||||
self.ssl_version_server = ssl_version_server
|
||||
self.ssl_insecure = ssl_insecure
|
||||
self.ssl_verify_upstream_trusted_cadir = ssl_verify_upstream_trusted_cadir
|
||||
self.ssl_verify_upstream_trusted_ca = ssl_verify_upstream_trusted_ca
|
||||
self.tcp_hosts = tcp_hosts
|
||||
|
||||
self.intercept = intercept
|
||||
|
||||
# Console options
|
||||
self.console_eventlog = console_eventlog
|
||||
self.console_focus_follow = console_focus_follow
|
||||
self.console_palette = console_palette
|
||||
self.console_palette_transparent = console_palette_transparent
|
||||
self.console_no_mouse = console_no_mouse
|
||||
self.console_order = console_order
|
||||
self.console_order_reversed = console_order_reversed
|
||||
|
||||
self.filter = filter
|
||||
|
||||
# Web options
|
||||
self.web_open_browser = web_open_browser
|
||||
self.web_debug = web_debug
|
||||
self.web_port = web_port
|
||||
self.web_iface = web_iface
|
||||
|
||||
# Dump options
|
||||
self.filtstr = filtstr
|
||||
self.flow_detail = flow_detail
|
||||
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__()
|
||||
self.add_option("onboarding", True, bool)
|
||||
self.add_option("onboarding_host", APP_HOST, str)
|
||||
self.add_option("onboarding_port", APP_PORT, int)
|
||||
self.add_option("anticache", False, bool)
|
||||
self.add_option("anticomp", False, bool)
|
||||
self.add_option("client_replay", [], Sequence[str])
|
||||
self.add_option("replay_kill_extra", False, bool)
|
||||
self.add_option("keepserving", True, bool)
|
||||
self.add_option("no_server", False, bool)
|
||||
self.add_option("server_replay_nopop", False, bool)
|
||||
self.add_option("refresh_server_playback", True, bool)
|
||||
self.add_option("rfile", None, Optional[str])
|
||||
self.add_option("scripts", [], Sequence[str])
|
||||
self.add_option("showhost", False, bool)
|
||||
self.add_option("replacements", [], Sequence[Union[Tuple[str, str, str], str]])
|
||||
self.add_option("replacement_files", [], Sequence[Union[Tuple[str, str, str], str]])
|
||||
self.add_option("server_replay_use_headers", [], Sequence[str])
|
||||
self.add_option("setheaders", [], Sequence[Union[Tuple[str, str, str], str]])
|
||||
self.add_option("server_replay", [], Sequence[str])
|
||||
self.add_option("stickycookie", None, Optional[str])
|
||||
self.add_option("stickyauth", None, Optional[str])
|
||||
self.add_option("stream_large_bodies", None, Optional[int])
|
||||
self.add_option("verbosity", 2, int)
|
||||
self.add_option("default_contentview", "auto", str)
|
||||
self.add_option("streamfile", None, Optional[str])
|
||||
self.add_option("streamfile_append", False, bool)
|
||||
self.add_option("server_replay_ignore_content", False, bool)
|
||||
self.add_option("server_replay_ignore_params", [], Sequence[str])
|
||||
self.add_option("server_replay_ignore_payload_params", [], Sequence[str])
|
||||
self.add_option("server_replay_ignore_host", False, bool)
|
||||
|
||||
# Proxy options
|
||||
self.add_option("auth_nonanonymous", False, bool)
|
||||
self.add_option("auth_singleuser", None, Optional[str])
|
||||
self.add_option("auth_htpasswd", None, Optional[str])
|
||||
self.add_option("add_upstream_certs_to_client_chain", False, bool)
|
||||
self.add_option("body_size_limit", None, Optional[int])
|
||||
self.add_option("cadir", CA_DIR, str)
|
||||
self.add_option("certs", [], Sequence[Tuple[str, str]])
|
||||
self.add_option("ciphers_client", DEFAULT_CLIENT_CIPHERS, str)
|
||||
self.add_option("ciphers_server", None, Optional[str])
|
||||
self.add_option("clientcerts", None, Optional[str])
|
||||
self.add_option("ignore_hosts", [], Sequence[str])
|
||||
self.add_option("listen_host", "", str)
|
||||
self.add_option("listen_port", LISTEN_PORT, int)
|
||||
self.add_option("upstream_bind_address", "", str)
|
||||
self.add_option("mode", "regular", str)
|
||||
self.add_option("no_upstream_cert", False, bool)
|
||||
self.add_option("keep_host_header", False, bool)
|
||||
|
||||
self.add_option("http2", True, bool)
|
||||
self.add_option("http2_priority", False, bool)
|
||||
self.add_option("websocket", True, bool)
|
||||
self.add_option("rawtcp", False, bool)
|
||||
|
||||
self.add_option("spoof_source_address", False, bool)
|
||||
self.add_option("upstream_server", None, Optional[str])
|
||||
self.add_option("upstream_auth", None, Optional[str])
|
||||
self.add_option("ssl_version_client", "secure", str)
|
||||
self.add_option("ssl_version_server", "secure", str)
|
||||
self.add_option("ssl_insecure", False, bool)
|
||||
self.add_option("ssl_verify_upstream_trusted_cadir", None, Optional[str])
|
||||
self.add_option("ssl_verify_upstream_trusted_ca", None, Optional[str])
|
||||
self.add_option("tcp_hosts", [], Sequence[str])
|
||||
|
||||
self.add_option("intercept", None, Optional[str])
|
||||
|
||||
# Console options
|
||||
self.add_option("console_eventlog", False, bool)
|
||||
self.add_option("console_focus_follow", False, bool)
|
||||
self.add_option("console_palette", "dark", Optional[str])
|
||||
self.add_option("console_palette_transparent", False, bool)
|
||||
self.add_option("console_no_mouse", False, bool)
|
||||
self.add_option("console_order", None, Optional[str])
|
||||
self.add_option("console_order_reversed", False, bool)
|
||||
|
||||
self.add_option("filter", None, Optional[str])
|
||||
|
||||
# Web options
|
||||
self.add_option("web_open_browser", True, bool)
|
||||
self.add_option("web_debug", False, bool)
|
||||
self.add_option("web_port", 8081, int)
|
||||
self.add_option("web_iface", "127.0.0.1", str)
|
||||
|
||||
# Dump options
|
||||
self.add_option("filtstr", None, Optional[str])
|
||||
self.add_option("flow_detail", 1, int)
|
||||
|
||||
self.update(**kwargs)
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
import contextlib
|
||||
import blinker
|
||||
import pprint
|
||||
import inspect
|
||||
import copy
|
||||
import functools
|
||||
import weakref
|
||||
import os
|
||||
import typing
|
||||
|
||||
import ruamel.yaml
|
||||
|
||||
|
@ -17,21 +17,62 @@ from mitmproxy.utils import typecheck
|
|||
The base implementation for Options.
|
||||
"""
|
||||
|
||||
|
||||
class _DefaultsMeta(type):
|
||||
def __new__(cls, name, bases, namespace, **kwds):
|
||||
ret = type.__new__(cls, name, bases, dict(namespace))
|
||||
defaults = {}
|
||||
for klass in reversed(inspect.getmro(ret)):
|
||||
for p in inspect.signature(klass.__init__).parameters.values():
|
||||
if p.kind in (p.KEYWORD_ONLY, p.POSITIONAL_OR_KEYWORD):
|
||||
if not p.default == p.empty:
|
||||
defaults[p.name] = p.default
|
||||
ret._defaults = defaults
|
||||
return ret
|
||||
unset = object()
|
||||
|
||||
|
||||
class OptManager(metaclass=_DefaultsMeta):
|
||||
class _Option:
|
||||
__slots__ = ("name", "typespec", "value", "_default")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
default: typing.Any,
|
||||
typespec: typing.Type
|
||||
) -> None:
|
||||
typecheck.check_type(name, default, typespec)
|
||||
self.name = name
|
||||
self._default = default
|
||||
self.typespec = typespec
|
||||
self.value = unset
|
||||
|
||||
def __repr__(self):
|
||||
return "{value} [{type}]".format(value=self.current(), type=self.typespec)
|
||||
|
||||
@property
|
||||
def default(self):
|
||||
return copy.deepcopy(self._default)
|
||||
|
||||
def current(self) -> typing.Any:
|
||||
if self.value is unset:
|
||||
v = self.default
|
||||
else:
|
||||
v = self.value
|
||||
return copy.deepcopy(v)
|
||||
|
||||
def set(self, value: typing.Any) -> None:
|
||||
typecheck.check_type(self.name, value, self.typespec)
|
||||
self.value = value
|
||||
|
||||
def reset(self) -> None:
|
||||
self.value = unset
|
||||
|
||||
def has_changed(self) -> bool:
|
||||
return self.value is not unset
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
for i in self.__slots__:
|
||||
if getattr(self, i) != getattr(other, i):
|
||||
return False
|
||||
return True
|
||||
|
||||
def __deepcopy__(self, _):
|
||||
o = _Option(self.name, self.default, self.typespec)
|
||||
if self.has_changed():
|
||||
o.value = self.current()
|
||||
return o
|
||||
|
||||
|
||||
class OptManager:
|
||||
"""
|
||||
OptManager is the base class from which Options objects are derived.
|
||||
Note that the __init__ method of all child classes must force all
|
||||
|
@ -45,32 +86,26 @@ class OptManager(metaclass=_DefaultsMeta):
|
|||
Optmanager always returns a deep copy of options to ensure that
|
||||
mutation doesn't change the option state inadvertently.
|
||||
"""
|
||||
_initialized = False
|
||||
attributes = []
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
# Initialize instance._opts before __init__ is called.
|
||||
# This allows us to call super().__init__() last, which then sets
|
||||
# ._initialized = True as the final operation.
|
||||
instance = super().__new__(cls)
|
||||
instance.__dict__["_opts"] = {}
|
||||
return instance
|
||||
|
||||
def __init__(self):
|
||||
self.__dict__["_options"] = {}
|
||||
self.__dict__["changed"] = blinker.Signal()
|
||||
self.__dict__["errored"] = blinker.Signal()
|
||||
self.__dict__["_initialized"] = True
|
||||
|
||||
def add_option(self, name: str, default: typing.Any, typespec: typing.Type) -> None:
|
||||
if name in self._options:
|
||||
raise ValueError("Option %s already exists" % name)
|
||||
self._options[name] = _Option(name, default, typespec)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def rollback(self, updated):
|
||||
old = self._opts.copy()
|
||||
old = copy.deepcopy(self._options)
|
||||
try:
|
||||
yield
|
||||
except exceptions.OptionsError as e:
|
||||
# Notify error handlers
|
||||
self.errored.send(self, exc=e)
|
||||
# Rollback
|
||||
self.__dict__["_opts"] = old
|
||||
self.__dict__["_options"] = old
|
||||
self.changed.send(self, updated=updated)
|
||||
|
||||
def subscribe(self, func, opts):
|
||||
|
@ -95,61 +130,48 @@ class OptManager(metaclass=_DefaultsMeta):
|
|||
self.changed.connect(_call, weak=False)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self._opts == other._opts
|
||||
return self._options == other._options
|
||||
|
||||
def __copy__(self):
|
||||
return self.__class__(**self._opts)
|
||||
o = OptManager()
|
||||
o.__dict__["_options"] = copy.deepcopy(self._options)
|
||||
return o
|
||||
|
||||
def __getattr__(self, attr):
|
||||
if attr in self._opts:
|
||||
return copy.deepcopy(self._opts[attr])
|
||||
if attr in self._options:
|
||||
return self._options[attr].current()
|
||||
else:
|
||||
raise AttributeError("No such option: %s" % attr)
|
||||
|
||||
def __setattr__(self, attr, value):
|
||||
if not self._initialized:
|
||||
self._typecheck(attr, value)
|
||||
self._opts[attr] = value
|
||||
return
|
||||
self.update(**{attr: value})
|
||||
|
||||
def _typecheck(self, attr, value):
|
||||
expected_type = typecheck.get_arg_type_from_constructor_annotation(
|
||||
type(self), attr
|
||||
)
|
||||
if expected_type is None:
|
||||
return # no type info :(
|
||||
typecheck.check_type(attr, value, expected_type)
|
||||
|
||||
def keys(self):
|
||||
return set(self._opts.keys())
|
||||
return set(self._options.keys())
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
Restore defaults for all options.
|
||||
"""
|
||||
self.update(**self._defaults)
|
||||
|
||||
@classmethod
|
||||
def default(klass, opt):
|
||||
return copy.deepcopy(klass._defaults[opt])
|
||||
for o in self._options.values():
|
||||
o.reset()
|
||||
|
||||
def update(self, **kwargs):
|
||||
updated = set(kwargs.keys())
|
||||
for k, v in kwargs.items():
|
||||
if k not in self._opts:
|
||||
raise KeyError("No such option: %s" % k)
|
||||
self._typecheck(k, v)
|
||||
with self.rollback(updated):
|
||||
self._opts.update(kwargs)
|
||||
for k, v in kwargs.items():
|
||||
if k not in self._options:
|
||||
raise KeyError("No such option: %s" % k)
|
||||
self._options[k].set(v)
|
||||
self.changed.send(self, updated=updated)
|
||||
return self
|
||||
|
||||
def setter(self, attr):
|
||||
"""
|
||||
Generate a setter for a given attribute. This returns a callable
|
||||
taking a single argument.
|
||||
"""
|
||||
if attr not in self._opts:
|
||||
if attr not in self._options:
|
||||
raise KeyError("No such option: %s" % attr)
|
||||
|
||||
def setter(x):
|
||||
|
@ -161,19 +183,24 @@ class OptManager(metaclass=_DefaultsMeta):
|
|||
Generate a toggler for a boolean attribute. This returns a callable
|
||||
that takes no arguments.
|
||||
"""
|
||||
if attr not in self._opts:
|
||||
if attr not in self._options:
|
||||
raise KeyError("No such option: %s" % attr)
|
||||
o = self._options[attr]
|
||||
if o.typespec != bool:
|
||||
raise ValueError("Toggler can only be used with boolean options")
|
||||
|
||||
def toggle():
|
||||
setattr(self, attr, not getattr(self, attr))
|
||||
return toggle
|
||||
|
||||
def default(self, option: str) -> typing.Any:
|
||||
return self._options[option].default
|
||||
|
||||
def has_changed(self, option):
|
||||
"""
|
||||
Has the option changed from the default?
|
||||
"""
|
||||
if getattr(self, option) != self._defaults[option]:
|
||||
return True
|
||||
return self._options[option].has_changed()
|
||||
|
||||
def save(self, path, defaults=False):
|
||||
"""
|
||||
|
@ -204,7 +231,7 @@ class OptManager(metaclass=_DefaultsMeta):
|
|||
if defaults or self.has_changed(k):
|
||||
data[k] = getattr(self, k)
|
||||
for k in list(data.keys()):
|
||||
if k not in self._opts:
|
||||
if k not in self._options:
|
||||
del data[k]
|
||||
return ruamel.yaml.round_trip_dump(data)
|
||||
|
||||
|
@ -268,7 +295,7 @@ class OptManager(metaclass=_DefaultsMeta):
|
|||
self.update(**toset)
|
||||
|
||||
def __repr__(self):
|
||||
options = pprint.pformat(self._opts, indent=4).strip(" {}")
|
||||
options = pprint.pformat(self._options, indent=4).strip(" {}")
|
||||
if "\n" in options:
|
||||
options = "\n " + options + "\n"
|
||||
return "{mod}.{cls}({{{options}}})".format(
|
||||
|
|
|
@ -4,7 +4,6 @@ import mitmproxy.master
|
|||
import mitmproxy.options
|
||||
from mitmproxy import proxy
|
||||
from mitmproxy import eventsequence
|
||||
from mitmproxy import exceptions
|
||||
|
||||
|
||||
class RecordingMaster(mitmproxy.master.Master):
|
||||
|
@ -43,14 +42,6 @@ class context:
|
|||
return False
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _rollback(self, opts, updates):
|
||||
old = opts._opts.copy()
|
||||
try:
|
||||
yield
|
||||
except exceptions.OptionsError as e:
|
||||
opts.__dict__["_opts"] = old
|
||||
raise
|
||||
|
||||
def cycle(self, addon, f):
|
||||
"""
|
||||
Cycles the flow through the events for the flow. Stops if a reply
|
||||
|
@ -70,6 +61,5 @@ class context:
|
|||
Options object with the given keyword arguments, then calls the
|
||||
configure method on the addon with the updated value.
|
||||
"""
|
||||
with self._rollback(self.options, kwargs):
|
||||
self.options.update(**kwargs)
|
||||
addon.configure(self.options, kwargs.keys())
|
||||
self.options.update(**kwargs)
|
||||
addon.configure(self.options, kwargs.keys())
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
from typing import Optional
|
||||
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import addons
|
||||
|
@ -12,26 +10,11 @@ class DumpError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class Options(options.Options):
|
||||
def __init__(
|
||||
self,
|
||||
*, # all args are keyword-only.
|
||||
keepserving: bool = False,
|
||||
filtstr: Optional[str] = None,
|
||||
flow_detail: int = 1,
|
||||
**kwargs
|
||||
) -> None:
|
||||
self.filtstr = filtstr
|
||||
self.flow_detail = flow_detail
|
||||
self.keepserving = keepserving
|
||||
super().__init__(**kwargs)
|
||||
|
||||
|
||||
class DumpMaster(master.Master):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
options: Options,
|
||||
options: options.Options,
|
||||
server,
|
||||
with_termlog=True,
|
||||
with_dumper=True,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import typing
|
||||
|
||||
|
||||
def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
||||
def check_type(name: str, value: typing.Any, typeinfo: type) -> None:
|
||||
"""
|
||||
This function checks if the provided value is an instance of typeinfo
|
||||
and raises a TypeError otherwise.
|
||||
|
@ -17,7 +17,7 @@ def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
|||
|
||||
e = TypeError("Expected {} for {}, but got {}.".format(
|
||||
typeinfo,
|
||||
attr_name,
|
||||
name,
|
||||
type(value)
|
||||
))
|
||||
|
||||
|
@ -32,7 +32,7 @@ def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
|||
|
||||
for T in types:
|
||||
try:
|
||||
check_type(attr_name, value, T)
|
||||
check_type(name, value, T)
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
|
@ -50,7 +50,7 @@ def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
|||
if len(types) != len(value):
|
||||
raise e
|
||||
for i, (x, T) in enumerate(zip(value, types)):
|
||||
check_type("{}[{}]".format(attr_name, i), x, T)
|
||||
check_type("{}[{}]".format(name, i), x, T)
|
||||
return
|
||||
elif typename.startswith("typing.Sequence"):
|
||||
try:
|
||||
|
@ -62,7 +62,7 @@ def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
|||
if not isinstance(value, (tuple, list)):
|
||||
raise e
|
||||
for v in value:
|
||||
check_type(attr_name, v, T)
|
||||
check_type(name, v, T)
|
||||
elif typename.startswith("typing.IO"):
|
||||
if hasattr(value, "read"):
|
||||
return
|
||||
|
@ -70,12 +70,3 @@ def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
|||
raise e
|
||||
elif not isinstance(value, typeinfo):
|
||||
raise e
|
||||
|
||||
|
||||
def get_arg_type_from_constructor_annotation(cls: type, attr: str) -> typing.Optional[type]:
|
||||
"""
|
||||
Returns the first type annotation for attr in the class hierarchy.
|
||||
"""
|
||||
for c in cls.mro():
|
||||
if attr in getattr(c.__init__, "__annotations__", ()):
|
||||
return c.__init__.__annotations__[attr]
|
||||
|
|
|
@ -4,12 +4,12 @@ import click
|
|||
from mitmproxy.addons import dumper
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.tools import dump
|
||||
from mitmproxy.tools import options
|
||||
|
||||
|
||||
def show(flow_detail, flows):
|
||||
d = dumper.Dumper()
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, flow_detail=flow_detail)
|
||||
for f in flows:
|
||||
ctx.cycle(d, f)
|
||||
|
|
|
@ -9,13 +9,13 @@ from mitmproxy.test import tutils
|
|||
|
||||
from mitmproxy.addons import dumper
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.tools import dump
|
||||
from mitmproxy import http
|
||||
from mitmproxy import options
|
||||
|
||||
|
||||
def test_configure():
|
||||
d = dumper.Dumper()
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, filtstr="~b foo")
|
||||
assert d.filter
|
||||
|
||||
|
@ -34,7 +34,7 @@ def test_configure():
|
|||
def test_simple():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, flow_detail=0)
|
||||
d.response(tflow.tflow(resp=True))
|
||||
assert not sio.getvalue()
|
||||
|
@ -103,7 +103,7 @@ def test_echo_body():
|
|||
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, flow_detail=3)
|
||||
d._echo_message(f.response)
|
||||
t = sio.getvalue()
|
||||
|
@ -113,7 +113,7 @@ def test_echo_body():
|
|||
def test_echo_request_line():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, flow_detail=3, showhost=True)
|
||||
f = tflow.tflow(client_conn=None, server_conn=True, resp=True)
|
||||
f.request.is_replay = True
|
||||
|
@ -148,7 +148,7 @@ class TestContentView:
|
|||
view_auto.side_effect = exceptions.ContentViewException("")
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, flow_detail=4, verbosity=3)
|
||||
d.response(tflow.tflow())
|
||||
assert "Content viewer failed" in ctx.master.event_log[0][1]
|
||||
|
@ -157,7 +157,7 @@ class TestContentView:
|
|||
def test_tcp():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, flow_detail=3, showhost=True)
|
||||
f = tflow.ttcpflow()
|
||||
d.tcp_message(f)
|
||||
|
@ -172,7 +172,7 @@ def test_tcp():
|
|||
def test_websocket():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=dump.Options()) as ctx:
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
ctx.configure(d, flow_detail=3, showhost=True)
|
||||
f = tflow.twebsocketflow()
|
||||
d.websocket_message(f)
|
||||
|
|
|
@ -7,15 +7,9 @@ from mitmproxy.test import taddons
|
|||
from mitmproxy.test import tflow
|
||||
|
||||
|
||||
class Options(options.Options):
|
||||
def __init__(self, *, intercept=None, **kwargs):
|
||||
self.intercept = intercept
|
||||
super().__init__(**kwargs)
|
||||
|
||||
|
||||
def test_simple():
|
||||
r = intercept.Intercept()
|
||||
with taddons.context(options=Options()) as tctx:
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
assert not r.filt
|
||||
tctx.configure(r, intercept="~q")
|
||||
assert r.filt
|
||||
|
|
|
@ -7,13 +7,13 @@ from mitmproxy.test import taddons
|
|||
|
||||
from mitmproxy import io
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.tools import dump
|
||||
from mitmproxy import options
|
||||
from mitmproxy.addons import streamfile
|
||||
|
||||
|
||||
def test_configure():
|
||||
sa = streamfile.StreamFile()
|
||||
with taddons.context(options=dump.Options()) as tctx:
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with tutils.tmpdir() as tdir:
|
||||
p = os.path.join(tdir, "foo")
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
|
|
|
@ -3,7 +3,7 @@ import pytest
|
|||
|
||||
from mitmproxy.addons import termlog
|
||||
from mitmproxy import log
|
||||
from mitmproxy.tools.dump import Options
|
||||
from mitmproxy.options import Options
|
||||
from mitmproxy.test import taddons
|
||||
|
||||
|
||||
|
|
|
@ -15,23 +15,6 @@ def tft(*, method="get", start=0):
|
|||
return f
|
||||
|
||||
|
||||
class Options(options.Options):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
filter=None,
|
||||
console_order=None,
|
||||
console_order_reversed=False,
|
||||
console_focus_follow=False,
|
||||
**kwargs
|
||||
):
|
||||
self.filter = filter
|
||||
self.console_order = console_order
|
||||
self.console_order_reversed = console_order_reversed
|
||||
self.console_focus_follow = console_focus_follow
|
||||
super().__init__(**kwargs)
|
||||
|
||||
|
||||
def test_order_refresh():
|
||||
v = view.View()
|
||||
sargs = []
|
||||
|
@ -42,7 +25,7 @@ def test_order_refresh():
|
|||
v.sig_view_refresh.connect(save)
|
||||
|
||||
tf = tflow.tflow(resp=True)
|
||||
with taddons.context(options=Options()) as tctx:
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
tctx.configure(v, console_order="time")
|
||||
v.add(tf)
|
||||
tf.request.timestamp_start = 1
|
||||
|
@ -149,7 +132,7 @@ def test_filter():
|
|||
|
||||
def test_order():
|
||||
v = view.View()
|
||||
with taddons.context(options=Options()) as tctx:
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
v.request(tft(method="get", start=1))
|
||||
v.request(tft(method="put", start=2))
|
||||
v.request(tft(method="get", start=3))
|
||||
|
@ -280,7 +263,7 @@ def test_signals():
|
|||
|
||||
def test_focus_follow():
|
||||
v = view.View()
|
||||
with taddons.context(options=Options()) as tctx:
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
tctx.configure(v, console_focus_follow=True, filter="~m get")
|
||||
|
||||
v.add(tft(start=5))
|
||||
|
@ -394,7 +377,7 @@ def test_settings():
|
|||
|
||||
def test_configure():
|
||||
v = view.View()
|
||||
with taddons.context(options=Options()) as tctx:
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
tctx.configure(v, filter="~q")
|
||||
with pytest.raises(Exception, match="Invalid interception filter"):
|
||||
tctx.configure(v, filter="~~")
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import copy
|
||||
import os
|
||||
import pytest
|
||||
import typing
|
||||
|
||||
from mitmproxy import options
|
||||
from mitmproxy import optmanager
|
||||
|
@ -9,48 +10,45 @@ from mitmproxy.test import tutils
|
|||
|
||||
|
||||
class TO(optmanager.OptManager):
|
||||
def __init__(self, one=None, two=None):
|
||||
self.one = one
|
||||
self.two = two
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.add_option("one", None, typing.Optional[int])
|
||||
self.add_option("two", 2, typing.Optional[int])
|
||||
self.add_option("bool", False, bool)
|
||||
|
||||
|
||||
class TD(optmanager.OptManager):
|
||||
def __init__(self, *, one="done", two="dtwo", three="error"):
|
||||
self.one = one
|
||||
self.two = two
|
||||
self.three = three
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.add_option("one", "done", str)
|
||||
self.add_option("two", "dtwo", str)
|
||||
|
||||
|
||||
class TD2(TD):
|
||||
def __init__(self, *, three="dthree", four="dfour", **kwargs):
|
||||
self.three = three
|
||||
self.four = four
|
||||
super().__init__(three=three, **kwargs)
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.add_option("three", "dthree", str)
|
||||
self.add_option("four", "dfour", str)
|
||||
|
||||
|
||||
class TM(optmanager.OptManager):
|
||||
def __init__(self, one="one", two=["foo"], three=None):
|
||||
self.one = one
|
||||
self.two = two
|
||||
self.three = three
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.add_option("two", ["foo"], typing.Sequence[str])
|
||||
self.add_option("one", None, typing.Optional[str])
|
||||
|
||||
|
||||
def test_defaults():
|
||||
assert TD2.default("one") == "done"
|
||||
assert TD2.default("two") == "dtwo"
|
||||
assert TD2.default("three") == "dthree"
|
||||
assert TD2.default("four") == "dfour"
|
||||
|
||||
o = TD2()
|
||||
assert o._defaults == {
|
||||
defaults = {
|
||||
"one": "done",
|
||||
"two": "dtwo",
|
||||
"three": "dthree",
|
||||
"four": "dfour",
|
||||
}
|
||||
for k, v in defaults.items():
|
||||
assert o.default(k) == v
|
||||
|
||||
assert not o.has_changed("one")
|
||||
newvals = dict(
|
||||
one="xone",
|
||||
|
@ -64,18 +62,19 @@ def test_defaults():
|
|||
assert v == getattr(o, k)
|
||||
o.reset()
|
||||
assert not o.has_changed("one")
|
||||
for k, v in o._defaults.items():
|
||||
assert v == getattr(o, k)
|
||||
|
||||
for k in o.keys():
|
||||
assert not o.has_changed(k)
|
||||
|
||||
|
||||
def test_options():
|
||||
o = TO(two="three")
|
||||
assert o.keys() == set(["one", "two"])
|
||||
o = TO()
|
||||
assert o.keys() == set(["bool", "one", "two"])
|
||||
|
||||
assert o.one is None
|
||||
assert o.two == "three"
|
||||
o.one = "one"
|
||||
assert o.one == "one"
|
||||
assert o.two == 2
|
||||
o.one = 1
|
||||
assert o.one == 1
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
TO(nonexistent = "value")
|
||||
|
@ -91,34 +90,38 @@ def test_options():
|
|||
|
||||
o.changed.connect(sub)
|
||||
|
||||
o.one = "ninety"
|
||||
o.one = 90
|
||||
assert len(rec) == 1
|
||||
assert rec[-1].one == "ninety"
|
||||
assert rec[-1].one == 90
|
||||
|
||||
o.update(one="oink")
|
||||
o.update(one=3)
|
||||
assert len(rec) == 2
|
||||
assert rec[-1].one == "oink"
|
||||
assert rec[-1].one == 3
|
||||
|
||||
|
||||
def test_setter():
|
||||
o = TO(two="three")
|
||||
o = TO()
|
||||
f = o.setter("two")
|
||||
f("xxx")
|
||||
assert o.two == "xxx"
|
||||
f(99)
|
||||
assert o.two == 99
|
||||
with pytest.raises(Exception, match="No such option"):
|
||||
o.setter("nonexistent")
|
||||
|
||||
|
||||
def test_toggler():
|
||||
o = TO(two=True)
|
||||
f = o.toggler("two")
|
||||
o = TO()
|
||||
f = o.toggler("bool")
|
||||
assert o.bool is False
|
||||
f()
|
||||
assert o.two is False
|
||||
assert o.bool is True
|
||||
f()
|
||||
assert o.two is True
|
||||
assert o.bool is False
|
||||
with pytest.raises(Exception, match="No such option"):
|
||||
o.toggler("nonexistent")
|
||||
|
||||
with pytest.raises(Exception, match="boolean options"):
|
||||
o.toggler("one")
|
||||
|
||||
|
||||
class Rec():
|
||||
def __init__(self):
|
||||
|
@ -132,19 +135,19 @@ def test_subscribe():
|
|||
o = TO()
|
||||
r = Rec()
|
||||
o.subscribe(r, ["two"])
|
||||
o.one = "foo"
|
||||
o.one = 2
|
||||
assert not r.called
|
||||
o.two = "foo"
|
||||
o.two = 3
|
||||
assert r.called
|
||||
|
||||
assert len(o.changed.receivers) == 1
|
||||
del r
|
||||
o.two = "bar"
|
||||
o.two = 4
|
||||
assert len(o.changed.receivers) == 0
|
||||
|
||||
|
||||
def test_rollback():
|
||||
o = TO(one="two")
|
||||
o = TO()
|
||||
|
||||
rec = []
|
||||
|
||||
|
@ -157,27 +160,24 @@ def test_rollback():
|
|||
recerr.append(kwargs)
|
||||
|
||||
def err(opts, updated):
|
||||
if opts.one == "ten":
|
||||
if opts.one == 10:
|
||||
raise exceptions.OptionsError()
|
||||
|
||||
o.changed.connect(sub)
|
||||
o.changed.connect(err)
|
||||
o.errored.connect(errsub)
|
||||
|
||||
o.one = "ten"
|
||||
assert o.one is None
|
||||
o.one = 10
|
||||
assert isinstance(recerr[0]["exc"], exceptions.OptionsError)
|
||||
assert o.one == "two"
|
||||
assert o.one is None
|
||||
assert len(rec) == 2
|
||||
assert rec[0].one == "ten"
|
||||
assert rec[1].one == "two"
|
||||
assert rec[0].one == 10
|
||||
assert rec[1].one is None
|
||||
|
||||
|
||||
def test_repr():
|
||||
assert repr(TO()) == "test.mitmproxy.test_optmanager.TO({'one': None, 'two': None})"
|
||||
assert repr(TO(one='x' * 60)) == """test.mitmproxy.test_optmanager.TO({
|
||||
'one': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
|
||||
'two': None
|
||||
})"""
|
||||
assert repr(TO())
|
||||
|
||||
|
||||
def test_serialize():
|
||||
|
@ -249,3 +249,17 @@ def test_merge():
|
|||
assert m.one == "two"
|
||||
m.merge(dict(two=["bar"]))
|
||||
assert m.two == ["foo", "bar"]
|
||||
|
||||
|
||||
def test_option():
|
||||
o = optmanager._Option("test", 1, int)
|
||||
assert o.current() == 1
|
||||
with pytest.raises(TypeError):
|
||||
o.set("foo")
|
||||
with pytest.raises(TypeError):
|
||||
optmanager._Option("test", 1, str)
|
||||
|
||||
o2 = optmanager._Option("test", 1, int)
|
||||
assert o2 == o
|
||||
o2.set(5)
|
||||
assert o2 != o
|
||||
|
|
|
@ -5,6 +5,7 @@ from unittest import mock
|
|||
from mitmproxy import proxy
|
||||
from mitmproxy import log
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import options
|
||||
from mitmproxy.tools import dump
|
||||
|
||||
from mitmproxy.test import tutils
|
||||
|
@ -12,8 +13,8 @@ from .. import tservers
|
|||
|
||||
|
||||
class TestDumpMaster(tservers.MasterTest):
|
||||
def mkmaster(self, flt, **options):
|
||||
o = dump.Options(filtstr=flt, verbosity=-1, flow_detail=0, **options)
|
||||
def mkmaster(self, flt, **opts):
|
||||
o = options.Options(filtstr=flt, verbosity=-1, flow_detail=0, **opts)
|
||||
m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False)
|
||||
return m
|
||||
|
||||
|
@ -40,13 +41,13 @@ class TestDumpMaster(tservers.MasterTest):
|
|||
@pytest.mark.parametrize("termlog", [False, True])
|
||||
def test_addons_termlog(self, termlog):
|
||||
with mock.patch('sys.stdout'):
|
||||
o = dump.Options()
|
||||
o = options.Options()
|
||||
m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=termlog)
|
||||
assert (m.addons.get('termlog') is not None) == termlog
|
||||
|
||||
@pytest.mark.parametrize("dumper", [False, True])
|
||||
def test_addons_dumper(self, dumper):
|
||||
with mock.patch('sys.stdout'):
|
||||
o = dump.Options()
|
||||
o = options.Options()
|
||||
m = dump.DumpMaster(o, proxy.DummyServer(), with_dumper=dumper)
|
||||
assert (m.addons.get('dumper') is not None) == dumper
|
||||
|
|
|
@ -16,12 +16,6 @@ class T(TBase):
|
|||
super(T, self).__init__(42)
|
||||
|
||||
|
||||
def test_get_arg_type_from_constructor_annotation():
|
||||
assert typecheck.get_arg_type_from_constructor_annotation(T, "foo") == str
|
||||
assert typecheck.get_arg_type_from_constructor_annotation(T, "bar") == int
|
||||
assert not typecheck.get_arg_type_from_constructor_annotation(T, "baz")
|
||||
|
||||
|
||||
def test_check_type():
|
||||
typecheck.check_type("foo", 42, int)
|
||||
with pytest.raises(TypeError):
|
||||
|
|
Loading…
Reference in New Issue