[quic] next layer work
This commit is contained in:
parent
c552455ccd
commit
f8dc5a6683
|
@ -155,23 +155,26 @@ class NextLayer:
|
|||
for rex in hosts
|
||||
)
|
||||
|
||||
def is_reverse_proxy_scheme(self, context: context.Context, *args: str):
|
||||
def get_http_layer(self, context: context.Context) -> Optional[layers.HttpLayer]:
|
||||
def s(*layers):
|
||||
return stack_match(context, layers)
|
||||
|
||||
# we allow all possible security layer combinations and rely on the correctness of ReverseProxy
|
||||
return (
|
||||
(
|
||||
s(modes.ReverseProxy)
|
||||
or
|
||||
s(modes.ReverseProxy, layers.ClientTLSLayer)
|
||||
or
|
||||
s(modes.ReverseProxy, layers.ServerTLSLayer)
|
||||
or
|
||||
s(modes.ReverseProxy, layers.ServerTLSLayer, layers.ClientTLSLayer)
|
||||
)
|
||||
and cast(mode_specs.ReverseMode, context.client.proxy_mode).scheme in args
|
||||
)
|
||||
# Setup the HTTP layer for a regular HTTP proxy ...
|
||||
if (
|
||||
s(modes.HttpProxy)
|
||||
or
|
||||
# or a "Secure Web Proxy", see https://www.chromium.org/developers/design-documents/secure-web-proxy
|
||||
s(modes.HttpProxy, (layers.ClientTLSLayer, layers.ClientQuicLayer))
|
||||
):
|
||||
return layers.HttpLayer(context, HTTPMode.regular)
|
||||
# ... or an upstream proxy.
|
||||
if (
|
||||
s(modes.HttpUpstreamProxy)
|
||||
or
|
||||
s(modes.HttpUpstreamProxy, (layers.ClientTLSLayer, layers.ClientQuicLayer))
|
||||
):
|
||||
return layers.HttpLayer(context, HTTPMode.upstream)
|
||||
return None
|
||||
|
||||
def detect_udp_tls(self, data_client: bytes) -> Optional[tuple[ClientHello, ClientSecurityLayerCls, ServerSecurityLayerCls]]:
|
||||
if len(data_client) == 0:
|
||||
|
@ -195,6 +198,36 @@ class NextLayer:
|
|||
# that's all we currently have to offer
|
||||
return None
|
||||
|
||||
def raw_udp_layer(self, context: context.Context, ignore: bool = False) -> layer.Layer:
|
||||
def s(*layers):
|
||||
return stack_match(context, layers)
|
||||
|
||||
# for regular and upstream HTTP3, if we already created a client QUIC layer
|
||||
# we need a server and raw QUIC layer as well
|
||||
if (
|
||||
s(modes.HttpProxy, layers.ClientQuicLayer)
|
||||
or
|
||||
s(modes.HttpUpstreamProxy, layers.ClientQuicLayer)
|
||||
):
|
||||
server_layer = layers.ServerQuicLayer(context)
|
||||
server_layer.child_layer = layers.RawQuicLayer(context, ignore=ignore)
|
||||
return server_layer
|
||||
|
||||
# for reverse HTTP3 and QUIC, we need a client and raw QUIC layer
|
||||
elif (s(modes.ReverseProxy, layers.ServerQuicLayer)):
|
||||
client_layer = layers.ClientQuicLayer(context)
|
||||
client_layer.child_layer = layers.RawQuicLayer(context, ignore=ignore)
|
||||
return client_layer
|
||||
|
||||
# in other cases we assume `setup_tls_layer` happened, so if the
|
||||
# top layer is `ClientQuicLayer` we return a raw QUIC layer...
|
||||
elif isinstance(context.layers[-1], layers.ClientQuicLayer):
|
||||
return layers.RawQuicLayer(context, ignore=ignore)
|
||||
|
||||
# ... otherwise an UDP layer
|
||||
else:
|
||||
return layers.UDPLayer(context, ignore=ignore)
|
||||
|
||||
def next_layer(self, nextlayer: layer.NextLayer):
|
||||
if nextlayer.layer is None:
|
||||
nextlayer.layer = self._next_layer(
|
||||
|
@ -208,10 +241,6 @@ class NextLayer:
|
|||
) -> Optional[layer.Layer]:
|
||||
assert context.layers
|
||||
|
||||
# helper function to quickly check if the existing layer stack matches a particular configuration.
|
||||
def s(*layers):
|
||||
return stack_match(context, layers)
|
||||
|
||||
if context.client.transport_protocol == "tcp":
|
||||
if (
|
||||
len(data_client) < 3
|
||||
|
@ -231,35 +260,15 @@ class NextLayer:
|
|||
if is_tls_record_magic(data_client):
|
||||
return self.setup_tls_layer(context)
|
||||
|
||||
# 3. Setup the HTTP layer for a regular HTTP proxy
|
||||
if (
|
||||
s(modes.HttpProxy)
|
||||
or
|
||||
# or a "Secure Web Proxy", see https://www.chromium.org/developers/design-documents/secure-web-proxy
|
||||
s(modes.HttpProxy, layers.ClientTLSLayer)
|
||||
):
|
||||
return layers.HttpLayer(context, HTTPMode.regular)
|
||||
# 3b. ... or an upstream proxy.
|
||||
if (
|
||||
s(modes.HttpUpstreamProxy)
|
||||
or
|
||||
s(modes.HttpUpstreamProxy, layers.ClientTLSLayer)
|
||||
):
|
||||
return layers.HttpLayer(context, HTTPMode.upstream)
|
||||
# 3. Check for HTTP
|
||||
if http_layer := self.get_http_layer(context):
|
||||
return http_layer
|
||||
|
||||
# 4. Check for --tcp
|
||||
if self.is_destination_in_hosts(context, self.tcp_hosts):
|
||||
return layers.TCPLayer(context)
|
||||
|
||||
# 5. Check for raw reverse mode.
|
||||
if self.is_reverse_proxy_scheme(context, "tcp", "tls"):
|
||||
return layers.TCPLayer(context)
|
||||
# NOTE at this point we are either
|
||||
# - in http or https reverse mode
|
||||
# - at the top level of a non-reverse/regular/upstream mode
|
||||
# - at a deeper layer nesting level
|
||||
|
||||
# 6. Check for raw tcp mode.
|
||||
# 5. Check for raw tcp mode.
|
||||
very_likely_http = context.client.alpn and context.client.alpn in HTTP_ALPNS
|
||||
probably_no_http = not very_likely_http and (
|
||||
not data_client[
|
||||
|
@ -270,27 +279,12 @@ class NextLayer:
|
|||
if ctx.options.rawtcp and probably_no_http:
|
||||
return layers.TCPLayer(context)
|
||||
|
||||
# 7. Assume HTTP by default.
|
||||
# 6. Assume HTTP by default.
|
||||
return layers.HttpLayer(context, HTTPMode.transparent)
|
||||
|
||||
elif context.client.transport_protocol == "udp":
|
||||
# for http3, upstream:http3 and reverse:quic/http3 proxies, there has to be a client quic layer
|
||||
if (
|
||||
s(modes.HttpProxy)
|
||||
or
|
||||
s(modes.HttpUpstreamProxy)
|
||||
or
|
||||
s(modes.ReverseProxy, layers.ServerQuicLayer)
|
||||
):
|
||||
return layers.ClientQuicLayer(context)
|
||||
|
||||
# unlike TCP, we make a decision immediately
|
||||
tls = self.detect_udp_tls(data_client)
|
||||
raw_layer_cls = (
|
||||
layers.RawQuicLayer
|
||||
if isinstance(context.layers[-1], layers.ClientQuicLayer) else
|
||||
layers.UDPLayer
|
||||
)
|
||||
|
||||
# 1. check for --ignore/--allow
|
||||
if self.ignore_connection(
|
||||
|
@ -299,43 +293,42 @@ class NextLayer:
|
|||
is_tls=lambda _: tls is not None,
|
||||
client_hello=lambda _: None if tls is None else tls[0]
|
||||
):
|
||||
return raw_layer_cls(context, ignore=True)
|
||||
return self.raw_udp_layer(context, ignore=True)
|
||||
|
||||
# 2. Check for DTLS/QUIC
|
||||
if tls is not None:
|
||||
_, client_layer_cls, server_layer_cls = tls
|
||||
return self.setup_tls_layer(context, client_layer_cls, server_layer_cls)
|
||||
|
||||
# 3. Setup the HTTP layer for a regular HTTP proxy
|
||||
if s(modes.HttpProxy, layers.ClientQuicLayer):
|
||||
return layers.HttpLayer(context, HTTPMode.regular)
|
||||
# 3b. ... or an upstream proxy.
|
||||
if s(modes.HttpUpstreamProxy, layers.ClientQuicLayer):
|
||||
return layers.HttpLayer(context, HTTPMode.upstream)
|
||||
# 3. Check for HTTP
|
||||
if http_layer := self.get_http_layer(context):
|
||||
return http_layer
|
||||
|
||||
# 4. Check for --udp
|
||||
if self.is_destination_in_hosts(context, self.udp_hosts):
|
||||
return raw_layer_cls(context)
|
||||
return self.raw_udp_layer(context)
|
||||
|
||||
# 5. Check for raw reverse mode.
|
||||
if self.is_reverse_proxy_scheme(context, "udp", "dtls"):
|
||||
return layers.UDPLayer(context)
|
||||
|
||||
# 6. Check for explicit QUIC reverse modes
|
||||
if (s(modes.ReverseProxy, layers.ServerQuicLayer, layers.ClientQuicLayer)):
|
||||
# 5. Check for reverse modes
|
||||
if (isinstance(context.layers[0], modes.ReverseProxy)):
|
||||
scheme = cast(mode_specs.ReverseMode, context.client.proxy_mode).scheme
|
||||
if scheme == "quic":
|
||||
return layers.RawQuicLayer(context)
|
||||
if scheme == "http3":
|
||||
if scheme in ("udp", "dtls"):
|
||||
return layers.UDPLayer(context)
|
||||
elif scheme == "http3":
|
||||
return layers.HttpLayer(context, HTTPMode.transparent)
|
||||
# 6b. ... or DNS mode
|
||||
if self.is_reverse_proxy_scheme(context, "dns"):
|
||||
return layers.DNSLayer(context)
|
||||
# NOTE at this point we are either
|
||||
# - at the top level of a non-reverse/regular/upstream mode
|
||||
# - at a deeper layer nesting level
|
||||
elif scheme == "quic":
|
||||
# if the client supports QUIC, we use QUIC raw layer,
|
||||
# otherwise we only use the QUIC datagram only
|
||||
return (
|
||||
layers.RawQuicLayer(context)
|
||||
if isinstance(context.layers[-1], layers.ClientQuicLayer) else
|
||||
layers.UDPLayer(context)
|
||||
)
|
||||
elif scheme == "dns":
|
||||
return layers.DNSLayer(context)
|
||||
else:
|
||||
raise AssertionError(scheme)
|
||||
|
||||
# 7. Check for DNS
|
||||
# 6. Check for DNS
|
||||
try:
|
||||
dns.Message.unpack(data_client)
|
||||
except struct.error:
|
||||
|
@ -343,8 +336,8 @@ class NextLayer:
|
|||
else:
|
||||
return layers.DNSLayer(context)
|
||||
|
||||
# 8. Use raw mode.
|
||||
return raw_layer_cls(context)
|
||||
# 7. Use raw mode.
|
||||
return self.raw_udp_layer(context)
|
||||
|
||||
else:
|
||||
raise AssertionError(context.client.transport_protocol)
|
||||
|
|
|
@ -46,6 +46,39 @@ dtls_client_hello_with_extensions = bytes.fromhex(
|
|||
)
|
||||
|
||||
|
||||
quic_client_hello = bytes.fromhex(
|
||||
"ca0000000108c0618c84b54541320823fcce946c38d8210044e6a93bbb283593f75ffb6f2696b16cfdcb5b1255"
|
||||
"577b2af5fc5894188c9568bc65eef253faf7f0520e41341cfa81d6aae573586665ce4e1e41676364820402feec"
|
||||
"a81f3d22dbb476893422069066104a43e121c951a08c53b83f960becf99cf5304d5bc5346f52f472bd1a04d192"
|
||||
"0bae025064990d27e5e4c325ac46121d3acadebe7babdb96192fb699693d65e2b2e21c53beeb4f40b50673a2f6"
|
||||
"c22091cb7c76a845384fedee58df862464d1da505a280bfef91ca83a10bebbcb07855219dbc14aecf8a48da049"
|
||||
"d03c77459b39d5355c95306cd03d6bdb471694fa998ca3b1f875ce87915b88ead15c5d6313a443f39aad808922"
|
||||
"57ddfa6b4a898d773bb6fb520ede47ebd59d022431b1054a69e0bbbdf9f0fb32fc8bcc4b6879dd8cd5389474b1"
|
||||
"99e18333e14d0347740a11916429a818bb8d93295d36e99840a373bb0e14c8b3adcf5e2165e70803f15316fd5e"
|
||||
"5eeec04ae68d98f1adb22c54611c80fcd8ece619dbdf97b1510032ec374b7a71f94d9492b8b8cb56f56556dd97"
|
||||
"edf1e50fa90e868ff93636a365678bdf3ee3f8e632588cd506b6f44fbfd4d99988238fbd5884c98f6a124108c1"
|
||||
"878970780e42b111e3be6215776ef5be5a0205915e6d720d22c6a81a475c9e41ba94e4983b964cb5c8e1f40607"
|
||||
"76d1d8d1adcef7587ea084231016bd6ee2643d11a3a35eb7fe4cca2b3f1a4b21e040b0d426412cca6c4271ea63"
|
||||
"fb54ed7f57b41cd1af1be5507f87ea4f4a0c997367e883291de2f1b8a49bdaa52bae30064351b1139703400730"
|
||||
"18a4104344ec6b4454b50a42e804bc70e78b9b3c82497273859c82ed241b643642d76df6ceab8f916392113a62"
|
||||
"b231f228c7300624d74a846bec2f479ab8a8c3461f91c7bf806236e3bd2f54ba1ef8e2a1e0bfdde0c5ad227f7d"
|
||||
"364c52510b1ade862ce0c8d7bd24b6d7d21c99b34de6d177eb3d575787b2af55060d76d6c2060befbb7953a816"
|
||||
"6f66ad88ecf929dbb0ad3a16cf7dfd39d925e0b4b649c6d0c07ad46ed0229c17fb6a1395f16e1b138aab3af760"
|
||||
"2b0ac762c4f611f7f3468997224ffbe500a7c53f92f65e41a3765a9f1d7e3f78208f5b4e147962d8c97d6c1a80"
|
||||
"91ffc36090b2043d71853616f34c2185dc883c54ab6d66e10a6c18e0b9a4742597361f8554a42da3373241d0c8"
|
||||
"54119bfadccffaf2335b2d97ffee627cb891bda8140a39399f853da4859f7e19682e152243efbaffb662edd19b"
|
||||
"3819a74107c7dbe05ecb32e79dcdb1260f153b1ef133e978ccca3d9e400a7ed6c458d77e2956d2cb897b7a298b"
|
||||
"fe144b5defdc23dfd2adf69f1fb0917840703402d524987ae3b1dcb85229843c9a419ef46e1ba0ba7783f2a2ec"
|
||||
"d057a57518836aef2a7839ebd3688da98b54c942941f642e434727108d59ea25875b3050ca53d4637c76cbcbb9"
|
||||
"e972c2b0b781131ee0a1403138b55486fe86bbd644920ee6aa578e3bab32d7d784b5c140295286d90c99b14823"
|
||||
"1487f7ea64157001b745aa358c9ea6bec5a8d8b67a7534ec1f7648ff3b435911dfc3dff798d32fbf2efe2c1fcc"
|
||||
"278865157590572387b76b78e727d3e7682cb501cdcdf9a0f17676f99d9aa67f10edccc9a92080294e88bf28c2"
|
||||
"a9f32ae535fdb27fff7706540472abb9eab90af12b2bea005da189874b0ca69e6ae1690a6f2adf75be3853c94e"
|
||||
"fd8098ed579c20cb37be6885d8d713af4ba52958cee383089b98ed9cb26e11127cf88d1b7d254f15f7903dd7ed"
|
||||
"297c0013924e88248684fe8f2098326ce51aa6e5"
|
||||
)
|
||||
|
||||
|
||||
class TestNextLayer:
|
||||
def test_configure(self):
|
||||
nl = NextLayer()
|
||||
|
@ -147,44 +180,180 @@ class TestNextLayer:
|
|||
assert isinstance(nl._next_layer(ctx, b"GET /foo", b""), layers.HttpLayer)
|
||||
assert isinstance(nl._next_layer(ctx, b"", b"hello"), layers.TCPLayer)
|
||||
|
||||
def test_next_layer_udp(self):
|
||||
@pytest.mark.parametrize(
|
||||
("client_hello", "client_layer", "server_layer"),
|
||||
[
|
||||
(dtls_client_hello_with_extensions, layers.ClientTLSLayer, layers.ServerTLSLayer),
|
||||
(quic_client_hello, layers.ClientQuicLayer, layers.ServerQuicLayer),
|
||||
]
|
||||
)
|
||||
def test_next_layer_udp(
|
||||
self,
|
||||
client_hello: bytes,
|
||||
client_layer: layer.Layer,
|
||||
server_layer: layer.Layer,
|
||||
):
|
||||
def is_ignored_udp(layer: Optional[layer.Layer]):
|
||||
return isinstance(layer, layers.UDPLayer) and layer.flow is None
|
||||
|
||||
def is_intercepted_udp(layer: Optional[layer.Layer]):
|
||||
return isinstance(layer, layers.UDPLayer) and layer.flow is not None
|
||||
|
||||
def is_http(layer: Optional[layer.Layer], mode: HTTPMode):
|
||||
return (
|
||||
isinstance(layer, layers.HttpLayer)
|
||||
and layer.mode is mode
|
||||
)
|
||||
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
with taddons.context(nl) as tctx:
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx)]
|
||||
assert is_intercepted_udp(nl._next_layer(ctx, b"", b""))
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx), client_layer(ctx)]
|
||||
assert is_http(nl._next_layer(ctx, b"", b""), HTTPMode.regular)
|
||||
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx)]
|
||||
ctx.layers = [layers.modes.HttpUpstreamProxy(ctx), client_layer(ctx)]
|
||||
assert is_http(nl._next_layer(ctx, b"", b""), HTTPMode.upstream)
|
||||
|
||||
ctx.layers = [layers.modes.TransparentProxy(ctx)]
|
||||
is_intercepted_udp(nl._next_layer(ctx, b"", b""))
|
||||
|
||||
ctx.layers = [layers.modes.TransparentProxy(ctx)]
|
||||
ctx.server.address = ("nomatch.com", 443)
|
||||
tctx.configure(nl, ignore_hosts=["example.com"])
|
||||
assert is_intercepted_udp(nl._next_layer(ctx, dtls_client_hello_with_extensions[:50], b""))
|
||||
assert is_ignored_udp(nl._next_layer(ctx, dtls_client_hello_with_extensions, b""))
|
||||
assert is_intercepted_udp(nl._next_layer(ctx, client_hello[:50], b""))
|
||||
assert is_ignored_udp(nl._next_layer(ctx, client_hello, b""))
|
||||
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx)]
|
||||
ctx.layers = [layers.modes.TransparentProxy(ctx)]
|
||||
ctx.server.address = ("example.com", 443)
|
||||
assert is_ignored_udp(nl._next_layer(ctx, dtls_client_hello_with_extensions[:50], b""))
|
||||
assert is_ignored_udp(nl._next_layer(ctx, client_hello[:50], b""))
|
||||
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx)]
|
||||
ctx.layers = [layers.modes.TransparentProxy(ctx)]
|
||||
tctx.configure(nl, ignore_hosts=[])
|
||||
assert isinstance(nl._next_layer(ctx, dtls_client_hello_with_extensions, b""), layers.ClientTLSLayer)
|
||||
decision = nl._next_layer(ctx, client_hello, b"")
|
||||
assert isinstance(decision, server_layer)
|
||||
assert isinstance(decision.child_layer, client_layer)
|
||||
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx)]
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx), server_layer(ctx)]
|
||||
tctx.configure(nl, ignore_hosts=[])
|
||||
assert isinstance(nl._next_layer(ctx, client_hello, b""), client_layer)
|
||||
|
||||
ctx.layers = [layers.modes.TransparentProxy(ctx)]
|
||||
tctx.configure(nl, udp_hosts=["example.com"])
|
||||
assert isinstance(nl._next_layer(ctx, tflow.tdnsreq().packed, b""), layers.UDPLayer)
|
||||
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx)]
|
||||
ctx.layers = [layers.modes.TransparentProxy(ctx)]
|
||||
tctx.configure(nl, udp_hosts=[])
|
||||
assert isinstance(nl._next_layer(ctx, tflow.tdnsreq().packed, b""), layers.DNSLayer)
|
||||
|
||||
def test_next_layer_reverse_raw(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
with taddons.context(nl) as tctx:
|
||||
tctx.configure(nl, ignore_hosts=["example.com"])
|
||||
|
||||
ctx.layers = [layers.modes.HttpProxy(ctx), layers.ClientQuicLayer(ctx)]
|
||||
decision = nl._next_layer(ctx, b"", b"")
|
||||
assert isinstance(decision, layers.ServerQuicLayer)
|
||||
assert isinstance(decision.child_layer, layers.RawQuicLayer)
|
||||
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerQuicLayer(ctx), layers.ClientQuicLayer(ctx)]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.RawQuicLayer)
|
||||
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerQuicLayer(ctx)]
|
||||
decision = nl._next_layer(ctx, b"", b"")
|
||||
assert isinstance(decision, layers.ClientQuicLayer)
|
||||
assert isinstance(decision.child_layer, layers.RawQuicLayer)
|
||||
|
||||
tctx.configure(nl, ignore_hosts=[])
|
||||
|
||||
def test_next_layer_reverse_quic_mode(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
ctx.client.proxy_mode.scheme = "quic"
|
||||
ctx.layers = [
|
||||
layers.modes.ReverseProxy(ctx),
|
||||
layers.ServerQuicLayer(ctx),
|
||||
layers.ClientQuicLayer(ctx),
|
||||
]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.RawQuicLayer)
|
||||
ctx.layers = [
|
||||
layers.modes.ReverseProxy(ctx),
|
||||
layers.ServerQuicLayer(ctx),
|
||||
]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer)
|
||||
|
||||
def test_next_layer_reverse_http3_mode(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
ctx.client.proxy_mode.scheme = "http3"
|
||||
ctx.layers = [
|
||||
layers.modes.ReverseProxy(ctx),
|
||||
layers.ServerQuicLayer(ctx),
|
||||
layers.ClientQuicLayer(ctx),
|
||||
]
|
||||
decision = nl._next_layer(ctx, b"", b"")
|
||||
assert isinstance(decision, layers.HttpLayer)
|
||||
assert decision.mode is HTTPMode.transparent
|
||||
|
||||
def test_next_layer_reverse_invalid_mode(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
ctx.client.proxy_mode.scheme = "invalidscheme"
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx)]
|
||||
with pytest.raises(AssertionError, match="invalidscheme"):
|
||||
nl._next_layer(ctx, b"", b"")
|
||||
|
||||
def test_next_layer_reverse_dtls_mode(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
ctx.client.proxy_mode.scheme = "dtls"
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerTLSLayer(ctx)]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer)
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ServerTLSLayer(ctx), layers.ClientTLSLayer(ctx)]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer)
|
||||
|
||||
def test_next_layer_reverse_udp_mode(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
ctx.client.proxy_mode.scheme = "udp"
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx)]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer)
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ClientTLSLayer(ctx)]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.UDPLayer)
|
||||
|
||||
def test_next_layer_reverse_dns_mode(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
ctx.client.alpn = None
|
||||
ctx.server.address = ("example.com", 443)
|
||||
ctx.client.transport_protocol = "udp"
|
||||
ctx.client.proxy_mode.scheme = "dns"
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx)]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.DNSLayer)
|
||||
ctx.layers = [layers.modes.ReverseProxy(ctx), layers.ClientTLSLayer(ctx)]
|
||||
assert isinstance(nl._next_layer(ctx, b"", b""), layers.DNSLayer)
|
||||
|
||||
def test_next_layer_invalid_proto(self):
|
||||
nl = NextLayer()
|
||||
ctx = MagicMock()
|
||||
|
|
Loading…
Reference in New Issue