Merge pull request #3526 from pierlon/feature/allow-hosts
Add --allow_hosts option
This commit is contained in:
commit
26e55b0a7f
|
@ -67,6 +67,10 @@ class Options(optmanager.OptManager):
|
|||
regular expression and matched on the ip or the hostname.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"allow_hosts", Sequence[str], [],
|
||||
"Opposite of --ignore-hosts."
|
||||
)
|
||||
self.add_option(
|
||||
"listen_host", str, "",
|
||||
"Address to bind proxy to."
|
||||
|
|
|
@ -14,7 +14,8 @@ CONF_BASENAME = "mitmproxy"
|
|||
|
||||
class HostMatcher:
|
||||
|
||||
def __init__(self, patterns=tuple()):
|
||||
def __init__(self, handle, patterns=tuple()):
|
||||
self.handle = handle
|
||||
self.patterns = list(patterns)
|
||||
self.regexes = [re.compile(p, re.IGNORECASE) for p in self.patterns]
|
||||
|
||||
|
@ -22,10 +23,10 @@ class HostMatcher:
|
|||
if not address:
|
||||
return False
|
||||
host = "%s:%s" % address
|
||||
if any(rex.search(host) for rex in self.regexes):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
if self.handle in ["ignore", "tcp"]:
|
||||
return any(rex.search(host) for rex in self.regexes)
|
||||
else: # self.handle == "allow"
|
||||
return any(not rex.search(host) for rex in self.regexes)
|
||||
|
||||
def __bool__(self):
|
||||
return bool(self.patterns)
|
||||
|
@ -36,7 +37,7 @@ class ProxyConfig:
|
|||
def __init__(self, options: moptions.Options) -> None:
|
||||
self.options = options
|
||||
|
||||
self.check_ignore: HostMatcher = None
|
||||
self.check_filter: HostMatcher = None
|
||||
self.check_tcp: HostMatcher = None
|
||||
self.certstore: certs.CertStore = None
|
||||
self.upstream_server: typing.Optional[server_spec.ServerSpec] = None
|
||||
|
@ -44,10 +45,18 @@ class ProxyConfig:
|
|||
options.changed.connect(self.configure)
|
||||
|
||||
def configure(self, options: moptions.Options, updated: typing.Any) -> None:
|
||||
if "ignore_hosts" in updated:
|
||||
self.check_ignore = HostMatcher(options.ignore_hosts)
|
||||
if options.allow_hosts and options.ignore_hosts:
|
||||
raise exceptions.OptionsError("--ignore-hosts and --allow-hosts are mutually "
|
||||
"exclusive; please choose one.")
|
||||
|
||||
if options.ignore_hosts:
|
||||
self.check_filter = HostMatcher("ignore", options.ignore_hosts)
|
||||
elif options.allow_hosts:
|
||||
self.check_filter = HostMatcher("allow", options.allow_hosts)
|
||||
else:
|
||||
self.check_filter = HostMatcher(False)
|
||||
if "tcp_hosts" in updated:
|
||||
self.check_tcp = HostMatcher(options.tcp_hosts)
|
||||
self.check_tcp = HostMatcher("tcp", options.tcp_hosts)
|
||||
|
||||
certstore_path = os.path.expanduser(options.confdir)
|
||||
if not os.path.exists(os.path.dirname(certstore_path)):
|
||||
|
|
|
@ -48,17 +48,17 @@ class RootContext:
|
|||
raise exceptions.ProtocolException(str(e))
|
||||
client_tls = tls.is_tls_record_magic(d)
|
||||
|
||||
# 1. check for --ignore
|
||||
if self.config.check_ignore:
|
||||
ignore = self.config.check_ignore(top_layer.server_conn.address)
|
||||
if not ignore and client_tls:
|
||||
# 1. check for filter
|
||||
if self.config.check_filter:
|
||||
is_filtered = self.config.check_filter(top_layer.server_conn.address)
|
||||
if not is_filtered and client_tls:
|
||||
try:
|
||||
client_hello = tls.ClientHello.from_file(self.client_conn.rfile)
|
||||
except exceptions.TlsProtocolException as e:
|
||||
self.log("Cannot parse Client Hello: %s" % repr(e), "error")
|
||||
else:
|
||||
ignore = self.config.check_ignore((client_hello.sni, 443))
|
||||
if ignore:
|
||||
is_filtered = self.config.check_filter((client_hello.sni, 443))
|
||||
if is_filtered:
|
||||
return protocol.RawTCPLayer(top_layer, ignore=True)
|
||||
|
||||
# 2. Always insert a TLS layer, even if there's neither client nor server tls.
|
||||
|
|
|
@ -57,6 +57,7 @@ def common_options(parser, opts):
|
|||
opts.make_parser(group, "listen_port", metavar="PORT", short="p")
|
||||
opts.make_parser(group, "server", short="n")
|
||||
opts.make_parser(group, "ignore_hosts", metavar="HOST")
|
||||
opts.make_parser(group, "allow_hosts", metavar="HOST")
|
||||
opts.make_parser(group, "tcp_hosts", metavar="HOST")
|
||||
opts.make_parser(group, "upstream_auth", metavar="USER:PASS")
|
||||
opts.make_parser(group, "proxyauth", metavar="SPEC")
|
||||
|
|
|
@ -215,6 +215,10 @@ class StatusBar(urwid.WidgetWrap):
|
|||
r.append("[")
|
||||
r.append(("heading_key", "I"))
|
||||
r.append("gnore:%d]" % len(self.master.options.ignore_hosts))
|
||||
elif self.master.options.allow_hosts:
|
||||
r.append("[")
|
||||
r.append(("heading_key", "A"))
|
||||
r.append("llow:%d]" % len(self.master.options.allow_hosts))
|
||||
if self.master.options.tcp_hosts:
|
||||
r.append("[")
|
||||
r.append(("heading_key", "T"))
|
||||
|
|
|
@ -17,3 +17,12 @@ class TestProxyConfig:
|
|||
opts.certs = [tdata.path("mitmproxy/data/dumpfile-011")]
|
||||
with pytest.raises(exceptions.OptionsError, match="Invalid certificate format"):
|
||||
ProxyConfig(opts)
|
||||
|
||||
def test_cannot_set_both_allow_and_filter_options(self):
|
||||
opts = options.Options()
|
||||
opts.ignore_hosts = ["foo"]
|
||||
opts.allow_hosts = ["bar"]
|
||||
with pytest.raises(exceptions.OptionsError, match="--ignore-hosts and --allow-hosts are "
|
||||
"mutually exclusive; please choose "
|
||||
"one."):
|
||||
ProxyConfig(opts)
|
||||
|
|
|
@ -78,6 +78,16 @@ class TcpMixin:
|
|||
self.options.ignore_hosts = self._ignore_backup
|
||||
del self._ignore_backup
|
||||
|
||||
def _allow_on(self):
|
||||
assert not hasattr(self, "_allow_backup")
|
||||
self._allow_backup = self.options.allow_hosts
|
||||
self.options.allow_hosts = ["(127.0.0.1|None):\\d+"] + self.options.allow_hosts
|
||||
|
||||
def _allow_off(self):
|
||||
assert hasattr(self, "_allow_backup")
|
||||
self.options.allow_hosts = self._allow_backup
|
||||
del self._allow_backup
|
||||
|
||||
def test_ignore(self):
|
||||
n = self.pathod("304")
|
||||
self._ignore_on()
|
||||
|
@ -111,6 +121,40 @@ class TcpMixin:
|
|||
|
||||
self._ignore_off()
|
||||
|
||||
def test_allow(self):
|
||||
n = self.pathod("304")
|
||||
self._allow_on()
|
||||
i = self.pathod("305")
|
||||
i2 = self.pathod("306")
|
||||
self._allow_off()
|
||||
|
||||
assert n.status_code == 304
|
||||
assert i.status_code == 305
|
||||
assert i2.status_code == 306
|
||||
|
||||
assert any(f.response.status_code == 304 for f in self.master.state.flows)
|
||||
assert any(f.response.status_code == 305 for f in self.master.state.flows)
|
||||
assert any(f.response.status_code == 306 for f in self.master.state.flows)
|
||||
|
||||
# Test that we get the original SSL cert
|
||||
if self.ssl:
|
||||
i_cert = certs.Cert(i.sslinfo.certchain[0])
|
||||
i2_cert = certs.Cert(i2.sslinfo.certchain[0])
|
||||
n_cert = certs.Cert(n.sslinfo.certchain[0])
|
||||
|
||||
assert i_cert == i2_cert
|
||||
assert i_cert != n_cert
|
||||
|
||||
# Test Non-HTTP traffic
|
||||
spec = "200:i0,@100:d0" # this results in just 100 random bytes
|
||||
# mitmproxy responds with bad gateway
|
||||
assert self.pathod(spec).status_code == 502
|
||||
self._allow_on()
|
||||
|
||||
self.pathod(spec) # pathoc parses answer as HTTP
|
||||
|
||||
self._allow_off()
|
||||
|
||||
def _tcpproxy_on(self):
|
||||
assert not hasattr(self, "_tcpproxy_backup")
|
||||
self._tcpproxy_backup = self.options.tcp_hosts
|
||||
|
@ -852,10 +896,12 @@ class TestUpstreamProxySSL(
|
|||
|
||||
def _host_pattern_on(self, attr):
|
||||
"""
|
||||
Updates config.check_tcp or check_ignore, depending on attr.
|
||||
Updates config.check_tcp or check_filter, depending on attr.
|
||||
"""
|
||||
assert not hasattr(self, "_ignore_%s_backup" % attr)
|
||||
backup = []
|
||||
handle = attr
|
||||
attr = "filter" if attr in ["allow", "ignore"] else attr
|
||||
for proxy in self.chain:
|
||||
old_matcher = getattr(
|
||||
proxy.tmaster.server.config,
|
||||
|
@ -865,12 +911,13 @@ class TestUpstreamProxySSL(
|
|||
setattr(
|
||||
proxy.tmaster.server.config,
|
||||
"check_%s" % attr,
|
||||
HostMatcher([".+:%s" % self.server.port] + old_matcher.patterns)
|
||||
HostMatcher(handle, [".+:%s" % self.server.port] + old_matcher.patterns)
|
||||
)
|
||||
|
||||
setattr(self, "_ignore_%s_backup" % attr, backup)
|
||||
|
||||
def _host_pattern_off(self, attr):
|
||||
attr = "filter" if attr in ["allow", "ignore"] else attr
|
||||
backup = getattr(self, "_ignore_%s_backup" % attr)
|
||||
for proxy in reversed(self.chain):
|
||||
setattr(
|
||||
|
|
Loading…
Reference in New Issue