Merge pull request #890 from gpotter2/py3-contrib-14

Python 3: fix PNIO
This commit is contained in:
Pierre Lalet 2017-10-24 17:47:46 +02:00 committed by GitHub
commit 532861baa5
4 changed files with 33 additions and 32 deletions

View File

@ -8,7 +8,7 @@ from scapy.contrib.pnio import *
+ Check DCE/RPC layer
= ProfinetIO default values
str(ProfinetIO()) == b'\x00\x00'
raw(ProfinetIO()) == b'\x00\x00'
= ProfinetIO overloads Ethertype
p = Ether() / ProfinetIO()
@ -19,14 +19,14 @@ p = UDP() / ProfinetIO()
p.dport == 0x8892
= Ether guesses ProfinetIO as payload class
p = Ether('ffffffffffff00000000000088920102'.decode('hex'))
p = Ether(hex_bytes('ffffffffffff00000000000088920102'))
p.payload.__class__ == ProfinetIO and p.frameID == 0x0102
= UDP guesses ProfinetIO as payload class
p = UDP('12348892000a00000102'.decode('hex'))
p = UDP(hex_bytes('12348892000a00000102'))
p.payload.__class__ == ProfinetIO and p.frameID == 0x0102
= ProfinetIO guess payload to PNIORealTime
p = UDP('12348892000c000080020102'.decode('hex'))
p = UDP(hex_bytes('12348892000c000080020102'))
p.payload.payload.__class__.__name__ == 'PNIORealTime' and p.frameID == 0x8002

View File

@ -36,6 +36,7 @@ from scapy.fields import BitEnumField, BitField, ByteField,\
# local imports
from scapy.contrib.pnio import ProfinetIO
from scapy.compat import orb
from scapy.modules.six.moves import range
@ -226,13 +227,13 @@ class PNIORealTime(Packet):
# dissected packets
pkt_len = self.getfieldval("len")
if pkt_len is not None:
return max(0, pkt_len - len(fld.addfield(self, "", val)) - 4)
return max(0, pkt_len - len(fld.addfield(self, b"", val)) - 4)
if isinstance(self.underlayer, ProfinetIO) and \
isinstance(self.underlayer.underlayer, UDP):
return max(0, 12 - len(fld.addfield(self, "", val)))
return max(0, 12 - len(fld.addfield(self, b"", val)))
else:
return max(0, 40 - len(fld.addfield(self, "", val)))
return max(0, 40 - len(fld.addfield(self, b"", val)))
@staticmethod
def analyse_data(packets):
@ -265,7 +266,7 @@ class PNIORealTime(Packet):
counts.extend([0 for _ in range(len(pdu) - len(counts))])
for i in range(len(pdu)):
if pdu[i] != b"\x80":
if orb(pdu[i]) != 0x80:
counts[i] += 1
comm = (pkt.src, pkt.dst)
@ -279,7 +280,7 @@ class PNIORealTime(Packet):
loc = locations[comm] = []
start = None
for i in range(length):
if counts[i] > total / 2: # Data if more than half is != 0x80
if counts[i] > total // 2: # Data if more than half is != 0x80
if start is None:
start = i
else:
@ -421,7 +422,7 @@ def entropy_of_byte(packets, position):
# Count each byte a appearance
for pkt in packets:
if -position <= len(pkt): # position must be a negative index
counter[ord(pkt[position])] += 1
counter[orb(pkt[position])] += 1
# Compute the Shannon entropy
entropy = 0

View File

