[dns] 100% coverage
This commit is contained in:
parent
99bcfb7f55
commit
0553b73d36
|
@ -276,25 +276,22 @@ class Message(stateobject.StateObject):
|
|||
def unpack_from(cls, buffer: Union[bytes, bytearray], offset: int) -> Tuple[int, Message]:
|
||||
"""Converts the buffer from a given offset into a DNS message and also returns its length."""
|
||||
id, flags, len_questions, len_answers, len_authorities, len_additionals = Message.HEADER.unpack_from(buffer, offset)
|
||||
try:
|
||||
msg = Message(
|
||||
timestamp=time.time(),
|
||||
id=id,
|
||||
query=(flags & (1 << 15)) == 0,
|
||||
op_code = (flags >> 11) & 0b1111,
|
||||
authoritative_answer=(flags & (1 << 10)) != 0,
|
||||
truncation = (flags & (1 << 9)) != 0,
|
||||
recursion_desired = (flags & (1 << 8)) != 0,
|
||||
recursion_available = (flags & (1 << 7)) != 0,
|
||||
reserved = (flags >> 4) & 0b111,
|
||||
response_code = flags & 0b1111,
|
||||
questions=[],
|
||||
answers=[],
|
||||
authorities=[],
|
||||
additionals=[],
|
||||
)
|
||||
except ValueError as e:
|
||||
raise struct.error(str(e))
|
||||
msg = Message(
|
||||
timestamp=time.time(),
|
||||
id=id,
|
||||
query=(flags & (1 << 15)) == 0,
|
||||
op_code=(flags >> 11) & 0b1111,
|
||||
authoritative_answer=(flags & (1 << 10)) != 0,
|
||||
truncation=(flags & (1 << 9)) != 0,
|
||||
recursion_desired=(flags & (1 << 8)) != 0,
|
||||
recursion_available=(flags & (1 << 7)) != 0,
|
||||
reserved=(flags >> 4) & 0b111,
|
||||
response_code=flags & 0b1111,
|
||||
questions=[],
|
||||
answers=[],
|
||||
authorities=[],
|
||||
additionals=[],
|
||||
)
|
||||
offset += Message.HEADER.size
|
||||
cached_names = domain_names.cache()
|
||||
|
||||
|
|
|
@ -53,7 +53,6 @@ exclude =
|
|||
mitmproxy/contentviews/base.py
|
||||
mitmproxy/contentviews/grpc.py
|
||||
mitmproxy/ctx.py
|
||||
mitmproxy/dns.py
|
||||
mitmproxy/exceptions.py
|
||||
mitmproxy/flow.py
|
||||
mitmproxy/io/io.py
|
||||
|
|
|
@ -35,6 +35,15 @@ class TestResourceRecord:
|
|||
|
||||
class TestMessage:
|
||||
|
||||
def test_json(self):
|
||||
resp = tutils.tdnsresp()
|
||||
json = resp.to_json()
|
||||
assert json["id"] == resp.id
|
||||
assert len(json["questions"]) == len(resp.questions)
|
||||
assert json["questions"][0]["name"] == resp.questions[0].name
|
||||
assert len(json["answers"]) == len(resp.answers)
|
||||
assert json["answers"][0]["data"] == str(resp.answers[0])
|
||||
|
||||
def test_responses(self):
|
||||
req = tutils.tdnsreq()
|
||||
resp = tutils.tdnsresp()
|
||||
|
@ -65,6 +74,8 @@ class TestMessage:
|
|||
|
||||
test("id", 0, 2 ** 16 - 1)
|
||||
test("reserved", 0, 7)
|
||||
test("op_code", 0, 0b1111)
|
||||
test("response_code", 0, 0b1111)
|
||||
|
||||
def test_packing(self):
|
||||
def assert_eq(m: dns.Message, b: bytes) -> None:
|
||||
|
@ -80,6 +91,27 @@ class TestMessage:
|
|||
b'\x00\x2a\x81\x80\x00\x01\x00\x02\x00\x00\x00\x00\x03dns\x06google\x00\x00\x01\x00\x01' +
|
||||
b'\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x08\x08\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x04\x04'
|
||||
))
|
||||
with pytest.raises(struct.error): # question error
|
||||
dns.Message.unpack(b'\x00\x2a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03dns\x06goo')
|
||||
with pytest.raises(struct.error): # rr length error
|
||||
dns.Message.unpack(
|
||||
b'\x00\x2a\x81\x80\x00\x01\x00\x02\x00\x00\x00\x00\x03dns\x06google\x00\x00\x01\x00\x01' +
|
||||
b'\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x08\x08\xc0\x0c\x00\x01\x00\x01\x00\x00\x00 \x00\x04\x08\x08\x04'
|
||||
)
|
||||
txt = dns.Message.unpack(
|
||||
b'V\x1a\x81\x80\x00\x01\x00\x01\x00\x01\x00\x01\x05alive\x06github\x03com\x00\x00' +
|
||||
b'\x10\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x0b\xc6\x00\x07\x04live\xc0\x12\xc0\x12\x00\x06\x00\x01' +
|
||||
b'\x00\x00\x03\x84\x00H\x07ns-1707\tawsdns-21\x02co\x02uk\x00\x11awsdns-hostmaster\x06amazon\xc0\x19\x00' +
|
||||
b'\x00\x00\x01\x00\x00\x1c \x00\x00\x03\x84\x00\x12u\x00\x00\x01Q\x80\x00\x00)\x02\x00\x00\x00\x00\x00\x00\x00'
|
||||
)
|
||||
assert txt.answers[0].domain_name == "live.github.com"
|
||||
invalid_rr_domain_name = dns.Message.unpack(
|
||||
b'V\x1a\x81\x80\x00\x01\x00\x01\x00\x01\x00\x01\x05alive\x06github\x03com\x00\x00' +
|
||||
b'\x10\x00\x01\xc0\x0c\x00\x05\x00\x01\x00\x00\x0b\xc6\x00\x07\x99live\xc0\x12\xc0\x12\x00\x06\x00\x01' +
|
||||
b'\x00\x00\x03\x84\x00H\x07ns-1707\tawsdns-21\x02co\x02uk\x00\x11awsdns-hostmaster\x06amazon\xc0\x19\x00' +
|
||||
b'\x00\x00\x01\x00\x00\x1c \x00\x00\x03\x84\x00\x12u\x00\x00\x01Q\x80\x00\x00)\x02\x00\x00\x00\x00\x00\x00\x00'
|
||||
)
|
||||
assert invalid_rr_domain_name.answers[0].data == b'\x99live\xc0\x12'
|
||||
|
||||
req = tutils.tdnsreq()
|
||||
for flag in "authoritative_answer", "truncation", "recursion_desired", "recursion_available":
|
||||
|
|
Loading…
Reference in New Issue