Allow parsing of HTTPS record from DNS RDATA (#6884)

* Unpack HTTPS DNS record data

* Fix linting issues

* Add entry to CHANGELOG.md

* [autofix.ci] apply automated fixes

* Reorder functions

* [autofix.ci] apply automated fixes

* Rename private methods

* Use Enum to store constants

* Restructure constants

* Handle errors

* Use dataclasses to represent HTTPS records

* [autofix.ci] apply automated fixes

* Fix mypy errors

* [autofix.ci] apply automated fixes

* Allow packing of HTTPSRecords to its byte format

* Add tests for https_record

* [autofix.ci] apply automated fixes

* Rename https_record to https_records

* [autofix.ci] apply automated fixes

* Increase test coverage

* [autofix.ci] apply automated fixes

* Increase test coverage

* [autofix.ci] apply automated fixes

* Increase test coverage

* [autofix.ci] apply automated fixes

* Add comments

* Restructure HTTPS record API

* [autofix.ci] apply automated fixes

* Remove  from public API

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Gaurav Jain 2024-06-10 01:26:21 +05:30 committed by GitHub
parent bd39c7b96f
commit 03c8db88bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 307 additions and 1 deletions

View File

@ -19,7 +19,8 @@
([#6875](https://github.com/mitmproxy/mitmproxy/pull/6875), @aib)
* Add an option to strip HTTPS records from DNS responses to block encrypted ClientHellos.
([#6876](https://github.com/mitmproxy/mitmproxy/pull/6876), @errorxyz)
* Allow parsing of HTTPS records from DNS RDATA
([#6884](https://github.com/mitmproxy/mitmproxy/pull/6884), @errorxyz)
## 17 April 2024: mitmproxy 10.3.0

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import base64
import itertools
import random
import struct
@ -13,9 +14,12 @@ from mitmproxy import flow
from mitmproxy.coretypes import serializable
from mitmproxy.net.dns import classes
from mitmproxy.net.dns import domain_names
from mitmproxy.net.dns import https_records
from mitmproxy.net.dns import op_codes
from mitmproxy.net.dns import response_codes
from mitmproxy.net.dns import types
from mitmproxy.net.dns.https_records import HTTPSRecord
from mitmproxy.net.dns.https_records import SVCParamKeys
# DNS parameters taken from https://www.iana.org/assignments/dns-parameters/dns-parameters.xml
@ -64,6 +68,8 @@ class ResourceRecord(serializable.SerializableDataclass):
return self.domain_name
if self.type == types.TXT:
return self.text
if self.type == types.HTTPS:
return str(https_records.unpack(self.data))
except Exception:
return f"0x{self.data.hex()} (invalid {types.to_str(self.type)} data)"
return f"0x{self.data.hex()}"
@ -100,6 +106,25 @@ class ResourceRecord(serializable.SerializableDataclass):
def domain_name(self, name: str) -> None:
self.data = domain_names.pack(name)
@property
def https_ech(self) -> str | None:
record = https_records.unpack(self.data)
ech_bytes = record.params.get(SVCParamKeys.ECH.value, None)
if ech_bytes is not None:
return base64.b64encode(ech_bytes).decode("utf-8")
else:
return None
@https_ech.setter
def https_ech(self, ech: str | None) -> None:
record = https_records.unpack(self.data)
if ech is None:
record.params.pop(SVCParamKeys.ECH.value, None)
else:
ech_bytes = base64.b64decode(ech.encode("utf-8"))
record.params[SVCParamKeys.ECH.value] = ech_bytes
self.data = https_records.pack(record)
def to_json(self) -> dict:
"""
Converts the resource record into json for mitmweb.
@ -142,6 +167,13 @@ class ResourceRecord(serializable.SerializableDataclass):
"""Create a textual resource record."""
return cls(name, types.TXT, classes.IN, ttl, text.encode("utf-8"))
@classmethod
def HTTPS(
cls, name: str, record: HTTPSRecord, ttl: int = DEFAULT_TTL
) -> ResourceRecord:
"""Create a HTTPS resource record"""
return cls(name, types.HTTPS, classes.IN, ttl, https_records.pack(record))
# comments are taken from rfc1035
@dataclass

View File

@ -0,0 +1,142 @@
import enum
import struct
from dataclasses import dataclass
"""
HTTPS records are formatted as follows (as per RFC9460):
- a 2-octet field for SvcPriority as an integer in network byte order.
- the uncompressed, fully qualified TargetName, represented as a sequence of length-prefixed labels per Section 3.1 of [RFC1035].
- the SvcParams, consuming the remainder of the record (so smaller than 65535 octets and constrained by the RDATA and DNS message sizes).
When the list of SvcParams is non-empty, it contains a series of SvcParamKey=SvcParamValue pairs, represented as:
- a 2-octet field containing the SvcParamKey as an integer in network byte order. (See Section 14.3.2 for the defined values.)
- a 2-octet field containing the length of the SvcParamValue as an integer between 0 and 65535 in network byte order.
- an octet string of this length whose contents are the SvcParamValue in a format determined by the SvcParamKey.
https://datatracker.ietf.org/doc/rfc9460/
https://datatracker.ietf.org/doc/rfc1035/
"""
class SVCParamKeys(enum.Enum):
MANDATORY = 0
ALPN = 1
NO_DEFAULT_ALPN = 2
PORT = 3
IPV4HINT = 4
ECH = 5
IPV6HINT = 6
@dataclass
class HTTPSRecord:
priority: int
target_name: str
params: dict[int, bytes]
def __repr__(self):
params = {}
for param_type, param_value in self.params.items():
try:
name = SVCParamKeys(param_type).name.lower()
except ValueError:
name = f"key{param_type}"
params[name] = param_value
return f"priority: {self.priority} target_name: '{self.target_name}' {params}"
def _unpack_params(data: bytes, offset: int) -> dict[int, bytes]:
"""Unpacks the service parameters from the given offset."""
params = {}
while offset < len(data):
param_type = struct.unpack("!H", data[offset : offset + 2])[0]
offset += 2
param_length = struct.unpack("!H", data[offset : offset + 2])[0]
offset += 2
if offset + param_length > len(data):
raise struct.error(
"unpack requires a buffer of %i bytes" % (offset + param_length)
)
param_value = data[offset : offset + param_length]
offset += param_length
params[param_type] = param_value
return params
def _unpack_target_name(data: bytes, offset: int) -> tuple[str, int]:
"""Unpacks the DNS-encoded domain name from data starting at the given offset."""
labels = []
while True:
if offset >= len(data):
raise struct.error("unpack requires a buffer of %i bytes" % offset)
length = data[offset]
offset += 1
if length == 0:
break
if offset + length > len(data):
raise struct.error(
"unpack requires a buffer of %i bytes" % (offset + length)
)
try:
labels.append(data[offset : offset + length].decode("utf-8"))
except UnicodeDecodeError:
raise struct.error(
"unpack encountered illegal characters at offset %i" % (offset)
)
offset += length
return ".".join(labels), offset
def unpack(data: bytes) -> HTTPSRecord:
"""
Unpacks HTTPS RDATA from byte data.
Raises:
struct.error if the record is malformed.
"""
offset = 0
# Priority (2 bytes)
priority = struct.unpack("!h", data[offset : offset + 2])[0]
offset += 2
# TargetName (variable length)
target_name, offset = _unpack_target_name(data, offset)
# Service Parameters (remaining bytes)
params = _unpack_params(data, offset)
return HTTPSRecord(priority=priority, target_name=target_name, params=params)
def _pack_params(params: dict[int, bytes]) -> bytes:
"""Converts the service parameters into the raw byte format"""
buffer = bytearray()
for k, v in params.items():
buffer.extend(struct.pack("!H", k))
buffer.extend(struct.pack("!H", len(v)))
buffer.extend(v)
return bytes(buffer)
def _pack_target_name(name: str) -> bytes:
"""Converts the target name into its DNS encoded format"""
buffer = bytearray()
for label in name.split("."):
if len(label) == 0:
break
buffer.extend(struct.pack("!B", len(label)))
buffer.extend(label.encode("utf-8"))
buffer.extend(struct.pack("!B", 0))
return bytes(buffer)
def pack(record: HTTPSRecord) -> bytes:
"""Packs the HTTPS record into its bytes form."""
buffer = bytearray()
buffer.extend(struct.pack("!h", record.priority))
buffer.extend(_pack_target_name(record.target_name))
buffer.extend(_pack_params(record.params))
return bytes(buffer)

View File

@ -0,0 +1,109 @@
import re
import struct
import pytest
from hypothesis import given
from hypothesis import strategies as st
from mitmproxy.net.dns import https_records
class TestHTTPSRecords:
def test_simple(self):
assert https_records.SVCParamKeys.ALPN.value == 1
assert https_records.SVCParamKeys(1).name == "ALPN"
def test_httpsrecord(self):
with pytest.raises(
TypeError,
match=re.escape(
"HTTPSRecord.__init__() missing 3 required positional arguments: 'priority', 'target_name', and 'params'"
),
):
https_records.HTTPSRecord()
def test_unpack(self):
params = {
0: b"\x00\x04\x00\x06",
1: b"\x02h2\x02h3",
2: b"",
3: b"\x01\xbb",
4: b"\xb9\xc7l\x99\xb9\xc7m\x99\xb9\xc7n\x99\xb9\xc7o\x99",
5: b"testbytes",
6: b"&\x06P\xc0\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x02\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x03\x00\x00\x00\x00\x00\x00\x00\x00\x01S",
}
record = https_records.HTTPSRecord(1, "example.com", params)
assert https_records.unpack(https_records.pack(record)) == record
with pytest.raises(
struct.error, match=re.escape("unpack requires a buffer of 2 bytes")
):
https_records.unpack(b"")
with pytest.raises(
struct.error,
match=re.escape("unpack encountered illegal characters at offset 3"),
):
https_records.unpack(
b"\x00\x01\x07exampl\x87\x03com\x00\x00\x01\x00\x06\x02h2\x02h3"
)
with pytest.raises(
struct.error, match=re.escape("unpack requires a buffer of 25 bytes")
):
https_records.unpack(
b"\x00\x01\x07example\x03com\x00\x00\x01\x00\x06\x02h2"
)
with pytest.raises(
struct.error, match=re.escape("unpack requires a buffer of 10 bytes")
):
https_records.unpack(b"\x00\x01\x07exa")
def test_pack(self):
params = {
0: b"\x00\x04\x00\x06",
1: b"\x02h2\x02h3",
2: b"",
3: b"\x01\xbb",
4: b"\xb9\xc7l\x99\xb9\xc7m\x99\xb9\xc7n\x99\xb9\xc7o\x99",
5: b"testbytes",
6: b"&\x06P\xc0\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x02\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x03\x00\x00\x00\x00\x00\x00\x00\x00\x01S",
}
record = https_records.HTTPSRecord(1, "example.com", params)
assert (
https_records.pack(record)
== b"\x00\x01\x07example\x03com\x00\x00\x00\x00\x04\x00\x04\x00\x06\x00\x01\x00\x06\x02h2\x02h3\x00\x02\x00\x00\x00\x03\x00\x02\x01\xbb\x00\x04\x00\x10\xb9\xc7l\x99\xb9\xc7m\x99\xb9\xc7n\x99\xb9\xc7o\x99\x00\x05\x00\ttestbytes\x00\x06\x00@&\x06P\xc0\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x02\x00\x00\x00\x00\x00\x00\x00\x00\x01S&\x06P\xc0\x80\x03\x00\x00\x00\x00\x00\x00\x00\x00\x01S"
)
record = https_records.HTTPSRecord(1, "", {})
assert https_records.pack(record) == b"\x00\x01\x00"
@given(st.binary())
def test_fuzz_unpack(self, data: bytes):
try:
https_records.unpack(data)
except struct.error:
pass
def test_str(self):
params = {
0: b"\x00",
1: b"\x01",
2: b"",
3: b"\x02",
4: b"\x03",
5: b"\x04",
6: b"\x05",
}
record = https_records.HTTPSRecord(1, "example.com", params)
assert (
str(record)
== "priority: 1 target_name: 'example.com' {'mandatory': b'\\x00', 'alpn': b'\\x01', 'no_default_alpn': b'', 'port': b'\\x02', 'ipv4hint': b'\\x03', 'ech': b'\\x04', 'ipv6hint': b'\\x05'}"
)
params = {111: b"\x00"}
record = https_records.HTTPSRecord(1, "example.com", params)
assert (
str(record) == "priority: 1 target_name: 'example.com' {'key111': b'\\x00'}"
)

View File

@ -28,6 +28,20 @@ class TestResourceRecord:
assert (
str(dns.ResourceRecord.TXT("test", "unicode text 😀")) == "unicode text 😀"
)
params = {
0: b"\x00",
1: b"\x01",
2: b"",
3: b"\x02",
4: b"\x03",
5: b"\x04",
6: b"\x05",
}
record = dns.https_records.HTTPSRecord(1, "example.com", params)
assert (
str(dns.ResourceRecord.HTTPS("example.com", record))
== "priority: 1 target_name: 'example.com' {'mandatory': b'\\x00', 'alpn': b'\\x01', 'no_default_alpn': b'', 'port': b'\\x02', 'ipv4hint': b'\\x03', 'ech': b'\\x04', 'ipv6hint': b'\\x05'}"
)
assert (
str(
dns.ResourceRecord(
@ -65,6 +79,14 @@ class TestResourceRecord:
assert rr.domain_name == "www.example.org"
rr.text = "sample text"
assert rr.text == "sample text"
params = {3: b"\x01\xbb"}
record = dns.https_records.HTTPSRecord(1, "example.org", params)
rr.data = dns.https_records.pack(record)
assert rr.https_ech is None
rr.https_ech = "dGVzdHN0cmluZwo="
assert rr.https_ech == "dGVzdHN0cmluZwo="
rr.https_ech = None
assert rr.https_ech is None
class TestMessage: