diff --git a/CHANGELOG.md b/CHANGELOG.md index 479d65bb4..8af4f8f7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ ([#5590](https://github.com/mitmproxy/mitmproxy/pull/5590), @mhils) * Deprecate `mitmproxy.ctx.log` in favor of Python's builtin `logging` module. ([#5590](https://github.com/mitmproxy/mitmproxy/pull/5590), @mhils) +* Add MQTT content view. + ([#5588](https://github.com/mitmproxy/mitmproxy/pull/5588), @nikitastupin, @abbbe) ## 28 June 2022: mitmproxy 8.1.1 diff --git a/mitmproxy/contentviews/__init__.py b/mitmproxy/contentviews/__init__.py index f1f85363f..66dc3a98e 100644 --- a/mitmproxy/contentviews/__init__.py +++ b/mitmproxy/contentviews/__init__.py @@ -35,6 +35,7 @@ from . import ( msgpack, graphql, grpc, + mqtt, ) try: @@ -254,6 +255,7 @@ add(query.ViewQuery()) add(protobuf.ViewProtobuf()) add(msgpack.ViewMsgPack()) add(grpc.ViewGrpcProtobuf()) +add(mqtt.ViewMQTT()) if http3 is not None: add(http3.ViewHttp3()) diff --git a/mitmproxy/contentviews/mqtt.py b/mitmproxy/contentviews/mqtt.py new file mode 100644 index 000000000..c344fed20 --- /dev/null +++ b/mitmproxy/contentviews/mqtt.py @@ -0,0 +1,273 @@ +from typing import Optional + +from mitmproxy.contentviews import base +from mitmproxy.utils import strutils + +import struct + +# from https://github.com/nikitastupin/mitmproxy-mqtt-script + + +class MQTTControlPacket: + # Packet types + ( + CONNECT, + CONNACK, + PUBLISH, + PUBACK, + PUBREC, + PUBREL, + PUBCOMP, + SUBSCRIBE, + SUBACK, + UNSUBSCRIBE, + UNSUBACK, + PINGREQ, + PINGRESP, + DISCONNECT, + ) = range(1, 15) + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_- + Names = [ + "reserved", + "CONNECT", + "CONNACK", + "PUBLISH", + "PUBACK", + "PUBREC", + "PUBREL", + "PUBCOMP", + "SUBSCRIBE", + "SUBACK", + "UNSUBSCRIBE", + "UNSUBACK", + "PINGREQ", + "PINGRESP", + "DISCONNECT", + "reserved", + ] + + PACKETS_WITH_IDENTIFIER = [ + PUBACK, + PUBREC, + PUBREL, + PUBCOMP, + SUBSCRIBE, + SUBACK, + UNSUBSCRIBE, + UNSUBACK, + ] + + def __init__(self, packet): + self._packet = packet + # Fixed header + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020 + self.packet_type = self._parse_packet_type() + self.packet_type_human = self.Names[self.packet_type] + self.dup, self.qos, self.retain = self._parse_flags() + self.remaining_length = self._parse_remaining_length() + # Variable header & Payload + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718024 + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718026 + if self.packet_type == self.CONNECT: + self._parse_connect_variable_headers() + self._parse_connect_payload() + elif self.packet_type == self.PUBLISH: + self._parse_publish_variable_headers() + self._parse_publish_payload() + elif self.packet_type == self.SUBSCRIBE: + self._parse_subscribe_variable_headers() + self._parse_subscribe_payload() + elif self.packet_type == self.SUBACK: + pass + elif self.packet_type == self.UNSUBSCRIBE: + pass + else: + self.payload = None + + def pprint(self): + s = f"[{self.Names[self.packet_type]}]" + + if self.packet_type == self.CONNECT: + s += f""" + +Client Id: {self.payload['ClientId']} +Will Topic: {self.payload.get('WillTopic')} +Will Message: {strutils.bytes_to_escaped_str(self.payload.get('WillMessage', b'None'))} +User Name: {self.payload.get('UserName')} +Password: {strutils.bytes_to_escaped_str(self.payload.get('Password', b'None'))} +""" + elif self.packet_type == self.SUBSCRIBE: + s += " sent topic filters: " + s += ", ".join([f"'{tf}'" for tf in self.topic_filters]) + elif self.packet_type == self.PUBLISH: + topic_name = strutils.bytes_to_escaped_str(self.topic_name) + payload = strutils.bytes_to_escaped_str(self.payload) + + s += f" '{payload}' to topic '{topic_name}'" + elif self.packet_type in [self.PINGREQ, self.PINGRESP]: + pass + else: + s = f"Packet type {self.Names[self.packet_type]} is not supported yet!" + + return s + + def _parse_length_prefixed_bytes(self, offset): + field_length_bytes = self._packet[offset : offset + 2] + field_length = struct.unpack("!H", field_length_bytes)[0] + + field_content_bytes = self._packet[offset + 2 : offset + 2 + field_length] + + return field_length + 2, field_content_bytes + + def _parse_publish_variable_headers(self): + offset = len(self._packet) - self.remaining_length + + field_length, field_content_bytes = self._parse_length_prefixed_bytes(offset) + self.topic_name = field_content_bytes + + if self.qos in [0x01, 0x02]: + offset += field_length + self.packet_identifier = self._packet[offset : offset + 2] + + def _parse_publish_payload(self): + fixed_header_length = len(self._packet) - self.remaining_length + variable_header_length = 2 + len(self.topic_name) + + if self.qos in [0x01, 0x02]: + variable_header_length += 2 + + offset = fixed_header_length + variable_header_length + + self.payload = self._packet[offset:] + + def _parse_subscribe_variable_headers(self): + self._parse_packet_identifier() + + def _parse_subscribe_payload(self): + offset = len(self._packet) - self.remaining_length + 2 + + self.topic_filters = {} + + while len(self._packet) - offset > 0: + field_length, topic_filter_bytes = self._parse_length_prefixed_bytes(offset) + offset += field_length + + qos = self._packet[offset : offset + 1] + offset += 1 + + topic_filter = topic_filter_bytes.decode("utf-8") + self.topic_filters[topic_filter] = {"qos": qos} + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030 + def _parse_connect_variable_headers(self): + offset = len(self._packet) - self.remaining_length + + self.variable_headers = {} + self.connect_flags = {} + + self.variable_headers["ProtocolName"] = self._packet[offset : offset + 6] + self.variable_headers["ProtocolLevel"] = self._packet[offset + 6 : offset + 7] + self.variable_headers["ConnectFlags"] = self._packet[offset + 7 : offset + 8] + self.variable_headers["KeepAlive"] = self._packet[offset + 8 : offset + 10] + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349229 + self.connect_flags["CleanSession"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x02 + ) + self.connect_flags["Will"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x04 + ) + self.will_qos = (self.variable_headers["ConnectFlags"][0] >> 3) & 0x03 + self.connect_flags["WillRetain"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x20 + ) + self.connect_flags["Password"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x40 + ) + self.connect_flags["UserName"] = bool( + self.variable_headers["ConnectFlags"][0] & 0x80 + ) + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031 + def _parse_connect_payload(self): + fields = [] + offset = len(self._packet) - self.remaining_length + 10 + + while len(self._packet) - offset > 0: + field_length, field_content = self._parse_length_prefixed_bytes(offset) + fields.append(field_content) + offset += field_length + + self.payload = {} + + for f in fields: + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242 + if "ClientId" not in self.payload: + self.payload["ClientId"] = f.decode("utf-8") + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349243 + elif self.connect_flags["Will"] and "WillTopic" not in self.payload: + self.payload["WillTopic"] = f.decode("utf-8") + elif self.connect_flags["Will"] and "WillMessage" not in self.payload: + self.payload["WillMessage"] = f + elif self.connect_flags["UserName"] and "UserName" not in self.payload: # pragma: no cover + self.payload["UserName"] = f.decode("utf-8") + elif self.connect_flags["Password"] and "Password" not in self.payload: # pragma: no cover + self.payload["Password"] = f + else: + raise AssertionError(f"Unknown field in CONNECT payload: {f}") + + def _parse_packet_type(self): + return self._packet[0] >> 4 + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022 + def _parse_flags(self): + dup = None + qos = None + retain = None + + if self.packet_type == self.PUBLISH: + dup = (self._packet[0] >> 3) & 0x01 + qos = (self._packet[0] >> 1) & 0x03 + retain = self._packet[0] & 0x01 + + return dup, qos, retain + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size + def _parse_remaining_length(self): + multiplier = 1 + value = 0 + i = 1 + + while True: + encodedByte = self._packet[i] + value += (encodedByte & 127) * multiplier + multiplier *= 128 + + if multiplier > 128 * 128 * 128: + raise ValueError("Malformed Remaining Length") + + if encodedByte & 128 == 0: + break + + i += 1 + + return value + + # http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.5_- + def _parse_packet_identifier(self): + offset = len(self._packet) - self.remaining_length + self.packet_identifier = self._packet[offset : offset + 2] + + +class ViewMQTT(base.View): + name = "MQTT" + + def __call__(self, data, **metadata): + mqtt_packet = MQTTControlPacket(data) + text = mqtt_packet.pprint() + return "MQTT", base.format_text(text) + + def render_priority( + self, data: bytes, *, content_type: Optional[str] = None, **metadata + ) -> float: + return 0 diff --git a/test/mitmproxy/contentviews/test_mqtt.py b/test/mitmproxy/contentviews/test_mqtt.py new file mode 100644 index 000000000..7acc33541 --- /dev/null +++ b/test/mitmproxy/contentviews/test_mqtt.py @@ -0,0 +1,67 @@ +import pytest + +from mitmproxy.contentviews import mqtt +from . import full_eval + + +@pytest.mark.parametrize( + "data,expected_text", + [ + pytest.param(b"\xC0\x00", "[PINGREQ]", id="PINGREQ"), + pytest.param(b"\xD0\x00", "[PINGRESP]", id="PINGRESP"), + pytest.param(b"\x90\x00", "Packet type SUBACK is not supported yet!", id="SUBACK"), + pytest.param(b"\xA0\x00", "Packet type UNSUBSCRIBE is not supported yet!", id="UNSUBSCRIBE"), + pytest.param( + b"\x82\x31\x00\x03\x00\x2cxxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/in\x01", + "[SUBSCRIBE] sent topic filters: 'xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/in'", + id="SUBSCRIBE", + ), + pytest.param( + b"""\x32\x9a\x01\x00\x2dxxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out\x00\x04""" + b"""{"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05","messageId":"connected","type":"event"}}""", + """[PUBLISH] '{"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05",""" + """"messageId":"connected","type":"event"}}' to topic 'xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out'""", + id="PUBLISH", + ), + pytest.param( + b"""\x10\xba\x01\x00\x04MQTT\x04\x06\x00\x1e\x00\x1156:6F:5E:6A:01:05\x00-""" + b"""xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out""" + b"""\x00l{"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05","messageId":"disconnected","type":"event"}}""", + [ + "[CONNECT]", + "", + "Client Id: 56:6F:5E:6A:01:05", + "Will Topic: xxxx/yy/zzzzzz/56:6F:5E:6A:01:05/messages/out", + """Will Message: {"body":{"parameters":null},"header":{"from":"56:6F:5E:6A:01:05",""" + """"messageId":"disconnected","type":"event"}}""", + "User Name: None", + "Password: None", + ], + id="CONNECT", + ), + ], +) +def test_view_mqtt(data, expected_text): + """testing helper for single line messages""" + v = full_eval(mqtt.ViewMQTT()) + content_type, output = v(data) + assert content_type == "MQTT" + if isinstance(expected_text, list): + assert output == [[("text", text)] for text in expected_text] + else: + assert output == [[("text", expected_text)]] + + +@pytest.mark.parametrize( + "data", + [b"\xC0\xFF\xFF\xFF\xFF"] +) +def test_mqtt_malformed(data): + v = full_eval(mqtt.ViewMQTT()) + with pytest.raises(Exception): + v(data) + + +def test_render_priority(): + # missing: good MQTT heuristics. + assert mqtt.ViewMQTT().render_priority(b"") == 0