Fix #4635: Enhance ISO-TP Soft Socket Implementation (#4636)

This commit is contained in:
Nils Weiss 2025-02-04 12:44:28 +01:00 committed by GitHub
parent 3e00f42554
commit d2c6055429
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 61 additions and 48 deletions

View File

@ -6,19 +6,8 @@
# scapy.contrib.description = ISO-TP (ISO 15765-2) Packet Definitions
# scapy.contrib.status = library
import struct
import logging
from scapy.config import conf
from scapy.packet import Packet
from scapy.fields import BitField, FlagsField, StrLenField, \
ThreeBytesField, XBitField, ConditionalField, \
BitEnumField, ByteField, XByteField, BitFieldLenField, StrField, \
FieldLenField, IntField, ShortField
from scapy.compat import chb, orb
from scapy.layers.can import CAN, CAN_FD_MAX_DLEN as CAN_FD_MAX_DLEN
from scapy.error import Scapy_Exception
import struct
# Typing imports
from typing import (
Optional,
@ -29,6 +18,16 @@ from typing import (
cast,
)
from scapy.compat import chb, orb
from scapy.config import conf
from scapy.error import Scapy_Exception
from scapy.fields import BitField, FlagsField, StrLenField, \
ThreeBytesField, XBitField, ConditionalField, \
BitEnumField, ByteField, XByteField, BitFieldLenField, StrField, \
FieldLenField, IntField, ShortField
from scapy.layers.can import CAN, CAN_FD_MAX_DLEN as CAN_FD_MAX_DLEN, CANFD
from scapy.packet import Packet
log_isotp = logging.getLogger("scapy.contrib.isotp")
CAN_MAX_IDENTIFIER = (1 << 29) - 1 # Maximum 29-bit identifier
@ -103,6 +102,7 @@ class ISOTP(Packet):
"""
fd = kargs.pop("fd", False)
pkt_cls = CANFD if fd else CAN
def _get_data_len():
# type: () -> int
@ -122,10 +122,10 @@ class ISOTP(Packet):
frame_data = struct.pack('B', self.rx_ext_address) + frame_data
if self.rx_id is None or self.rx_id <= 0x7ff:
pkt = CAN(identifier=self.rx_id, data=frame_data)
pkt = pkt_cls(identifier=self.rx_id, data=frame_data)
else:
pkt = CAN(identifier=self.rx_id, flags="extended",
data=frame_data)
pkt = pkt_cls(identifier=self.rx_id, flags="extended",
data=frame_data)
return [pkt]
# Construct the first frame
@ -138,10 +138,10 @@ class ISOTP(Packet):
idx = _get_data_len() - len(frame_header)
frame_data = self.data[0:idx]
if self.rx_id is None or self.rx_id <= 0x7ff:
frame = CAN(identifier=self.rx_id, data=frame_header + frame_data)
frame = pkt_cls(identifier=self.rx_id, data=frame_header + frame_data)
else:
frame = CAN(identifier=self.rx_id, flags="extended",
data=frame_header + frame_data)
frame = pkt_cls(identifier=self.rx_id, flags="extended",
data=frame_header + frame_data)
# Construct consecutive frames
n = 1
@ -156,10 +156,10 @@ class ISOTP(Packet):
if self.rx_ext_address:
frame_header = struct.pack('B', self.rx_ext_address) + frame_header # noqa: E501
if self.rx_id is None or self.rx_id <= 0x7ff:
pkt = CAN(identifier=self.rx_id, data=frame_header + frame_data) # noqa: E501
pkt = pkt_cls(identifier=self.rx_id, data=frame_header + frame_data) # noqa: E501
else:
pkt = CAN(identifier=self.rx_id, flags="extended",
data=frame_header + frame_data)
pkt = pkt_cls(identifier=self.rx_id, flags="extended",
data=frame_header + frame_data)
pkts.append(pkt)
return cast(List[Packet], pkts)

View File

@ -4,29 +4,16 @@
# Copyright (C) Nils Weiss <nils@we155.de>
# Copyright (C) Enrico Pozzobon <enricopozzobon@gmail.com>
import heapq
# scapy.contrib.description = ISO-TP (ISO 15765-2) Soft Socket Library
# scapy.contrib.status = library
import logging
import socket
import struct
import time
import traceback
import heapq
import socket
from threading import Thread, Event, RLock
from bisect import bisect_left
from scapy.packet import Packet
from scapy.layers.can import CAN
from scapy.error import Scapy_Exception
from scapy.supersocket import SuperSocket
from scapy.config import conf
from scapy.consts import LINUX
from scapy.utils import EDecimal
from scapy.automaton import ObjectPipe, select_objects
from scapy.contrib.isotp.isotp_packet import ISOTP, CAN_MAX_DLEN, N_PCI_SF, \
N_PCI_CF, N_PCI_FC, N_PCI_FF, ISOTP_MAX_DLEN, ISOTP_MAX_DLEN_2015, CAN_FD_MAX_DLEN
from threading import Thread, Event, RLock
# Typing imports
from typing import (
Optional,
@ -40,6 +27,17 @@ from typing import (
TYPE_CHECKING,
)
from scapy.automaton import ObjectPipe, select_objects
from scapy.config import conf
from scapy.consts import LINUX
from scapy.contrib.isotp.isotp_packet import ISOTP, CAN_MAX_DLEN, N_PCI_SF, \
N_PCI_CF, N_PCI_FC, N_PCI_FF, ISOTP_MAX_DLEN, ISOTP_MAX_DLEN_2015, CAN_FD_MAX_DLEN
from scapy.error import Scapy_Exception
from scapy.layers.can import CAN, CANFD
from scapy.packet import Packet
from scapy.supersocket import SuperSocket
from scapy.utils import EDecimal
if TYPE_CHECKING:
from scapy.contrib.cansocket import CANSocket
@ -209,8 +207,10 @@ class ISOTPSoftSocket(SuperSocket):
"""This function is called during sendrecv() routine to wait for
sockets to be ready to receive
"""
obj_pipes = [x.impl.rx_queue for x in sockets if
isinstance(x, ISOTPSoftSocket) and not x.closed]
obj_pipes: List[Union[SuperSocket, ObjectPipe[Tuple[bytes, Union[float, EDecimal]]]]] = [ # noqa: E501
x.impl.rx_queue for x in sockets if
isinstance(x, ISOTPSoftSocket) and not x.closed]
obj_pipes += [x for x in sockets if isinstance(x, ObjectPipe) and not x.closed]
ready_pipes = select_objects(obj_pipes, remain)
@ -579,13 +579,15 @@ class ISOTPSocketImplementation:
return fd_accepted_sizes[-1]
return fd_accepted_sizes[pos]
pkt_cls = CANFD if self.fd else CAN
if self.padding:
load += b"\xCC" * (_get_padding_size(len(load)) - len(load))
if self.tx_id is None or self.tx_id <= 0x7ff:
self.can_socket.send(CAN(identifier=self.tx_id, data=load))
self.can_socket.send(pkt_cls(identifier=self.tx_id, data=load))
else:
self.can_socket.send(CAN(identifier=self.tx_id, flags="extended",
data=load))
self.can_socket.send(pkt_cls(identifier=self.tx_id, flags="extended",
data=load))
def can_recv(self):
# type: () -> None

View File

@ -138,7 +138,7 @@ with TestSocket(CANFD) as cans, TestSocket(CANFD) as stim, ISOTPSoftSocket(cans,
s.failure_analysis()
raise Scapy_Exception("ERROR")
msg = pkts[0]
assert msg.data == dhex(data_str)
assert dhex(data_str) in msg.data
= Two frame receive
@ -235,7 +235,8 @@ def testfd():
cfs = stim.sniff(timeout=20, count=len(fragments) - 1,
started_callback=lambda: stim.send(ack))
for fragment, cf in zip(fragments, ff + cfs):
assert (bytes(fragment) == bytes(cf))
print(bytes(fragment), bytes(cf))
assert (bytes(fragment) in bytes(cf))
testfd()
@ -370,7 +371,7 @@ with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x2
assert can[0].data == dhex("10 {} {}".format("%02X" % size_to_send, " ".join(["%02X" % x for x in range(max_pl_size)])))
can = cans.sniff(timeout=1, count=1, started_callback=lambda: cans.send(CANFD(identifier = 0x241, data=dhex("30 00 00"))))
assert can[0].identifier == 0x641
assert can[0].data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
assert dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) in can[0].data
= Send single frame ISOTP message
with TestSocket(CAN) as cans, TestSocket(CAN) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x241) as s:
@ -468,7 +469,7 @@ with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x2
raise Scapy_Exception("ERROR")
can = pkts[0]
assert can.identifier == 0x641
assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
assert dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) in can.data
thread.join(15)
acks.close()
@ -557,7 +558,7 @@ with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x2
raise Scapy_Exception("ERROR")
can = pkts[0]
assert can.identifier == 0x641
assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
assert dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) in can.data
thread.join(15)
acks.close()
@ -644,7 +645,7 @@ with TestSocket(CANFD) as isocan, ISOTPSoftSocket(isocan, tx_id=0x641, rx_id=0x2
raise Scapy_Exception("ERROR")
can = pkts[0]
assert can.identifier == 0x641
assert can.data == dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)])))
assert dhex("21 {}".format(" ".join(["%02X" % x for x in range(max_pl_size, size_to_send)]))) in can.data
thread.join(15)
acks.close()
@ -943,6 +944,16 @@ assert rx == msg
assert rx2 is not None
assert rx2 == msg
= ISOTPSoftSocket sr1 timeout
msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')
with TestSocket(CAN) as isocan_tx, ISOTPSoftSocket(isocan_tx, 0x123, 0x321) as sock_tx, \
TestSocket(CAN) as isocan_rx, ISOTPSoftSocket(isocan_rx, 0x321, 0x123) as sock_rx:
isocan_rx.pair(isocan_tx)
rx2 = sock_tx.sr1(msg, timeout=1, verbose=True)
assert rx2 is None
= ISOTPSoftSocket sniff
msg = ISOTP(b'\x11\x22\x33\x11\x22\x33\x11\x22\x33\x11\x22\x33')