diff --git a/mitmproxy/options.py b/mitmproxy/options.py index a6ab3d504..561461538 100644 --- a/mitmproxy/options.py +++ b/mitmproxy/options.py @@ -67,6 +67,10 @@ class Options(optmanager.OptManager): regular expression and matched on the ip or the hostname. """ ) + self.add_option( + "allow_hosts", Sequence[str], [], + "Opposite of --ignore-hosts." + ) self.add_option( "listen_host", str, "", "Address to bind proxy to." diff --git a/mitmproxy/proxy/config.py b/mitmproxy/proxy/config.py index f32d30865..75e372ae6 100644 --- a/mitmproxy/proxy/config.py +++ b/mitmproxy/proxy/config.py @@ -14,7 +14,8 @@ CONF_BASENAME = "mitmproxy" class HostMatcher: - def __init__(self, patterns=tuple()): + def __init__(self, handle, patterns=tuple()): + self.handle = handle self.patterns = list(patterns) self.regexes = [re.compile(p, re.IGNORECASE) for p in self.patterns] @@ -22,10 +23,10 @@ class HostMatcher: if not address: return False host = "%s:%s" % address - if any(rex.search(host) for rex in self.regexes): - return True - else: - return False + if self.handle in ["ignore", "tcp"]: + return any(rex.search(host) for rex in self.regexes) + else: # self.handle == "allow" + return any(not rex.search(host) for rex in self.regexes) def __bool__(self): return bool(self.patterns) @@ -36,7 +37,7 @@ class ProxyConfig: def __init__(self, options: moptions.Options) -> None: self.options = options - self.check_ignore: HostMatcher = None + self.check_filter: HostMatcher = None self.check_tcp: HostMatcher = None self.certstore: certs.CertStore = None self.upstream_server: typing.Optional[server_spec.ServerSpec] = None @@ -44,10 +45,18 @@ class ProxyConfig: options.changed.connect(self.configure) def configure(self, options: moptions.Options, updated: typing.Any) -> None: - if "ignore_hosts" in updated: - self.check_ignore = HostMatcher(options.ignore_hosts) + if options.allow_hosts and options.ignore_hosts: + raise exceptions.OptionsError("--ignore-hosts and --allow-hosts are mutually " + "exclusive; please choose one.") + + if options.ignore_hosts: + self.check_filter = HostMatcher("ignore", options.ignore_hosts) + elif options.allow_hosts: + self.check_filter = HostMatcher("allow", options.allow_hosts) + else: + self.check_filter = HostMatcher(False) if "tcp_hosts" in updated: - self.check_tcp = HostMatcher(options.tcp_hosts) + self.check_tcp = HostMatcher("tcp", options.tcp_hosts) certstore_path = os.path.expanduser(options.confdir) if not os.path.exists(os.path.dirname(certstore_path)): diff --git a/mitmproxy/proxy/root_context.py b/mitmproxy/proxy/root_context.py index eb0008cf2..4805f874e 100644 --- a/mitmproxy/proxy/root_context.py +++ b/mitmproxy/proxy/root_context.py @@ -48,17 +48,17 @@ class RootContext: raise exceptions.ProtocolException(str(e)) client_tls = tls.is_tls_record_magic(d) - # 1. check for --ignore - if self.config.check_ignore: - ignore = self.config.check_ignore(top_layer.server_conn.address) - if not ignore and client_tls: + # 1. check for filter + if self.config.check_filter: + is_filtered = self.config.check_filter(top_layer.server_conn.address) + if not is_filtered and client_tls: try: client_hello = tls.ClientHello.from_file(self.client_conn.rfile) except exceptions.TlsProtocolException as e: self.log("Cannot parse Client Hello: %s" % repr(e), "error") else: - ignore = self.config.check_ignore((client_hello.sni, 443)) - if ignore: + is_filtered = self.config.check_filter((client_hello.sni, 443)) + if is_filtered: return protocol.RawTCPLayer(top_layer, ignore=True) # 2. Always insert a TLS layer, even if there's neither client nor server tls. diff --git a/mitmproxy/tools/cmdline.py b/mitmproxy/tools/cmdline.py index eb4a984dc..2c7817fac 100644 --- a/mitmproxy/tools/cmdline.py +++ b/mitmproxy/tools/cmdline.py @@ -57,6 +57,7 @@ def common_options(parser, opts): opts.make_parser(group, "listen_port", metavar="PORT", short="p") opts.make_parser(group, "server", short="n") opts.make_parser(group, "ignore_hosts", metavar="HOST") + opts.make_parser(group, "allow_hosts", metavar="HOST") opts.make_parser(group, "tcp_hosts", metavar="HOST") opts.make_parser(group, "upstream_auth", metavar="USER:PASS") opts.make_parser(group, "proxyauth", metavar="SPEC") diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 2d32f4875..56f0674f2 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -215,6 +215,10 @@ class StatusBar(urwid.WidgetWrap): r.append("[") r.append(("heading_key", "I")) r.append("gnore:%d]" % len(self.master.options.ignore_hosts)) + elif self.master.options.allow_hosts: + r.append("[") + r.append(("heading_key", "A")) + r.append("llow:%d]" % len(self.master.options.allow_hosts)) if self.master.options.tcp_hosts: r.append("[") r.append(("heading_key", "T")) diff --git a/test/mitmproxy/proxy/test_config.py b/test/mitmproxy/proxy/test_config.py index 1da031c66..1319d1a9d 100644 --- a/test/mitmproxy/proxy/test_config.py +++ b/test/mitmproxy/proxy/test_config.py @@ -17,3 +17,12 @@ class TestProxyConfig: opts.certs = [tdata.path("mitmproxy/data/dumpfile-011")] with pytest.raises(exceptions.OptionsError, match="Invalid certificate format"): ProxyConfig(opts) + + def test_cannot_set_both_allow_and_filter_options(self): + opts = options.Options() + opts.ignore_hosts = ["foo"] + opts.allow_hosts = ["bar"] + with pytest.raises(exceptions.OptionsError, match="--ignore-hosts and --allow-hosts are " + "mutually exclusive; please choose " + "one."): + ProxyConfig(opts) diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py index 01ab068d5..b5852d607 100644 --- a/test/mitmproxy/proxy/test_server.py +++ b/test/mitmproxy/proxy/test_server.py @@ -78,6 +78,16 @@ class TcpMixin: self.options.ignore_hosts = self._ignore_backup del self._ignore_backup + def _allow_on(self): + assert not hasattr(self, "_allow_backup") + self._allow_backup = self.options.allow_hosts + self.options.allow_hosts = ["(127.0.0.1|None):\\d+"] + self.options.allow_hosts + + def _allow_off(self): + assert hasattr(self, "_allow_backup") + self.options.allow_hosts = self._allow_backup + del self._allow_backup + def test_ignore(self): n = self.pathod("304") self._ignore_on() @@ -111,6 +121,40 @@ class TcpMixin: self._ignore_off() + def test_allow(self): + n = self.pathod("304") + self._allow_on() + i = self.pathod("305") + i2 = self.pathod("306") + self._allow_off() + + assert n.status_code == 304 + assert i.status_code == 305 + assert i2.status_code == 306 + + assert any(f.response.status_code == 304 for f in self.master.state.flows) + assert any(f.response.status_code == 305 for f in self.master.state.flows) + assert any(f.response.status_code == 306 for f in self.master.state.flows) + + # Test that we get the original SSL cert + if self.ssl: + i_cert = certs.Cert(i.sslinfo.certchain[0]) + i2_cert = certs.Cert(i2.sslinfo.certchain[0]) + n_cert = certs.Cert(n.sslinfo.certchain[0]) + + assert i_cert == i2_cert + assert i_cert != n_cert + + # Test Non-HTTP traffic + spec = "200:i0,@100:d0" # this results in just 100 random bytes + # mitmproxy responds with bad gateway + assert self.pathod(spec).status_code == 502 + self._allow_on() + + self.pathod(spec) # pathoc parses answer as HTTP + + self._allow_off() + def _tcpproxy_on(self): assert not hasattr(self, "_tcpproxy_backup") self._tcpproxy_backup = self.options.tcp_hosts @@ -852,10 +896,12 @@ class TestUpstreamProxySSL( def _host_pattern_on(self, attr): """ - Updates config.check_tcp or check_ignore, depending on attr. + Updates config.check_tcp or check_filter, depending on attr. """ assert not hasattr(self, "_ignore_%s_backup" % attr) backup = [] + handle = attr + attr = "filter" if attr in ["allow", "ignore"] else attr for proxy in self.chain: old_matcher = getattr( proxy.tmaster.server.config, @@ -865,12 +911,13 @@ class TestUpstreamProxySSL( setattr( proxy.tmaster.server.config, "check_%s" % attr, - HostMatcher([".+:%s" % self.server.port] + old_matcher.patterns) + HostMatcher(handle, [".+:%s" % self.server.port] + old_matcher.patterns) ) setattr(self, "_ignore_%s_backup" % attr, backup) def _host_pattern_off(self, attr): + attr = "filter" if attr in ["allow", "ignore"] else attr backup = getattr(self, "_ignore_%s_backup" % attr) for proxy in reversed(self.chain): setattr(