mirror of https://github.com/secdev/scapy.git
Python 3: fix PNIO
This commit is contained in:
parent
26cd61b0fd
commit
b65b74e4de
|
@ -8,7 +8,7 @@ from scapy.contrib.pnio import *
|
|||
+ Check DCE/RPC layer
|
||||
|
||||
= ProfinetIO default values
|
||||
str(ProfinetIO()) == b'\x00\x00'
|
||||
raw(ProfinetIO()) == b'\x00\x00'
|
||||
|
||||
= ProfinetIO overloads Ethertype
|
||||
p = Ether() / ProfinetIO()
|
||||
|
@ -19,14 +19,14 @@ p = UDP() / ProfinetIO()
|
|||
p.dport == 0x8892
|
||||
|
||||
= Ether guesses ProfinetIO as payload class
|
||||
p = Ether('ffffffffffff00000000000088920102'.decode('hex'))
|
||||
p = Ether(hex_bytes('ffffffffffff00000000000088920102'))
|
||||
p.payload.__class__ == ProfinetIO and p.frameID == 0x0102
|
||||
|
||||
= UDP guesses ProfinetIO as payload class
|
||||
p = UDP('12348892000a00000102'.decode('hex'))
|
||||
p = UDP(hex_bytes('12348892000a00000102'))
|
||||
p.payload.__class__ == ProfinetIO and p.frameID == 0x0102
|
||||
|
||||
= ProfinetIO guess payload to PNIORealTime
|
||||
p = UDP('12348892000c000080020102'.decode('hex'))
|
||||
p = UDP(hex_bytes('12348892000c000080020102'))
|
||||
p.payload.payload.__class__.__name__ == 'PNIORealTime' and p.frameID == 0x8002
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ from scapy.fields import BitEnumField, BitField, ByteField,\
|
|||
|
||||
# local imports
|
||||
from scapy.contrib.pnio import ProfinetIO
|
||||
from scapy.compat import orb
|
||||
from scapy.modules.six.moves import range
|
||||
|
||||
|
||||
|
@ -226,13 +227,13 @@ class PNIORealTime(Packet):
|
|||
# dissected packets
|
||||
pkt_len = self.getfieldval("len")
|
||||
if pkt_len is not None:
|
||||
return max(0, pkt_len - len(fld.addfield(self, "", val)) - 4)
|
||||
return max(0, pkt_len - len(fld.addfield(self, b"", val)) - 4)
|
||||
|
||||
if isinstance(self.underlayer, ProfinetIO) and \
|
||||
isinstance(self.underlayer.underlayer, UDP):
|
||||
return max(0, 12 - len(fld.addfield(self, "", val)))
|
||||
return max(0, 12 - len(fld.addfield(self, b"", val)))
|
||||
else:
|
||||
return max(0, 40 - len(fld.addfield(self, "", val)))
|
||||
return max(0, 40 - len(fld.addfield(self, b"", val)))
|
||||
|
||||
@staticmethod
|
||||
def analyse_data(packets):
|
||||
|
@ -265,7 +266,7 @@ class PNIORealTime(Packet):
|
|||
counts.extend([0 for _ in range(len(pdu) - len(counts))])
|
||||
|
||||
for i in range(len(pdu)):
|
||||
if pdu[i] != b"\x80":
|
||||
if orb(pdu[i]) != 0x80:
|
||||
counts[i] += 1
|
||||
|
||||
comm = (pkt.src, pkt.dst)
|
||||
|
@ -279,7 +280,7 @@ class PNIORealTime(Packet):
|
|||
loc = locations[comm] = []
|
||||
start = None
|
||||
for i in range(length):
|
||||
if counts[i] > total / 2: # Data if more than half is != 0x80
|
||||
if counts[i] > total // 2: # Data if more than half is != 0x80
|
||||
if start is None:
|
||||
start = i
|
||||
else:
|
||||
|
@ -421,7 +422,7 @@ def entropy_of_byte(packets, position):
|
|||
# Count each byte a appearance
|
||||
for pkt in packets:
|
||||
if -position <= len(pkt): # position must be a negative index
|
||||
counter[ord(pkt[position])] += 1
|
||||
counter[orb(pkt[position])] += 1
|
||||
|
||||
# Compute the Shannon entropy
|
||||
entropy = 0
|
||||
|
|
|
@ -9,7 +9,7 @@ from scapy.contrib.pnio_rtc import *
|
|||
+ Check PNIORealTimeIOxS
|
||||
|
||||
= PNIORealTimeIOxS default values
|
||||
str(PNIORealTimeIOxS()) == b'\x80'
|
||||
raw(PNIORealTimeIOxS()) == b'\x80'
|
||||
|
||||
= Check no payload is dissected (only padding)
|
||||
* In order for the PNIORealTime to dissect correctly all the data buffer, data field must strictly dissect what they know as being of themselves
|
||||
|
@ -20,69 +20,69 @@ p == PNIORealTimeIOxS(dataState='bad', instance='device') / conf.padding_layer(b
|
|||
+ Check PNIORealTimeRawData
|
||||
|
||||
= PNIORealTimeRawData default values
|
||||
str(PNIORealTimeRawData(config={'length': 5})) == b'\x00\x00\x00\x00\x00'
|
||||
raw(PNIORealTimeRawData(config={'length': 5})) == b'\x00\x00\x00\x00\x00'
|
||||
|
||||
= PNIORealTimeRawData must always be the same configured length
|
||||
str(PNIORealTimeRawData(load='ABC', config={'length': 5})) == b'ABC\x00\x00'
|
||||
raw(PNIORealTimeRawData(load='ABC', config={'length': 5})) == b'ABC\x00\x00'
|
||||
|
||||
= PNIORealTimeRawData may be truncated
|
||||
str(PNIORealTimeRawData(load='ABCDEF', config={'length': 5})) == 'ABCDE'
|
||||
raw(PNIORealTimeRawData(load='ABCDEF', config={'length': 5})) == b'ABCDE'
|
||||
|
||||
= Check that the dissected payload is an PNIORealTimeIOxS (IOPS)
|
||||
p = PNIORealTimeRawData(b'ABCDE\x80\x01\x02', config={'length': 5})
|
||||
p == PNIORealTimeRawData(load='ABCDE', config={'length': 5}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
|
||||
p == PNIORealTimeRawData(load=b'ABCDE', config={'length': 5}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
|
||||
|
||||
= PNIORealTimeRawData is capable of dissected uncomplete packets
|
||||
p = PNIORealTimeRawData('ABC', config={'length': 5})
|
||||
p == PNIORealTimeRawData(load='ABC', config={'length': 5})
|
||||
p == PNIORealTimeRawData(load=b'ABC', config={'length': 5})
|
||||
|
||||
|
||||
+ Check Profisafe
|
||||
|
||||
= Profisafe default values
|
||||
str(Profisafe(config={'length': 7, 'CRC': 3})) == b'\0\0\0\0\0\0\0'
|
||||
raw(Profisafe(config={'length': 7, 'CRC': 3})) == b'\0\0\0\0\0\0\0'
|
||||
|
||||
= Profisafe must always be the same configured length
|
||||
str(Profisafe(load='AB', config={'length': 7, 'CRC': 3})) == b'AB\0\0\0\0\0'
|
||||
raw(Profisafe(load=b'AB', config={'length': 7, 'CRC': 3})) == b'AB\0\0\0\0\0'
|
||||
|
||||
= Profisafe load may be truncated
|
||||
str(Profisafe(load='ABCDEF', config={'length': 7, 'CRC': 3})) == b'ABC\0\0\0\0'
|
||||
raw(Profisafe(load=b'ABCDEF', config={'length': 7, 'CRC': 3})) == b'ABC\0\0\0\0'
|
||||
|
||||
= Check that the dissected payload is an PNIORealTimeIOxS (IOPS)
|
||||
p = Profisafe(b'ABC\x20\x12\x34\x56\x80\x01\x02', config={'length': 7, 'CRC': 3})
|
||||
p == Profisafe(load='ABC', Control_Status=0x20, CRC=0x123456, config={'length': 7, 'CRC': 3}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
|
||||
p == Profisafe(load=b'ABC', Control_Status=0x20, CRC=0x123456, config={'length': 7, 'CRC': 3}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
|
||||
|
||||
= Profisafe with a CRC-32
|
||||
str(Profisafe(load='ABC', Control_Status=0x33, CRC=0x12345678, config={'length': 8, 'CRC': 4})) == b'ABC\x33\x12\x34\x56\x78'
|
||||
raw(Profisafe(load=b'ABC', Control_Status=0x33, CRC=0x12345678, config={'length': 8, 'CRC': 4})) == b'ABC\x33\x12\x34\x56\x78'
|
||||
|
||||
= Profisafe is capable of dissected uncomplete packets
|
||||
p = Profisafe('AB', config={'length': 7, 'CRC': 3})
|
||||
p == Profisafe(load='AB', Control_Status=0, CRC=0)
|
||||
p == Profisafe(load=b'AB', Control_Status=0, CRC=0)
|
||||
|
||||
|
||||
+ Check PNIORealTime layer
|
||||
|
||||
= PNIORealTime default values
|
||||
str(PNIORealTime()) == b'\0' * 40 + b'\0\0\x35\0'
|
||||
raw(PNIORealTime()) == b'\0' * 40 + b'\0\0\x35\0'
|
||||
|
||||
= PNIORealTime default values under an UDP packet
|
||||
str(UDP(sport=0x1234) / ProfinetIO(frameID=0x8002) / PNIORealTime()) == '12348892001a00008002'.decode('hex') + b'\0' * 12 + b'\0\0\x35\0'
|
||||
raw(UDP(sport=0x1234) / ProfinetIO(frameID=0x8002) / PNIORealTime()) == hex_bytes('12348892001a00008002') + b'\0' * 12 + b'\0\0\x35\0'
|
||||
|
||||
= PNIORealTime simple packet
|
||||
* a simple data packet with a raw profinet data and its IOPS, an IOCS and a Profisafe data and its IOPS. 15B data length, 1B padding (20 - 15 -4)
|
||||
str(PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3,
|
||||
raw(PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3,
|
||||
data=[
|
||||
PNIORealTimeRawData(load=b'\x01\x02\x03\x04', config={'length': 5}) / PNIORealTimeIOxS(),
|
||||
PNIORealTimeIOxS(dataState='bad'),
|
||||
Profisafe(load=b'\x05\x06', Control_Status=0x20, CRC=0x12345678, config={'length': 7, 'CRC': 4}) / PNIORealTimeIOxS()
|
||||
]
|
||||
)) == '0102030400800005062012345678800012342603'.decode('hex')
|
||||
)) == hex_bytes('0102030400800005062012345678800012342603')
|
||||
|
||||
= PNIORealTime dissects to PNIORealTimeRawData when no config is available
|
||||
p = PNIORealTime('0102030400800005062012345678800012342603'.decode('hex'))
|
||||
p = PNIORealTime(hex_bytes('0102030400800005062012345678800012342603'))
|
||||
p == PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3, padding=b'\0',
|
||||
data=[
|
||||
PNIORealTimeRawData(load='010203040080000506201234567880'.decode('hex'))
|
||||
PNIORealTimeRawData(load=hex_bytes('010203040080000506201234567880'))
|
||||
]
|
||||
)
|
||||
|
||||
|
@ -94,7 +94,7 @@ pnio_update_config({
|
|||
(-8, Profisafe, {'length': 7, 'CRC': 4}),
|
||||
]
|
||||
})
|
||||
p = Ether('000102030405060708090a0b889280020102030400800005062012345678800012342603'.decode('hex'))
|
||||
p = Ether(hex_bytes('000102030405060708090a0b889280020102030400800005062012345678800012342603'))
|
||||
p == Ether(dst='00:01:02:03:04:05', src='06:07:08:09:0a:0b') / ProfinetIO(frameID=0x8002) / PNIORealTime(
|
||||
len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3, padding=b'\0',
|
||||
data=[
|
||||
|
@ -110,8 +110,8 @@ p == Ether(dst='00:01:02:03:04:05', src='06:07:08:09:0a:0b') / ProfinetIO(frameI
|
|||
bind_layers(UDP, ProfinetIO, dport=0x8894)
|
||||
bind_layers(UDP, ProfinetIO, sport=0x8894)
|
||||
|
||||
packets = [Ether('0090274ee3fc000991442017080045000128000c00004011648f0a0a00810a0a00968894061e011444c604022800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0100000001000000000000000500ffffffffbc000000000000000000a80000004080000000000000a80000008009003c0100000a0000000000000000000000000000000000000000000000010000f840000000680000000000000000000000000000000000000000000000000030002c0100000100000000000200000000000100020001000000010003ffff010a0001ffff814000010001ffff814000310018010000010000000000010001ffff814000010001ffff814000320018010000010000000000010000000000010001000100000001'.decode("hex")),
|
||||
Ether('0009914420170090274ee3fc0800450000c0b28800008011727a0a0a00960a0a0081061e889400ac689504000800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0000000001000000000000000500ffffffff54000000000040800000400000004080000000000000400000000009003c0100000a0000000000000000000000000000000000000000000000010000f84000008000000000000000000000000000000000000000000000000000'.decode("hex"))]
|
||||
packets = [Ether(hex_bytes('0090274ee3fc000991442017080045000128000c00004011648f0a0a00810a0a00968894061e011444c604022800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0100000001000000000000000500ffffffffbc000000000000000000a80000004080000000000000a80000008009003c0100000a0000000000000000000000000000000000000000000000010000f840000000680000000000000000000000000000000000000000000000000030002c0100000100000000000200000000000100020001000000010003ffff010a0001ffff814000010001ffff814000310018010000010000000000010001ffff814000010001ffff814000320018010000010000000000010000000000010001000100000001')),
|
||||
Ether(hex_bytes('0009914420170090274ee3fc0800450000c0b28800008011727a0a0a00960a0a0081061e889400ac689504000800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0000000001000000000000000500ffffffff54000000000040800000400000004080000000000000400000000009003c0100000a0000000000000000000000000000000000000000000000010000f84000008000000000000000000000000000000000000000000000000000'))]
|
||||
|
||||
analysed_data = PNIORealTime.analyse_data(packets)
|
||||
assert len(analysed_data) == 2
|
||||
|
|
|
@ -665,7 +665,7 @@ class XStrFixedLenField(StrFixedLenField):
|
|||
|
||||
class StrLenFieldUtf16(StrLenField):
|
||||
def h2i(self, pkt, x):
|
||||
return x.encode('utf-16')[2:]
|
||||
return plain_str(x).encode('utf-16')[2:]
|
||||
def i2h(self, pkt, x):
|
||||
return x.decode('utf-16')
|
||||
|
||||
|
|
Loading…
Reference in New Issue