diff --git a/mitmproxy/addons/next_layer.py b/mitmproxy/addons/next_layer.py index b1cb179f5..0404bb597 100644 --- a/mitmproxy/addons/next_layer.py +++ b/mitmproxy/addons/next_layer.py @@ -155,23 +155,26 @@ class NextLayer: for rex in hosts ) - def is_reverse_proxy_scheme(self, context: context.Context, *args: str): + def get_http_layer(self, context: context.Context) -> Optional[layers.HttpLayer]: def s(*layers): return stack_match(context, layers) - # we allow all possible security layer combinations and rely on the correctness of ReverseProxy - return ( - ( - s(modes.ReverseProxy) - or - s(modes.ReverseProxy, layers.ClientTLSLayer) - or - s(modes.ReverseProxy, layers.ServerTLSLayer) - or - s(modes.ReverseProxy, layers.ServerTLSLayer, layers.ClientTLSLayer) - ) - and cast(mode_specs.ReverseMode, context.client.proxy_mode).scheme in args - ) + # Setup the HTTP layer for a regular HTTP proxy ... + if ( + s(modes.HttpProxy) + or + # or a "Secure Web Proxy", see https://www.chromium.org/developers/design-documents/secure-web-proxy + s(modes.HttpProxy, (layers.ClientTLSLayer, layers.ClientQuicLayer)) + ): + return layers.HttpLayer(context, HTTPMode.regular) + # ... or an upstream proxy. + if ( + s(modes.HttpUpstreamProxy) + or + s(modes.HttpUpstreamProxy, (layers.ClientTLSLayer, layers.ClientQuicLayer)) + ): + return layers.HttpLayer(context, HTTPMode.upstream) + return None def detect_udp_tls(self, data_client: bytes) -> Optional[tuple[ClientHello, ClientSecurityLayerCls, ServerSecurityLayerCls]]: if len(data_client) == 0: @@ -195,6 +198,36 @@ class NextLayer: # that's all we currently have to offer return None + def raw_udp_layer(self, context: context.Context, ignore: bool = False) -> layer.Layer: + def s(*layers): + return stack_match(context, layers) + + # for regular and upstream HTTP3, if we already created a client QUIC layer + # we need a server and raw QUIC layer as well + if ( + s(modes.HttpProxy, layers.ClientQuicLayer) + or + s(modes.HttpUpstreamProxy, layers.ClientQuicLayer) + ): + server_layer = layers.ServerQuicLayer(context) + server_layer.child_layer = layers.RawQuicLayer(context, ignore=ignore) + return server_layer + + # for reverse HTTP3 and QUIC, we need a client and raw QUIC layer + elif (s(modes.ReverseProxy, layers.ServerQuicLayer)): + client_layer = layers.ClientQuicLayer(context) + client_layer.child_layer = layers.RawQuicLayer(context, ignore=ignore) + return client_layer + + # in other cases we assume `setup_tls_layer` happened, so if the + # top layer is `ClientQuicLayer` we return a raw QUIC layer... + elif isinstance(context.layers[-1], layers.ClientQuicLayer): + return layers.RawQuicLayer(context, ignore=ignore) + + # ... otherwise an UDP layer + else: + return layers.UDPLayer(context, ignore=ignore) + def next_layer(self, nextlayer: layer.NextLayer): if nextlayer.layer is None: nextlayer.layer = self._next_layer( @@ -208,10 +241,6 @@ class NextLayer: ) -> Optional[layer.Layer]: assert context.layers - # helper function to quickly check if the existing layer stack matches a particular configuration. - def s(*layers): - return stack_match(context, layers) - if context.client.transport_protocol == "tcp": if ( len(data_client) < 3 @@ -231,35 +260,15 @@ class NextLayer: if is_tls_record_magic(data_client): return self.setup_tls_layer(context) - # 3. Setup the HTTP layer for a regular HTTP proxy - if ( - s(modes.HttpProxy) - or - # or a "Secure Web Proxy", see https://www.chromium.org/developers/design-documents/secure-web-proxy - s(modes.HttpProxy, layers.ClientTLSLayer) - ): - return layers.HttpLayer(context, HTTPMode.regular) - # 3b. ... or an upstream proxy. - if ( - s(modes.HttpUpstreamProxy) - or - s(modes.HttpUpstreamProxy, layers.ClientTLSLayer) - ): - return layers.HttpLayer(context, HTTPMode.upstream) + # 3. Check for HTTP + if http_layer := self.get_http_layer(context): + return http_layer # 4. Check for --tcp if self.is_destination_in_hosts(context, self.tcp_hosts): return layers.TCPLayer(context) - # 5. Check for raw reverse mode. - if self.is_reverse_proxy_scheme(context, "tcp", "tls"): - return layers.TCPLayer(context) - # NOTE at this point we are either - # - in http or https reverse mode - # - at the top level of a non-reverse/regular/upstream mode - # - at a deeper layer nesting level - - # 6. Check for raw tcp mode. + # 5. Check for raw tcp mode. very_likely_http = context.client.alpn and context.client.alpn in HTTP_ALPNS probably_no_http = not very_likely_http and ( not data_client[ @@ -270,27 +279,12 @@ class NextLayer: if ctx.options.rawtcp and probably_no_http: return layers.TCPLayer(context) - # 7. Assume HTTP by default. + # 6. Assume HTTP by default. return layers.HttpLayer(context, HTTPMode.transparent) elif context.client.transport_protocol == "udp": - # for http3, upstream:http3 and reverse:quic/http3 proxies, there has to be a client quic layer - if ( - s(modes.HttpProxy) - or - s(modes.HttpUpstreamProxy) - or - s(modes.ReverseProxy, layers.ServerQuicLayer) - ): - return layers.ClientQuicLayer(context) - # unlike TCP, we make a decision immediately tls = self.detect_udp_tls(data_client) - raw_layer_cls = ( - layers.RawQuicLayer - if isinstance(context.layers[-1], layers.ClientQuicLayer) else - layers.UDPLayer - ) # 1. check for --ignore/--allow if self.ignore_connection( @@ -299,43 +293,42 @@ class NextLayer: is_tls=lambda _: tls is not None, client_hello=lambda _: None if tls is None else tls[0] ): - return raw_layer_cls(context, ignore=True) + return self.raw_udp_layer(context, ignore=True) # 2. Check for DTLS/QUIC if tls is not None: _, client_layer_cls, server_layer_cls = tls return self.setup_tls_layer(context, client_layer_cls, server_layer_cls) - # 3. Setup the HTTP layer for a regular HTTP proxy - if s(modes.HttpProxy, layers.ClientQuicLayer): - return layers.HttpLayer(context, HTTPMode.regular) - # 3b. ... or an upstream proxy. - if s(modes.HttpUpstreamProxy, layers.ClientQuicLayer): - return layers.HttpLayer(context, HTTPMode.upstream) + # 3. Check for HTTP + if http_layer := self.get_http_layer(context): + return http_layer # 4. Check for --udp if self.is_destination_in_hosts(context, self.udp_hosts): - return raw_layer_cls(context) + return self.raw_udp_layer(context) - # 5. Check for raw reverse mode. - if self.is_reverse_proxy_scheme(context, "udp", "dtls"): - return layers.UDPLayer(context) - - # 6. Check for explicit QUIC reverse modes - if (s(modes.ReverseProxy, layers.ServerQuicLayer, layers.ClientQuicLayer)): + # 5. Check for reverse modes + if (isinstance(context.layers[0], modes.ReverseProxy)): scheme = cast(mode_specs.ReverseMode, context.client.proxy_mode).scheme - if scheme == "quic": - return layers.RawQuicLayer(context) - if scheme == "http3": + if scheme in ("udp", "dtls"): + return layers.UDPLayer(context) + elif scheme == "http3": return layers.HttpLayer(context, HTTPMode.transparent) - # 6b. ... or DNS mode - if self.is_reverse_proxy_scheme(context, "dns"): - return layers.DNSLayer(context) - # NOTE at this point we are either - # - at the top level of a non-reverse/regular/upstream mode - # - at a deeper layer nesting level + elif scheme == "quic": + # if the client supports QUIC, we use QUIC raw layer, + # otherwise we only use the QUIC datagram only + return ( + layers.RawQuicLayer(context) + if isinstance(context.layers[-1], layers.ClientQuicLayer) else + layers.UDPLayer(context) + ) + elif scheme == "dns": + return layers.DNSLayer(context) + else: + raise AssertionError(scheme) - # 7. Check for DNS + # 6. Check for DNS try: dns.Message.unpack(data_client) except struct.error: @@ -343,8 +336,8 @@ class NextLayer: else: return layers.DNSLayer(context) - # 8. Use raw mode. - return raw_layer_cls(context) + # 7. Use raw mode. + return self.raw_udp_layer(context) else: raise AssertionError(context.client.transport_protocol) diff --git a/test/mitmproxy/addons/test_next_layer.py b/test/mitmproxy/addons/test_next_layer.py index 741f47fe1..ced57924b 100644 --- a/test/mitmproxy/addons/test_next_layer.py +++ b/test/mitmproxy/addons/test_next_layer.py @@ -46,6 +46,39 @@ dtls_client_hello_with_extensions = bytes.fromhex( ) +quic_client_hello = bytes.fromhex( + "ca0000000108c0618c84b54541320823fcce946c38d8210044e6a93bbb283593f75ffb6f2696b16cfdcb5b1255" + "577b2af5fc5894188c9568bc65eef253faf7f0520e41341cfa81d6aae573586665ce4e1e41676364820402feec" + "a81f3d22dbb476893422069066104a43e121c951a08c53b83f960becf99cf5304d5bc5346f52f472bd1a04d192" + "0bae025064990d27e5e4c325ac46121d3acadebe7babdb96192fb699693d65e2b2e21c53beeb4f40b50673a2f6" + "c22091cb7c76a845384fedee58df862464d1da505a280bfef91ca83a10bebbcb07855219dbc14aecf8a48da049" + "d03c77459b39d5355c95306cd03d6bdb471694fa998ca3b1f875ce87915b88ead15c5d6313a443f39aad808922" + "57ddfa6b4a898d773bb6fb520ede47ebd59d022431b1054a69e0bbbdf9f0fb32fc8bcc4b6879dd8cd5389474b1" + "99e18333e14d0347740a11916429a818bb8d93295d36e99840a373bb0e14c8b3adcf5e2165e70803f15316fd5e" + "5eeec04ae68d98f1adb22c54611c80fcd8ece619dbdf97b1510032ec374b7a71f94d9492b8b8cb56f56556dd97" + "edf1e50fa90e868ff93636a365678bdf3ee3f8e632588cd506b6f44fbfd4d99988238fbd5884c98f6a124108c1" + "878970780e42b111e3be6215776ef5be5a0205915e6d720d22c6a81a475c9e41ba94e4983b964cb5c8e1f40607" + "76d1d8d1adcef7587ea084231016bd6ee2643d11a3a35eb7fe4cca2b3f1a4b21e040b0d426412cca6c4271ea63" + "fb54ed7f57b41cd1af1be5507f87ea4f4a0c997367e883291de2f1b8a49bdaa52bae30064351b1139703400730" + "18a4104344ec6b4454b50a42e804bc70e78b9b3c82497273859c82ed241b643642d76df6ceab8f916392113a62" + "b231f228c7300624d74a846bec2f479ab8a8c3461f91c7bf806236e3bd2f54ba1ef8e2a1e0bfdde0c5ad227f7d" + "364c52510b1ade862ce0c8d7bd24b6d7d21c99b34de6d177eb3d575787b2af55060d76d6c2060befbb7953a816" + "6f66ad88ecf929dbb0ad3a16cf7dfd39d925e0b4b649c6d0c07ad46ed0229c17fb6a1395f16e1b138aab3af760" + "2b0ac762c4f611f7f3468997224ffbe500a7c53f92f65e41a3765a9f1d7e3f78208f5b4e147962d8c97d6c1a80" + "91ffc36090b2043d71853616f34c2185dc883c54ab6d66e10a6c18e0b9a4742597361f8554a42da3373241d0c8" + "54119bfadccffaf2335b2d97ffee627cb891bda8140a39399f853da4859f7e19682e152243efbaffb662edd19b" + "3819a74107c7dbe05ecb32e79dcdb1260f153b1ef133e978ccca3d9e400a7ed6c458d77e2956d2cb897b7a298b" + "fe144b5defdc23dfd2adf69f1fb0917840703402d524987ae3b1dcb85229843c9a419ef46e1ba0ba7783f2a2ec" + "d057a57518836aef2a7839ebd3688da98b54c942941f642e434727108d59ea25875b3050ca53d4637c76cbcbb9" + "e972c2b0b781131ee0a1403138b55486fe86bbd644920ee6aa578e3bab32d7d784b5c140295286d90c99b14823" + "1487f7ea64157001b745aa358c9ea6bec5a8d8b67a7534ec1f7648ff3b435911dfc3dff798d32fbf2efe2c1fcc" + "278865157590572387b76b78e727d3e7682cb501cdcdf9a0f17676f99d9aa67f10edccc9a92080294e88bf28c2" + "a9f32ae535fdb27fff7706540472abb9eab90af12b2bea005da189874b0ca69e6ae1690a6f2adf75be3853c94e" + "fd8098ed579c20cb37be6885d8d713af4ba52958cee383089b98ed9cb26e11127cf88d1b7d254f15f7903dd7ed" + "297c0013924e88248684fe8f2098326ce51aa6e5" +) + + class TestNextLayer: def test_configure(self): nl = NextLayer() @@ -147,44 +180,180 @@ class TestNextLayer: assert isinstance(nl._next_layer(ctx, b"GET /foo", b""), layers.HttpLayer) assert isinstance(nl._next_layer(ctx, b"", b"hello"), layers.TCPLayer) - def test_next_layer_udp(self): + @pytest.mark.parametrize( + ("client_hello", "client_layer", "server_layer"), + [ + (dtls_client_hello_with_extensions, layers.ClientTLSLayer, layers.ServerTLSLayer), + (quic_client_hello, layers.ClientQuicLayer, layers.ServerQuicLayer), + ] + ) + def test_next_layer_udp( + self, + client_hello: bytes, + client_layer: layer.Layer, + server_layer: layer.Layer, + ): def is_ignored_udp(layer: Optional[layer.Layer]): return isinstance(layer, layers.UDPLayer) and layer.flow is None def is_intercepted_udp(layer: Optional[layer.Layer]): return isinstance(layer, layers.UDPLayer) and layer.flow is not None + def is_http(layer: Optional[layer.Layer], mode: HTTPMode): + return ( + isinstance(layer, layers.HttpLayer) + and layer.mode is mode + ) + nl = NextLayer() ctx = MagicMock() ctx.client.alpn = None ctx.server.address = ("example.com", 443) ctx.client.transport_protocol = "udp" with taddons.context(nl) as tctx: - ctx.layers = [layers.modes.HttpProxy(ctx)] - assert is_intercepted_udp(nl._next_layer(ctx, b"", b"")) + ctx.layers = [layers.modes.HttpProxy(ctx), client_layer(ctx)] + assert is_http(nl._next_layer(ctx, b"", b""), HTTPMode.regular) - ctx.layers = [layers.modes.HttpProxy(ctx)] + ctx.layers = [layers.modes.HttpUpstreamProxy(ctx), client_layer(ctx)] + assert is_http(nl._next_layer(ctx, b"", b""), HTTPMode.upstream) + + ctx.layers = [layers.modes.TransparentProxy(ctx)] + is_intercepted_udp(nl._next_layer(ctx, b"", b"")) + + ctx.layers = [layers.modes.TransparentProxy(ctx)] ctx.server.address = ("nomatch.com", 443) tctx.configure(nl, ignore_hosts=["example.com"]) - assert is_intercepted_udp(nl._next_layer(ctx, dtls_client_hello_with_extensions[:50], b"")) - assert is_ignored_udp(nl._next_layer(ctx, dtls_client_hello_with_extensions, b"")) + assert is_intercepted_udp(nl._next_layer(ctx, client_hello[:50], b"")) + assert is_ignored_udp(nl._next_layer(ctx, client_hello, b"")) - ctx.layers = [layers.modes.HttpProxy(ctx)] + ctx.layers = [layers.modes.TransparentProxy(ctx)] ctx.server.address = ("example.com", 443) - assert is_ignored_udp(nl._next_layer(ctx, dtls_client_hello_with_extensions[:50], b"")) + assert is_ignored_udp(nl._next_layer(ctx, client_hello[:50], b"")) - ctx.layers = [layers.modes.HttpProxy(ctx)] + ctx.layers = [layers.modes.TransparentProxy(ctx)] tctx.configure(nl, ignore_hosts=[]) - assert isinstance(nl._next_layer(ctx, dtls_client_hello_with_extensions, b""), layers.ClientTLSLayer) + decision = nl._next_layer(ctx, client_hello, b"") + assert isinstance(decision, server_layer) + assert isinstance(decision.child_layer, client_layer) - ctx.layers = [layers.modes.HttpProxy(ctx)] + ctx.layers = [layers.modes.ReverseProxy(ctx), server_layer(ctx)] + tctx.configure(nl, ignore_hosts=[]) + assert isinstance(nl._next_layer(ctx, client_hello, b""), client_layer) + + ctx.layers = [layers.modes.TransparentProxy(ctx)] tctx.configure(nl, udp_hosts=["example.com"]) assert isinstance(nl._next_layer(ctx, tflow.tdnsreq().packed, b""), layers.UDPLayer) - ctx.layers = [layers.modes.HttpProxy(ctx)] + ctx.layers = [layers.modes.TransparentProxy(ctx)] tctx.configure(nl, udp_hosts=[]) assert isinstance(nl._next_layer(ctx, tflow.tdnsreq().packed, b""), layers.DNSLayer) + def test_next_layer_reverse_raw(self): + nl = NextLayer() + ctx = MagicMock() + ctx.client.alpn = None + ctx.server.address = ("example.com", 443) + ctx.client.transport_protocol = "udp" + with taddons.context(nl) as tctx: + tctx.configure(nl, ignore_hosts=["example.com"]) + + ctx.layers = [layers.modes.HttpProxy(ctx), layers.ClientQuicLayer(ctx)] + decision = nl._next_layer(ctx, b"", b"") + assert isinstance(decision, layers.ServerQuicLayer) + assert isinstance(decision.child_layer, layers.RawQuicLayer) + + ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerQuicLayer(ctx), layers.ClientQuicLayer(ctx)] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.RawQuicLayer) + + ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerQuicLayer(ctx)] + decision = nl._next_layer(ctx, b"", b"") + assert isinstance(decision, layers.ClientQuicLayer) + assert isinstance(decision.child_layer, layers.RawQuicLayer) + + tctx.configure(nl, ignore_hosts=[]) + + def test_next_layer_reverse_quic_mode(self): + nl = NextLayer() + ctx = MagicMock() + ctx.client.alpn = None + ctx.server.address = ("example.com", 443) + ctx.client.transport_protocol = "udp" + ctx.client.proxy_mode.scheme = "quic" + ctx.layers = [ + layers.modes.ReverseProxy(ctx), + layers.ServerQuicLayer(ctx), + layers.ClientQuicLayer(ctx), + ] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.RawQuicLayer) + ctx.layers = [ + layers.modes.ReverseProxy(ctx), + layers.ServerQuicLayer(ctx), + ] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer) + + def test_next_layer_reverse_http3_mode(self): + nl = NextLayer() + ctx = MagicMock() + ctx.client.alpn = None + ctx.server.address = ("example.com", 443) + ctx.client.transport_protocol = "udp" + ctx.client.proxy_mode.scheme = "http3" + ctx.layers = [ + layers.modes.ReverseProxy(ctx), + layers.ServerQuicLayer(ctx), + layers.ClientQuicLayer(ctx), + ] + decision = nl._next_layer(ctx, b"", b"") + assert isinstance(decision, layers.HttpLayer) + assert decision.mode is HTTPMode.transparent + + def test_next_layer_reverse_invalid_mode(self): + nl = NextLayer() + ctx = MagicMock() + ctx.client.alpn = None + ctx.server.address = ("example.com", 443) + ctx.client.transport_protocol = "udp" + ctx.client.proxy_mode.scheme = "invalidscheme" + ctx.layers = [layers.modes.ReverseProxy(ctx)] + with pytest.raises(AssertionError, match="invalidscheme"): + nl._next_layer(ctx, b"", b"") + + def test_next_layer_reverse_dtls_mode(self): + nl = NextLayer() + ctx = MagicMock() + ctx.client.alpn = None + ctx.server.address = ("example.com", 443) + ctx.client.transport_protocol = "udp" + ctx.client.proxy_mode.scheme = "dtls" + ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerTLSLayer(ctx)] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer) + ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerTLSLayer(ctx), layers.ClientTLSLayer(ctx)] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer) + + def test_next_layer_reverse_udp_mode(self): + nl = NextLayer() + ctx = MagicMock() + ctx.client.alpn = None + ctx.server.address = ("example.com", 443) + ctx.client.transport_protocol = "udp" + ctx.client.proxy_mode.scheme = "udp" + ctx.layers = [layers.modes.ReverseProxy(ctx)] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer) + ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ClientTLSLayer(ctx)] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer) + + def test_next_layer_reverse_dns_mode(self): + nl = NextLayer() + ctx = MagicMock() + ctx.client.alpn = None + ctx.server.address = ("example.com", 443) + ctx.client.transport_protocol = "udp" + ctx.client.proxy_mode.scheme = "dns" + ctx.layers = [layers.modes.ReverseProxy(ctx)] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.DNSLayer) + ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ClientTLSLayer(ctx)] + assert isinstance(nl._next_layer(ctx, b"", b""), layers.DNSLayer) + def test_next_layer_invalid_proto(self): nl = NextLayer() ctx = MagicMock()