diff --git a/mitmproxy/dns.py b/mitmproxy/dns.py index 0f4313c72..1170dedfd 100644 --- a/mitmproxy/dns.py +++ b/mitmproxy/dns.py @@ -276,25 +276,22 @@ class Message(stateobject.StateObject): def unpack_from(cls, buffer: Union[bytes, bytearray], offset: int) -> Tuple[int, Message]: """Converts the buffer from a given offset into a DNS message and also returns its length.""" id, flags, len_questions, len_answers, len_authorities, len_additionals = Message.HEADER.unpack_from(buffer, offset) - try: - msg = Message( - timestamp=time.time(), - id=id, - query=(flags & (1 << 15)) == 0, - op_code = (flags >> 11) & 0b1111, - authoritative_answer=(flags & (1 << 10)) != 0, - truncation = (flags & (1 << 9)) != 0, - recursion_desired = (flags & (1 << 8)) != 0, - recursion_available = (flags & (1 << 7)) != 0, - reserved = (flags >> 4) & 0b111, - response_code = flags & 0b1111, - questions=[], - answers=[], - authorities=[], - additionals=[], - ) - except ValueError as e: - raise struct.error(str(e)) + msg = Message( + timestamp=time.time(), + id=id, + query=(flags & (1 << 15)) == 0, + op_code=(flags >> 11) & 0b1111, + authoritative_answer=(flags & (1 << 10)) != 0, + truncation=(flags & (1 << 9)) != 0, + recursion_desired=(flags & (1 << 8)) != 0, + recursion_available=(flags & (1 << 7)) != 0, + reserved=(flags >> 4) & 0b111, + response_code=flags & 0b1111, + questions=[], + answers=[], + authorities=[], + additionals=[], + ) offset += Message.HEADER.size cached_names = domain_names.cache() diff --git a/setup.cfg b/setup.cfg index 6ce884af0..d433c5746 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,7 +53,6 @@ exclude = mitmproxy/contentviews/base.py mitmproxy/contentviews/grpc.py mitmproxy/ctx.py - mitmproxy/dns.py mitmproxy/exceptions.py mitmproxy/flow.py mitmproxy/io/io.py diff --git a/test/mitmproxy/test_dns.py b/test/mitmproxy/test_dns.py index 401e16b43..d00261b96 100644 --- a/test/mitmproxy/test_dns.py +++ b/test/mitmproxy/test_dns.py @@ -35,6 +35,15 @@ class TestResourceRecord: class TestMessage: + def test_json(self): + resp = tutils.tdnsresp() + json = resp.to_json() + assert json["id"] == resp.id + assert len(json["questions"]) == len(resp.questions) + assert json["questions"][0]["name"] == resp.questions[0].name + assert len(json["answers"]) == len(resp.answers) + assert json["answers"][0]["data"] == str(resp.answers[0]) + def test_responses(self): req = tutils.tdnsreq() resp = tutils.tdnsresp() @@ -65,6 +74,8 @@ class TestMessage: test("id", 0, 2 ** 16 - 1) test("reserved", 0, 7) + test("op_code", 0, 0b1111) + test("response_code", 0, 0b1111) def test_packing(self): def assert_eq(m: dns.Message, b: bytes) -> None: @@ -80,6 +91,27 @@ class TestMessage: b'\x00\x2a\x81\x80\x00\x01\x00\x02\x00\x00\x00\x00\x03dns\x06google\x00\x00\x01\x00\x01' + b'\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x08\x08\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x04\x04' )) + with pytest.raises(struct.error): # question error + dns.Message.unpack(b'\x00\x2a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03dns\x06goo') + with pytest.raises(struct.error): # rr length error + dns.Message.unpack( + b'\x00\x2a\x81\x80\x00\x01\x00\x02\x00\x00\x00\x00\x03dns\x06google\x00\x00\x01\x00\x01' + + b'\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x08\x08\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x04' + ) + txt = dns.Message.unpack( + b'V\x1a\x81\x80\x00\x01\x00\x01\x00\x01\x00\x01\x05alive\x06github\x03com\x00\x00' + + b'\x10\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x0b\xc6\x00\x07\x04live\xc0\x12\xc0\x12\x00\x06\x00\x01' + + b'\x00\x00\x03\x84\x00H\x07ns-1707\tawsdns-21\x02co\x02uk\x00\x11awsdns-hostmaster\x06amazon\xc0\x19\x00' + + b'\x00\x00\x01\x00\x00\x1c \x00\x00\x03\x84\x00\x12u\x00\x00\x01Q\x80\x00\x00)\x02\x00\x00\x00\x00\x00\x00\x00' + ) + assert txt.answers[0].domain_name == "live.github.com" + invalid_rr_domain_name = dns.Message.unpack( + b'V\x1a\x81\x80\x00\x01\x00\x01\x00\x01\x00\x01\x05alive\x06github\x03com\x00\x00' + + b'\x10\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x0b\xc6\x00\x07\x99live\xc0\x12\xc0\x12\x00\x06\x00\x01' + + b'\x00\x00\x03\x84\x00H\x07ns-1707\tawsdns-21\x02co\x02uk\x00\x11awsdns-hostmaster\x06amazon\xc0\x19\x00' + + b'\x00\x00\x01\x00\x00\x1c \x00\x00\x03\x84\x00\x12u\x00\x00\x01Q\x80\x00\x00)\x02\x00\x00\x00\x00\x00\x00\x00' + ) + assert invalid_rr_domain_name.answers[0].data == b'\x99live\xc0\x12' req = tutils.tdnsreq() for flag in "authoritative_answer", "truncation", "recursion_desired", "recursion_available":