@ -9,7 +9,7 @@ from scapy.contrib.pnio_rtc import *
+ Check PNIORealTimeIOxS
= PNIORealTimeIOxS default values
str(PNIORealTimeIOxS()) == b'\x80'
raw(PNIORealTimeIOxS()) == b'\x80'
= Check no payload is dissected (only padding)
* In order for the PNIORealTime to dissect correctly all the data buffer, data field must strictly dissect what they know as being of themselves
@ -20,69 +20,69 @@ p == PNIORealTimeIOxS(dataState='bad', instance='device') / conf.padding_layer(b
+ Check PNIORealTimeRawData
= PNIORealTimeRawData default values
str(PNIORealTimeRawData(config={'length': 5})) == b'\x00\x00\x00\x00\x00'
raw(PNIORealTimeRawData(config={'length': 5})) == b'\x00\x00\x00\x00\x00'
= PNIORealTimeRawData must always be the same configured length
str(PNIORealTimeRawData(load='ABC', config={'length': 5})) == b'ABC\x00\x00'
raw(PNIORealTimeRawData(load='ABC', config={'length': 5})) == b'ABC\x00\x00'
= PNIORealTimeRawData may be truncated
str(PNIORealTimeRawData(load='ABCDEF', config={'length': 5})) == 'ABCDE'
raw(PNIORealTimeRawData(load='ABCDEF', config={'length': 5})) == b'ABCDE'
= Check that the dissected payload is an PNIORealTimeIOxS (IOPS)
p = PNIORealTimeRawData(b'ABCDE\x80\x01\x02', config={'length': 5})
p == PNIORealTimeRawData(load='ABCDE', config={'length': 5}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
p == PNIORealTimeRawData(load=b'ABCDE', config={'length': 5}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
= PNIORealTimeRawData is capable of dissected uncomplete packets
p = PNIORealTimeRawData('ABC', config={'length': 5})
p == PNIORealTimeRawData(load='ABC', config={'length': 5})
p == PNIORealTimeRawData(load=b'ABC', config={'length': 5})
+ Check Profisafe
= Profisafe default values
str(Profisafe(config={'length': 7, 'CRC': 3})) == b'\0\0\0\0\0\0\0'
raw(Profisafe(config={'length': 7, 'CRC': 3})) == b'\0\0\0\0\0\0\0'
= Profisafe must always be the same configured length
str(Profisafe(load='AB', config={'length': 7, 'CRC': 3})) == b'AB\0\0\0\0\0'
raw(Profisafe(load=b'AB', config={'length': 7, 'CRC': 3})) == b'AB\0\0\0\0\0'
= Profisafe load may be truncated
str(Profisafe(load='ABCDEF', config={'length': 7, 'CRC': 3})) == b'ABC\0\0\0\0'
raw(Profisafe(load=b'ABCDEF', config={'length': 7, 'CRC': 3})) == b'ABC\0\0\0\0'
= Check that the dissected payload is an PNIORealTimeIOxS (IOPS)
p = Profisafe(b'ABC\x20\x12\x34\x56\x80\x01\x02', config={'length': 7, 'CRC': 3})
p == Profisafe(load='ABC', Control_Status=0x20, CRC=0x123456, config={'length': 7, 'CRC': 3}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
p == Profisafe(load=b'ABC', Control_Status=0x20, CRC=0x123456, config={'length': 7, 'CRC': 3}) / PNIORealTimeIOxS() / conf.padding_layer(b'\x01\x02')
= Profisafe with a CRC-32
str(Profisafe(load='ABC', Control_Status=0x33, CRC=0x12345678, config={'length': 8, 'CRC': 4})) == b'ABC\x33\x12\x34\x56\x78'
raw(Profisafe(load=b'ABC', Control_Status=0x33, CRC=0x12345678, config={'length': 8, 'CRC': 4})) == b'ABC\x33\x12\x34\x56\x78'
= Profisafe is capable of dissected uncomplete packets
p = Profisafe('AB', config={'length': 7, 'CRC': 3})
p == Profisafe(load='AB', Control_Status=0, CRC=0)
p == Profisafe(load=b'AB', Control_Status=0, CRC=0)
+ Check PNIORealTime layer
= PNIORealTime default values
str(PNIORealTime()) == b'\0' * 40 + b'\0\0\x35\0'
raw(PNIORealTime()) == b'\0' * 40 + b'\0\0\x35\0'
= PNIORealTime default values under an UDP packet
str(UDP(sport=0x1234) / ProfinetIO(frameID=0x8002) / PNIORealTime()) == '12348892001a00008002'.decode('hex') + b'\0' * 12 + b'\0\0\x35\0'
raw(UDP(sport=0x1234) / ProfinetIO(frameID=0x8002) / PNIORealTime()) == hex_bytes('12348892001a00008002') + b'\0' * 12 + b'\0\0\x35\0'
= PNIORealTime simple packet
* a simple data packet with a raw profinet data and its IOPS, an IOCS and a Profisafe data and its IOPS. 15B data length, 1B padding (20 - 15 -4)
str(PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3,
raw(PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3,
data=[
PNIORealTimeRawData(load=b'\x01\x02\x03\x04', config={'length': 5}) / PNIORealTimeIOxS(),
PNIORealTimeIOxS(dataState='bad'),
Profisafe(load=b'\x05\x06', Control_Status=0x20, CRC=0x12345678, config={'length': 7, 'CRC': 4}) / PNIORealTimeIOxS()
]
)) == '0102030400800005062012345678800012342603'.decode('hex')
)) == hex_bytes('0102030400800005062012345678800012342603')
= PNIORealTime dissects to PNIORealTimeRawData when no config is available
p = PNIORealTime('0102030400800005062012345678800012342603'.decode('hex'))
p = PNIORealTime(hex_bytes('0102030400800005062012345678800012342603'))
p == PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3, padding=b'\0',
data=[
PNIORealTimeRawData(load='010203040080000506201234567880'.decode('hex'))
PNIORealTimeRawData(load=hex_bytes('010203040080000506201234567880'))
]
)
@ -94,7 +94,7 @@ pnio_update_config({
(-8, Profisafe, {'length': 7, 'CRC': 4}),
]
})
p = Ether('000102030405060708090a0b889280020102030400800005062012345678800012342603'.decode('hex'))
p = Ether(hex_bytes('000102030405060708090a0b889280020102030400800005062012345678800012342603'))
p == Ether(dst='00:01:02:03:04:05', src='06:07:08:09:0a:0b') / ProfinetIO(frameID=0x8002) / PNIORealTime(
len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3, padding=b'\0',
data=[
@ -110,8 +110,8 @@ p == Ether(dst='00:01:02:03:04:05', src='06:07:08:09:0a:0b') / ProfinetIO(frameI
bind_layers(UDP, ProfinetIO, dport=0x8894)
bind_layers(UDP, ProfinetIO, sport=0x8894)
packets = [Ether('0090274ee3fc000991442017080045000128000c00004011648f0a0a00810a0a00968894061e011444c604022800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0100000001000000000000000500ffffffffbc000000000000000000a80000004080000000000000a80000008009003c0100000a0000000000000000000000000000000000000000000000010000f840000000680000000000000000000000000000000000000000000000000030002c0100000100000000000200000000000100020001000000010003ffff010a0001ffff814000010001ffff814000310018010000010000000000010001ffff814000010001ffff814000320018010000010000000000010000000000010001000100000001'.decode("hex")),
Ether('0009914420170090274ee3fc0800450000c0b28800008011727a0a0a00960a0a0081061e889400ac689504000800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0000000001000000000000000500ffffffff54000000000040800000400000004080000000000000400000000009003c0100000a0000000000000000000000000000000000000000000000010000f84000008000000000000000000000000000000000000000000000000000'.decode("hex"))]
packets = [Ether(hex_bytes('0090274ee3fc000991442017080045000128000c00004011648f0a0a00810a0a00968894061e011444c604022800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0100000001000000000000000500ffffffffbc000000000000000000a80000004080000000000000a80000008009003c0100000a0000000000000000000000000000000000000000000000010000f840000000680000000000000000000000000000000000000000000000000030002c0100000100000000000200000000000100020001000000010003ffff010a0001ffff814000010001ffff814000310018010000010000000000010001ffff814000010001ffff814000320018010000010000000000010000000000010001000100000001')),
Ether(hex_bytes('0009914420170090274ee3fc0800450000c0b28800008011727a0a0a00960a0a0081061e889400ac689504000800100000000000a0de976cd111827100010003015a0100a0de976cd111827100a02442df7ddbabbaec1d005443b2500b01630abafd0000000001000000000000000500ffffffff54000000000040800000400000004080000000000000400000000009003c0100000a0000000000000000000000000000000000000000000000010000f84000008000000000000000000000000000000000000000000000000000'))]
analysed_data = PNIORealTime.analyse_data(packets)
assert len(analysed_data) == 2

View File

@ -665,7 +665,7 @@ class XStrFixedLenField(StrFixedLenField):
class StrLenFieldUtf16(StrLenField):
def h2i(self, pkt, x):
return x.encode('utf-16')[2:]
return plain_str(x).encode('utf-16')[2:]
def i2h(self, pkt, x):
return x.decode('utf-16')