Fix #4472: Add ability to parse multiple DoIP packets in one TCP pkt (#4515)

* Fix #4472: Add ability to parse multiple DoIP packets in one TCP packet via TCPSession

* update

* When not in app mode, tcp_reassemble sub-packets

---------

Co-authored-by: gpotter2 <10530980+gpotter2@users.noreply.github.com>
This commit is contained in:
Nils Weiss 2024-09-07 22:46:59 +02:00 committed by GitHub
parent 867f92a37d
commit 45c216f135
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 35 additions and 8 deletions

View File

@ -257,15 +257,15 @@ class DoIP(Packet):
def extract_padding(self, s):
# type: (bytes) -> Tuple[bytes, Optional[bytes]]
if self.payload_type == 0x8001:
return s[:self.payload_length - 4], None
return s[:self.payload_length - 4], s[self.payload_length - 4:]
else:
return b"", None
return b"", s
@classmethod
def tcp_reassemble(cls, data, metadata, session):
# type: (bytes, Dict[str, Any], Dict[str, Any]) -> Optional[Packet]
length = struct.unpack("!I", data[4:8])[0] + 8
if len(data) == length:
if len(data) >= length:
return DoIP(data)
return None

View File

@ -367,11 +367,22 @@ class TCPSession(IPSession):
metadata.clear()
# Check for padding
padding = self._strip_padding(packet)
if padding:
while padding:
# There is remaining data for the next payload.
full_length = data.content_len - len(padding)
metadata["relative_seq"] = relative_seq + full_length
data.shiftleft(full_length)
# There might be a sub-payload hidden in the padding
sub_packet = tcp_reassemble(
bytes(data),
metadata,
tcp_session
)
if sub_packet:
packet /= sub_packet
padding = self._strip_padding(sub_packet)
else:
break
else:
# No padding (data) left. Clear
data.clear()
@ -397,10 +408,15 @@ class TCPSession(IPSession):
"""
pkt = sock.recv(stop_dissection_after=self.stop_dissection_after)
# Now handle TCP reassembly
while pkt is not None:
pkt = self.process(pkt)
if self.app:
while pkt is not None:
pkt = self.process(pkt)
if pkt:
yield pkt
# keep calling process as there might be more
pkt = b"" # type: ignore
else:
pkt = self.process(pkt) # type: ignore
if pkt:
yield pkt
# keep calling process as there might be more
pkt = b"" # type: ignore
return None

View File

@ -394,6 +394,17 @@ pkts = sniff(offline=tmp_file, session=TCPSession)
assert pkts[0].haslayer(UDS_TP)
assert pkts[0].service == 0x3E
= TCPSession Test multiple DoIP messages
filename = scapy_path("/test/pcaps/multiple_doip_layers.pcap.gz")
pkts = sniff(offline=filename, session=TCPSession)
print(repr(pkts[0]))
print(repr(pkts[1]))
assert len(pkts) == 2
assert pkts[0][DoIP].payload_length == 2
assert pkts[0][DoIP:2].payload_length == 7
assert pkts[1][DoIP].payload_length == 103
+ DoIP Communication tests

Binary file not shown.