add MQTT contentview
This commit is contained in:
parent
a6ede600f3
commit
12e2aecdf9
|
@ -35,6 +35,8 @@
|
|||
([#5590](https://github.com/mitmproxy/mitmproxy/pull/5590), @mhils)
|
||||
* Deprecate `mitmproxy.ctx.log` in favor of Python's builtin `logging` module.
|
||||
([#5590](https://github.com/mitmproxy/mitmproxy/pull/5590), @mhils)
|
||||
* Add MQTT content view.
|
||||
([#5588](https://github.com/mitmproxy/mitmproxy/pull/5588), @nikitastupin, @abbbe)
|
||||
|
||||
## 28 June 2022: mitmproxy 8.1.1
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ from . import (
|
|||
msgpack,
|
||||
graphql,
|
||||
grpc,
|
||||
mqtt,
|
||||
)
|
||||
|
||||
try:
|
||||
|
@ -254,6 +255,7 @@ add(query.ViewQuery())
|
|||
add(protobuf.ViewProtobuf())
|
||||
add(msgpack.ViewMsgPack())
|
||||
add(grpc.ViewGrpcProtobuf())
|
||||
add(mqtt.ViewMQTT())
|
||||
if http3 is not None:
|
||||
add(http3.ViewHttp3())
|
||||
|
||||
|
|
|
@ -0,0 +1,273 @@
|
|||
from typing import Optional
|
||||
|
||||
from mitmproxy.contentviews import base
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
import struct
|
||||
|
||||
# from https://github.com/nikitastupin/mitmproxy-mqtt-script
|
||||
|
||||
|
||||
class MQTTControlPacket:
|
||||
# Packet types
|
||||
(
|
||||
CONNECT,
|
||||
CONNACK,
|
||||
PUBLISH,
|
||||
PUBACK,
|
||||
PUBREC,
|
||||
PUBREL,
|
||||
PUBCOMP,
|
||||
SUBSCRIBE,
|
||||
SUBACK,
|
||||
UNSUBSCRIBE,
|
||||
UNSUBACK,
|
||||
PINGREQ,
|
||||
PINGRESP,
|
||||
DISCONNECT,
|
||||
) = range(1, 15)
|
||||
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_-
|
||||
Names = [
|
||||
"reserved",
|
||||
"CONNECT",
|
||||
"CONNACK",
|
||||
"PUBLISH",
|
||||
"PUBACK",
|
||||
"PUBREC",
|
||||
"PUBREL",
|
||||
"PUBCOMP",
|
||||
"SUBSCRIBE",
|
||||
"SUBACK",
|
||||
"UNSUBSCRIBE",
|
||||
"UNSUBACK",
|
||||
"PINGREQ",
|
||||
"PINGRESP",
|
||||
"DISCONNECT",
|
||||
"reserved",
|
||||
]
|
||||
|
||||
PACKETS_WITH_IDENTIFIER = [
|
||||
PUBACK,
|
||||
PUBREC,
|
||||
PUBREL,
|
||||
PUBCOMP,
|
||||
SUBSCRIBE,
|
||||
SUBACK,
|
||||
UNSUBSCRIBE,
|
||||
UNSUBACK,
|
||||
]
|
||||
|
||||
def __init__(self, packet):
|
||||
self._packet = packet
|
||||
# Fixed header
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020
|
||||
self.packet_type = self._parse_packet_type()
|
||||
self.packet_type_human = self.Names[self.packet_type]
|
||||
self.dup, self.qos, self.retain = self._parse_flags()
|
||||
self.remaining_length = self._parse_remaining_length()
|
||||
# Variable header & Payload
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718024
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718026
|
||||
if self.packet_type == self.CONNECT:
|
||||
self._parse_connect_variable_headers()
|
||||
self._parse_connect_payload()
|
||||
elif self.packet_type == self.PUBLISH:
|
||||
self._parse_publish_variable_headers()
|
||||
self._parse_publish_payload()
|
||||
elif self.packet_type == self.SUBSCRIBE:
|
||||
self._parse_subscribe_variable_headers()
|
||||
self._parse_subscribe_payload()
|
||||
elif self.packet_type == self.SUBACK:
|
||||
pass
|
||||
elif self.packet_type == self.UNSUBSCRIBE:
|
||||
pass
|
||||
else:
|
||||
self.payload = None
|
||||
|
||||
def pprint(self):
|
||||
s = f"[{self.Names[self.packet_type]}]"
|
||||
|
||||
if self.packet_type == self.CONNECT:
|
||||
s += f"""
|
||||
|
||||
Client Id: {self.payload['ClientId']}
|
||||
Will Topic: {self.payload.get('WillTopic')}
|
||||
Will Message: {strutils.bytes_to_escaped_str(self.payload.get('WillMessage', b'None'))}
|
||||
User Name: {self.payload.get('UserName')}
|
||||
Password: {strutils.bytes_to_escaped_str(self.payload.get('Password', b'None'))}
|
||||
"""
|
||||
elif self.packet_type == self.SUBSCRIBE:
|
||||
s += " sent topic filters: "
|
||||
s += ", ".join([f"'{tf}'" for tf in self.topic_filters])
|
||||
elif self.packet_type == self.PUBLISH:
|
||||
topic_name = strutils.bytes_to_escaped_str(self.topic_name)
|
||||
payload = strutils.bytes_to_escaped_str(self.payload)
|
||||
|
||||
s += f" '{payload}' to topic '{topic_name}'"
|
||||
elif self.packet_type in [self.PINGREQ, self.PINGRESP]:
|
||||
pass
|
||||
else:
|
||||
s = f"Packet type {self.Names[self.packet_type]} is not supported yet!"
|
||||
|
||||
return s
|
||||
|
||||
def _parse_length_prefixed_bytes(self, offset):
|
||||
field_length_bytes = self._packet[offset : offset + 2]
|
||||
field_length = struct.unpack("!H", field_length_bytes)[0]
|
||||
|
||||
field_content_bytes = self._packet[offset + 2 : offset + 2 + field_length]
|
||||
|
||||
return field_length + 2, field_content_bytes
|
||||
|
||||
def _parse_publish_variable_headers(self):
|
||||
offset = len(self._packet) - self.remaining_length
|
||||
|
||||
field_length, field_content_bytes = self._parse_length_prefixed_bytes(offset)
|
||||
self.topic_name = field_content_bytes
|
||||
|
||||
if self.qos in [0x01, 0x02]:
|
||||
offset += field_length
|
||||
self.packet_identifier = self._packet[offset : offset + 2]
|
||||
|
||||
def _parse_publish_payload(self):
|
||||
fixed_header_length = len(self._packet) - self.remaining_length
|
||||
variable_header_length = 2 + len(self.topic_name)
|
||||
|
||||
if self.qos in [0x01, 0x02]:
|
||||
variable_header_length += 2
|
||||
|
||||
offset = fixed_header_length + variable_header_length
|
||||
|
||||
self.payload = self._packet[offset:]
|
||||
|
||||
def _parse_subscribe_variable_headers(self):
|
||||
self._parse_packet_identifier()
|
||||
|
||||
def _parse_subscribe_payload(self):
|
||||
offset = len(self._packet) - self.remaining_length + 2
|
||||
|
||||
self.topic_filters = {}
|
||||
|
||||
while len(self._packet) - offset > 0:
|
||||
field_length, topic_filter_bytes = self._parse_length_prefixed_bytes(offset)
|
||||
offset += field_length
|
||||
|
||||
qos = self._packet[offset : offset + 1]
|
||||
offset += 1
|
||||
|
||||
topic_filter = topic_filter_bytes.decode("utf-8")
|
||||
self.topic_filters[topic_filter] = {"qos": qos}
|
||||
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030
|
||||
def _parse_connect_variable_headers(self):
|
||||
offset = len(self._packet) - self.remaining_length
|
||||
|
||||
self.variable_headers = {}
|
||||
self.connect_flags = {}
|
||||
|
||||
self.variable_headers["ProtocolName"] = self._packet[offset : offset + 6]
|
||||
self.variable_headers["ProtocolLevel"] = self._packet[offset + 6 : offset + 7]
|
||||
self.variable_headers["ConnectFlags"] = self._packet[offset + 7 : offset + 8]
|
||||
self.variable_headers["KeepAlive"] = self._packet[offset + 8 : offset + 10]
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349229
|
||||
self.connect_flags["CleanSession"] = bool(
|
||||
self.variable_headers["ConnectFlags"][0] & 0x02
|
||||
)
|
||||
self.connect_flags["Will"] = bool(
|
||||
self.variable_headers["ConnectFlags"][0] & 0x04
|
||||
)
|
||||
self.will_qos = (self.variable_headers["ConnectFlags"][0] >> 3) & 0x03
|
||||
self.connect_flags["WillRetain"] = bool(
|
||||
self.variable_headers["ConnectFlags"][0] & 0x20
|
||||
)
|
||||
self.connect_flags["Password"] = bool(
|
||||
self.variable_headers["ConnectFlags"][0] & 0x40
|
||||
)
|
||||
self.connect_flags["UserName"] = bool(
|
||||
self.variable_headers["ConnectFlags"][0] & 0x80
|
||||
)
|
||||
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031
|
||||
def _parse_connect_payload(self):
|
||||
fields = []
|
||||
offset = len(self._packet) - self.remaining_length + 10
|
||||
|
||||
while len(self._packet) - offset > 0:
|
||||
field_length, field_content = self._parse_length_prefixed_bytes(offset)
|
||||
fields.append(field_content)
|
||||
offset += field_length
|
||||
|
||||
self.payload = {}
|
||||
|
||||
for f in fields:
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242
|
||||
if "ClientId" not in self.payload:
|
||||
self.payload["ClientId"] = f.decode("utf-8")
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349243
|
||||
elif self.connect_flags["Will"] and "WillTopic" not in self.payload:
|
||||
self.payload["WillTopic"] = f.decode("utf-8")
|
||||
elif self.connect_flags["Will"] and "WillMessage" not in self.payload:
|
||||
self.payload["WillMessage"] = f
|
||||
elif self.connect_flags["UserName"] and "UserName" not in self.payload: # pragma: no cover
|
||||
self.payload["UserName"] = f.decode("utf-8")
|
||||
elif self.connect_flags["Password"] and "Password" not in self.payload: # pragma: no cover
|
||||
self.payload["Password"] = f
|
||||
else:
|
||||
raise AssertionError(f"Unknown field in CONNECT payload: {f}")
|
||||
|
||||
def _parse_packet_type(self):
|
||||
return self._packet[0] >> 4
|
||||
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022
|
||||
def _parse_flags(self):
|
||||
dup = None
|
||||
qos = None
|
||||
retain = None
|
||||
|
||||
if self.packet_type == self.PUBLISH:
|
||||
dup = (self._packet[0] >> 3) & 0x01
|
||||
qos = (self._packet[0] >> 1) & 0x03
|
||||
retain = self._packet[0] & 0x01
|
||||
|
||||
return dup, qos, retain
|
||||
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size
|
||||
def _parse_remaining_length(self):
|
||||
multiplier = 1
|
||||
value = 0
|
||||
i = 1
|
||||
|
||||
while True:
|
||||
encodedByte = self._packet[i]
|
||||
value += (encodedByte & 127) * multiplier
|
||||
multiplier *= 128
|
||||
|
||||
if multiplier > 128 * 128 * 128:
|
||||
raise ValueError("Malformed Remaining Length")
|
||||
|
||||
if encodedByte & 128 == 0:
|
||||
break
|
||||
|
||||
i += 1
|
||||
|
||||
return value
|
||||
|
||||
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.5_-
|
||||
def _parse_packet_identifier(self):
|
||||
offset = len(self._packet) - self.remaining_length
|
||||
self.packet_identifier = self._packet[offset : offset + 2]
|
||||
|
||||
|
||||
class ViewMQTT(base.View):
|
||||
name = "MQTT"
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
mqtt_packet = MQTTControlPacket(data)
|
||||
text = mqtt_packet.pprint()
|
||||
return "MQTT", base.format_text(text)
|
||||
|
||||
def render_priority(
|
||||
self, data: bytes, *, content_type: Optional[str] = None, **metadata
|
||||
) -> float:
|
||||
return 0
|
|
@ -0,0 +1,67 @@
|
|||
import pytest
|
||||
|
||||
from mitmproxy.contentviews import mqtt
|
||||
from . import full_eval
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"data,expected_text",
|
||||
[
|
||||
pytest.param(b"\xC0\x00", "[PINGREQ]", id="PINGREQ"),
|
||||
pytest.param(b"\xD0\x00", "[PINGRESP]", id="PINGRESP"),
|
||||
pytest.param(b"\x90\x00", "Packet type SUBACK is not supported yet!", id="SUBACK"),
|
||||
pytest.param(b"\xA0\x00", "Packet type UNSUBSCRIBE is not supported yet!", id="UNSUBSCRIBE"),
|
||||
pytest.param(
|
||||
b"\x82\x31\x00\x03\x00\x2cxxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/in\x01",
|
||||
"[SUBSCRIBE] sent topic filters: 'xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/in'",
|
||||
id="SUBSCRIBE",
|
||||
),
|
||||
pytest.param(
|
||||
b"""\x32\x9a\x01\x00\x2dxxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out\x00\x04"""
|
||||
b"""{"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05","messageId":"connected","type":"event"}}""",
|
||||
"""[PUBLISH] '{"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05","""
|
||||
""""messageId":"connected","type":"event"}}' to topic 'xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out'""",
|
||||
id="PUBLISH",
|
||||
),
|
||||
pytest.param(
|
||||
b"""\x10\xba\x01\x00\x04MQTT\x04\x06\x00\x1e\x00\x1156:6F:5E:6A:01:05\x00-"""
|
||||
b"""xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out"""
|
||||
b"""\x00l{"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05","messageId":"disconnected","type":"event"}}""",
|
||||
[
|
||||
"[CONNECT]",
|
||||
"",
|
||||
"Client Id: 56:6F:5E:6A:01:05",
|
||||
"Will Topic: xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out",
|
||||
"""Will Message: {"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05","""
|
||||
""""messageId":"disconnected","type":"event"}}""",
|
||||
"User Name: None",
|
||||
"Password: None",
|
||||
],
|
||||
id="CONNECT",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_view_mqtt(data, expected_text):
|
||||
"""testing helper for single line messages"""
|
||||
v = full_eval(mqtt.ViewMQTT())
|
||||
content_type, output = v(data)
|
||||
assert content_type == "MQTT"
|
||||
if isinstance(expected_text, list):
|
||||
assert output == [[("text", text)] for text in expected_text]
|
||||
else:
|
||||
assert output == [[("text", expected_text)]]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"data",
|
||||
[b"\xC0\xFF\xFF\xFF\xFF"]
|
||||
)
|
||||
def test_mqtt_malformed(data):
|
||||
v = full_eval(mqtt.ViewMQTT())
|
||||
with pytest.raises(Exception):
|
||||
v(data)
|
||||
|
||||
|
||||
def test_render_priority():
|
||||
# missing: good MQTT heuristics.
|
||||
assert mqtt.ViewMQTT().render_priority(b"") == 0
|
Loading…
Reference in New Issue