Merge pull request #255 from zeronounours/profinet_rtc

Add PROFINET IO real-time layer
This commit is contained in:
Guillaume Valadon 2016-09-15 16:58:03 +02:00 committed by GitHub
commit cce88a93b0
6 changed files with 963 additions and 0 deletions

View File

@ -790,3 +790,272 @@ Two methods are hooks to be overloaded:
* The ``master_filter()`` method is called each time a packet is sniffed and decides if it is interesting for the automaton. When working on a specific protocol, this is where you will ensure the packet belongs to the connection you are being part of, so that you do not need to make all the sanity checks in each transition.
PROFINET IO RTC
===============
PROFINET IO is an industrial protocol composed of different layers such as the Real-Time Cyclic (RTC) layer, used to exchange data. However, this RTC layer is stateful and depends on a configuration sent through another layer: the DCE/RPC endpoint of PROFINET. This configuration defines where each exchanged piece of data must be located in the RTC ``data`` buffer, as well as the length of this same buffer. Building such packet is then a bit more complicated than other protocols.
RTC data packet
---------------
The first thing to do when building the RTC ``data`` buffer is to instanciate each Scapy packet which represents a piece of data. Each one of them may require some specific piece of configuration, such as its length. All packets and their configuration are:
* ``PNIORealTimeRawData``: a simple raw data like ``Raw``
* ``length``: defines the length of the data
* ``Profisafe``: the PROFIsafe profile to perform functional safety
* ``length``: defines the length of the whole packet
* ``CRC``: defines the length of the CRC, either ``3`` or ``4``
* ``PNIORealTimeIOxS``: either an IO Consumer or Provider Status byte
* Doesn't require any configuration
To instanciate one of these packets with its configuration, the ``config`` argument must be given. It is a ``dict()`` which contains all the required piece of configuration::
>>> load_contrib('pnio_rtc')
>>> str(PNIORealTimeRawData(load='AAA', config={'length': 4}))
'AAA\x00'
>>> str(Profisafe(load='AAA', Control_Status=0x20, CRC=0x424242, config={'length': 8, 'CRC': 3}))
'AAA\x00 BBB'
>>> hexdump(PNIORealTimeIOxS())
0000 80 .
RTC packet
----------
Now that a data packet can be instanciated, a whole RTC packet may be built. ``PNIORealTime`` contains a field ``data`` which is a list of all data packets to add in the buffer, however, without the configuration, Scapy won't be
able to dissect it::
>>> load_contrib("pnio_rtc")
>>> p=PNIORealTime(cycleCounter=1024, data=[
... PNIORealTimeIOxS(),
... PNIORealTimeRawData(load='AAA', config={'length':4}) / PNIORealTimeIOxS(),
... Profisafe(load='AAA', Control_Status=0x20, CRC=0x424242, config={'length': 8, 'CRC': 3}) / PNIORealTimeIOxS(),
... ])
>>> p.show()
###[ PROFINET Real-Time ]###
len= None
dataLen= None
\data\
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0
| extension= 0
|###[ PNIO RTC Raw data ]###
| load= 'AAA'
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0
| extension= 0
|###[ PROFISafe ]###
| load= 'AAA'
| Control_Status= 0x20
| CRC= 0x424242
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0
| extension= 0
padding= ''
cycleCounter= 1024
dataStatus= primary+validData+run+no_problem
transferStatus= 0
>>> p.show2()
###[ PROFINET Real-Time ]###
len= 44
dataLen= 15
\data\
|###[ PNIO RTC Raw data ]###
| load= '\x80AAA\x00\x80AAA\x00 BBB\x80'
padding= ''
cycleCounter= 1024
dataStatus= primary+validData+run+no_problem
transferStatus= 0
For Scapy to be able to dissect it correctly, one must also configure the layer for it to know the location of each data in the buffer. This configuration is saved in the dictionary ``conf.contribs["PNIO_RTC"]`` which can be updated with the ``pnio_update_config`` method. Each item in the dictionary uses the tuple ``(Ether.src, Ether.dst)`` as key, to be able to separate the configuration of each communication. Each value is then a list of a tuple which describes a data packet. It is composed of the negative index, from the end of the data buffer, of the packet position, the class of the packet as second item and the ``config`` dictionary to provide to the class as last. If we continue the previous example, here is the configuration to set::
>>> load_contrib("pnio")
>>> e=Ether(src='00:01:02:03:04:05', dst='06:07:08:09:0a:0b') / ProfinetIO() / p
>>> e.show2()
###[ Ethernet ]###
dst= 06:07:08:09:0a:0b
src= 00:01:02:03:04:05
type= 0x8892
###[ ProfinetIO ]###
frameID= RT_CLASS_1
###[ PROFINET Real-Time ]###
len= 44
dataLen= 15
\data\
|###[ PNIO RTC Raw data ]###
| load= '\x80AAA\x00\x80AAA\x00 BBB\x80'
padding= ''
cycleCounter= 1024
dataStatus= primary+validData+run+no_problem
transferStatus= 0
>>> pnio_update_config({('00:01:02:03:04:05', '06:07:08:09:0a:0b'): [
... (-9, Profisafe, {'length': 8, 'CRC': 3}),
... (-9 - 5, PNIORealTimeRawData, {'length':4}),
... ]})
>>> e.show2()
###[ Ethernet ]###
dst= 06:07:08:09:0a:0b
src= 00:01:02:03:04:05
type= 0x8892
###[ ProfinetIO ]###
frameID= RT_CLASS_1
###[ PROFINET Real-Time ]###
len= 44
dataLen= 15
\data\
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
|###[ PNIO RTC Raw data ]###
| load= 'AAA'
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
|###[ PROFISafe ]###
| load= 'AAA'
| Control_Status= 0x20
| CRC= 0x424242L
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
padding= ''
cycleCounter= 1024
dataStatus= primary+validData+run+no_problem
transferStatus= 0
If no data packets are configured for a given offset, it defaults to a ``PNIORealTimeIOxS``. However, this method is not very convenient for the user to configure the layer and it only affects the dissection of packets. In such cases, one may have access to several RTC packets, sniffed or retrieved from a PCAP file. Thus, ``PNIORealTime`` provides some methods to analyse a list of ``PNIORealTime`` packets and locate all data in it, based on simple heuristics. All of them take as first argument an iterable which contains the list of packets to analyse.
* ``PNIORealTime.find_data()`` analyses the data buffer and separate real data from IOxS. It returns a dict which can be provided to ``pnio_update_config``.
* ``PNIORealTime.find_profisafe()`` analyses the data buffer and find the PROFIsafe profiles among the real data. It returns a dict which can be provided to ``pnio_update_config``.
* ``PNIORealTime.analyse_data()`` executes both previous methods and update the configuration. **This is usually the method to call.**
* ``PNIORealTime.draw_entropy()`` will draw the entropy of each byte in the data buffer. It can be used to easily visualize PROFIsafe locations as entropy is the base of the decision algorithm of ``find_profisafe``.
::
>>> load_contrib('pnio_rtc')
>>> t=rdpcap('/path/to/trace.pcap', 1024)
>>> PNIORealTime.analyse_data(t)
{('00:01:02:03:04:05', '06:07:08:09:0a:0b'): [(-19, <class 'scapy.contrib.pnio_rtc.PNIORealTimeRawData'>, {'length': 1}), (-15, <class 'scapy.contrib.pnio_rtc.Profisafe'>, {'CRC': 3, 'length': 6}), (-7, <class 'scapy.contrib.pnio_rtc.Profisafe'>, {'CRC': 3, 'length': 5})]}
>>> t[100].show()
###[ Ethernet ]###
dst= 06:07:08:09:0a:0b
src= 00:01:02:03:04:05
type= n_802_1Q
###[ 802.1Q ]###
prio= 6L
id= 0L
vlan= 0L
type= 0x8892
###[ ProfinetIO ]###
frameID= RT_CLASS_1
###[ PROFINET Real-Time ]###
len= 44
dataLen= 22
\data\
|###[ PNIO RTC Raw data ]###
| load= '\x80\x80\x80\x80\x80\x80\x00\x80\x80\x80\x12:\x0e\x12\x80\x80\x00\x12\x8b\x97\xe3\x80'
padding= ''
cycleCounter= 6208
dataStatus= primary+validData+run+no_problem
transferStatus= 0
>>> t[100].show2()
###[ Ethernet ]###
dst= 06:07:08:09:0a:0b
src= 00:01:02:03:04:05
type= n_802_1Q
###[ 802.1Q ]###
prio= 6L
id= 0L
vlan= 0L
type= 0x8892
###[ ProfinetIO ]###
frameID= RT_CLASS_1
###[ PROFINET Real-Time ]###
len= 44
dataLen= 22
\data\
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
[...]
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
|###[ PNIO RTC Raw data ]###
| load= ''
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
[...]
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
|###[ PROFISafe ]###
| load= ''
| Control_Status= 0x12
| CRC= 0x3a0e12L
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
|###[ PROFISafe ]###
| load= ''
| Control_Status= 0x12
| CRC= 0x8b97e3L
|###[ PNIO RTC IOxS ]###
| dataState= good
| instance= subslot
| reserved= 0x0L
| extension= 0L
padding= ''
cycleCounter= 6208
dataStatus= primary+validData+run+no_problem
transferStatus= 0
In addition, one can see, when displaying a ``PNIORealTime`` packet, the field ``len``. This is a computed field which is not added in the final packet build. It is mainly useful for dissection and reconstruction, but it can also be used to modify the behaviour of the packet. In fact, RTC packet must always be long enough for an Ethernet frame and to do so, a padding must be added right after the ``data`` buffer. The default behaviour is to add ``padding`` whose size is computed during the ``build`` process::
>>> str(PNIORealTime(cycleCounter=0x4242, data=[PNIORealTimeIOxS()]))
'\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00BB5\x00'
However, one can set ``len`` to modify this behaviour. ``len`` controls the length of the whole ``PNIORealTime`` packet. Then, to shorten the length of the padding, ``len`` can be set to a lower value::
>>> str(PNIORealTime(cycleCounter=0x4242, data=[PNIORealTimeIOxS()], len=50))
'\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00BB5\x00'
>>> str(PNIORealTime(cycleCounter=0x4242, data=[PNIORealTimeIOxS()]))
'\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00BB5\x00'
>>> str(PNIORealTime(cycleCounter=0x4242, data=[PNIORealTimeIOxS()], len=30))
'\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00BB5\x00'

View File

@ -319,6 +319,7 @@ resolve : holds list of fields for which resolution should be done
noenum : holds list of enum fields for which conversion to string should NOT be done
AS_resolver: choose the AS resolver class to use
extensions_paths: path or list of paths where extensions are to be looked for
contribs: a dict which can be used by contrib layers to store local configuration
"""
version = VERSION
session = ""
@ -385,6 +386,7 @@ extensions_paths: path or list of paths where extensions are to be looked for
"radius", "rip", "rtp", "skinny", "smb", "snmp",
"tftp", "x509", "bluetooth", "dhcp6", "llmnr",
"sctp", "vrrp", "ipsec", "lltd", "vxlan"]
contribs = dict()
if not Conf.ipv6_enabled:

77
scapy/contrib/pnio.py Normal file
View File

@ -0,0 +1,77 @@
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# Copyright (C) 2016 Gauthier Sebaux
# scapy.contrib.description = ProfinetIO base layer
# scapy.contrib.status = loads
"""
A simple and non exhaustive Profinet IO layer for scapy
"""
# Scapy imports
from scapy.all import Packet, bind_layers, Ether, UDP
from scapy.fields import XShortEnumField
# Some constants
PNIO_FRAME_IDS = {
0x0020:"PTCP-RTSyncPDU-followup",
0x0080:"PTCP-RTSyncPDU",
0xFC01:"Alarm High",
0xFE01:"Alarm Low",
0xFEFC:"DCP-Hello-Req",
0xFEFD:"DCP-Get-Set",
0xFEFE:"DCP-Identify-ReqPDU",
0xFEFF:"DCP-Identify-ResPDU",
0xFF00:"PTCP-AnnouncePDU",
0xFF20:"PTCP-FollowUpPDU",
0xFF40:"PTCP-DelayReqPDU",
0xFF41:"PTCP-DelayResPDU-followup",
0xFF42:"PTCP-DelayFuResPDU",
0xFF43:"PTCP-DelayResPDU",
}
for i in xrange(0x0100, 0x1000):
PNIO_FRAME_IDS[i] = "RT_CLASS_3"
for i in xrange(0x8000, 0xC000):
PNIO_FRAME_IDS[i] = "RT_CLASS_1"
for i in xrange(0xC000, 0xFC00):
PNIO_FRAME_IDS[i] = "RT_CLASS_UDP"
for i in xrange(0xFF80, 0xFF90):
PNIO_FRAME_IDS[i] = "FragmentationFrameID"
#################
## PROFINET IO ##
#################
class ProfinetIO(Packet):
"""Basic PROFINET IO dispatcher"""
fields_desc = [XShortEnumField("frameID", 0, PNIO_FRAME_IDS)]
overload_fields = {
Ether: {"type": 0x8892},
UDP: {"dport": 0x8892},
}
def guess_payload_class(self, payload):
# For frameID in the RT_CLASS_* range, use the RTC packet as payload
if (self.frameID >= 0x0100 and self.frameID < 0x1000) or \
(self.frameID >= 0x8000 and self.frameID < 0xFC00):
from scapy.contrib.pnio_rtc import PNIORealTime
return PNIORealTime
else:
return Packet.guess_payload_class(self, payload)
bind_layers(Ether, ProfinetIO, type=0x8892)
bind_layers(UDP, ProfinetIO, dport=0x8892)

32
scapy/contrib/pnio.uts Normal file
View File

@ -0,0 +1,32 @@
% ProfinetIO layer test campaign
+ Syntax check
= Import the ProfinetIO layer
from scapy.contrib.pnio import *
+ Check DCE/RPC layer
= ProfinetIO default values
str(ProfinetIO()) == '\x00\x00'
= ProfinetIO overloads Ethertype
p = Ether() / ProfinetIO()
p.type == 0x8892
= ProfinetIO overloads UDP dport
p = UDP() / ProfinetIO()
p.dport == 0x8892
= Ether guesses ProfinetIO as payload class
p = Ether('ffffffffffff00000000000088920102'.decode('hex'))
p.payload.__class__ == ProfinetIO and p.frameID == 0x0102
= UDP guesses ProfinetIO as payload class
p = UDP('12348892000a00000102'.decode('hex'))
p.payload.__class__ == ProfinetIO and p.frameID == 0x0102
= ProfinetIO guess payload to PNIORealTime
p = UDP('12348892000c000080020102'.decode('hex'))
p.payload.payload.__class__.__name__ == 'PNIORealTime' and p.frameID == 0x8002

477
scapy/contrib/pnio_rtc.py Normal file
View File

@ -0,0 +1,477 @@
# This file is part of Scapy
# Scapy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# any later version.
#
# Scapy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Scapy. If not, see <http://www.gnu.org/licenses/>.
# Copyright (C) 2016 Gauthier Sebaux
# scapy.contrib.description = ProfinetIO Real-Time Cyclic (RTC)
# scapy.contrib.status = loads
"""
PROFINET IO layers for scapy which correspond to Real-Time Cyclic data
"""
# external imports
import math
import struct
# Scapy imports
from scapy.all import Packet, bind_layers, Ether, UDP, Field, conf
from scapy.fields import BitEnumField, BitField, ByteField,\
FlagsField,\
PacketListField,\
ShortField, StrFixedLenField,\
XBitField, XByteField
# local imports
from scapy.contrib.pnio import ProfinetIO
#####################################
## PROFINET Real-Time Data Packets ##
#####################################
class PNIORealTimeIOxS(Packet):
"""IOCS and IOPS packets for PROFINET Real-Time payload"""
name = "PNIO RTC IOxS"
fields_desc = [
BitEnumField("dataState", 1, 1, ["bad", "good"]),
BitEnumField("instance", 0, 2, ["subslot", "slot", "device", "controller"]),
XBitField("reserved", 0, 4),
BitField("extension", 0, 1),
]
def extract_padding(self, s):
return None, s # No extra payload
class PNIORealTimeRawData(Packet):
"""Raw data packets for PROFINET Real-Time payload.
It's a configurable packet whose config only includes a fix length. The
config parameter must then be a dict {"length": X}.
PROFINET IO specification impose this packet to be followed with an IOPS
(PNIORealTimeIOxS)"""
__slots__ = ["_config"]
name = "PNIO RTC Raw data"
fields_desc = [
StrFixedLenField("load", "", length_from=lambda p: p[PNIORealTimeRawData].length()),
]
def __init__(self, _pkt="", post_transform=None, _internal=0, _underlayer=None, config=None, **fields):
"""
length=None means that the length must be managed by the user. If it's
defined, the field will always be length-long (padded with "\\x00" if
needed)
"""
self._config = config
Packet.__init__(self, _pkt=_pkt, post_transform=post_transform,
_internal=_internal, _underlayer=_underlayer, **fields)
def copy(self):
pkt = Packet.copy(self)
pkt._config = self._config
return pkt
def clone_with(self, *args, **kargs):
pkt = Packet.clone_with(self, *args, **kargs)
pkt._config = self._config
return pkt
def length(self):
"""Get the length of the raw data"""
# Manage the length of the packet if a length is provided
return self._config["length"]
# Make sure an IOPS follows a data
bind_layers(PNIORealTimeRawData, PNIORealTimeIOxS)
###############################
## PROFINET Real-Time Fields ##
###############################
class LowerLayerBoundPacketListField(PacketListField):
"""PacketList which binds each underlayer of packets to the current pkt"""
def m2i(self, pkt, m):
return self.cls(m, _underlayer=pkt)
class NotionalLenField(Field):
"""A len fields which isn't present in the machine representation, but is
computed from a given lambda"""
__slots__ = ["length_from", "count_from"]
def __init__(self, name, default, length_from=None, count_from=None):
Field.__init__(self, name, default)
self.length_from = length_from
self.count_from = count_from
def addfield(self, pkt, s, val):
return s # Isn't present in the machine packet
def getfield(self, pkt, s):
val = None
if self.length_from is not None:
val = self.length_from(pkt, s)
elif self.count_from is not None:
val = self.count_from(pkt, s)
return s, val
###############################
## PNIORealTime Configuration #
###############################
# conf.contribs["PNIO_RTC"] is a dict which contains data layout for each Ethernet
# communications. It must be formatted as such:
# {(Ether.src, Ether.dst): [(start, type, config), ...]}
# start: index of a data field from the END of the data buffer (-1, -2, ...)
# type: class to be instanciated to represent these data
# config: a config dict, given to the type class constructor
conf.contribs["PNIO_RTC"] = {}
def _get_ethernet(pkt):
"""Find the Ethernet packet of underlayer or None"""
ether = pkt
while ether is not None and not isinstance(ether, Ether):
ether = ether.underlayer
return ether
def pnio_update_config(config):
"""Update the PNIO RTC config"""
conf.contribs["PNIO_RTC"].update(config)
def pnio_get_config(pkt):
"""Retrieve the config for a given communication"""
# get the config based on the tuple (Ether.src, Ether.dst)
ether = _get_ethernet(pkt)
config = None
if ether is not None and (ether.src, ether.dst) in conf.contribs["PNIO_RTC"]:
config = conf.contribs["PNIO_RTC"][(ether.src, ether.dst)]
return config
###############################
## PROFINET Real-Time Packet ##
###############################
def _pnio_rtc_guess_payload_class(_pkt, _underlayer=None, *args, **kargs):
"""A dispatcher for the packet list field which manage the configuration
to fin dthe appropriate class"""
config = pnio_get_config(_underlayer)
if isinstance(config, list):
# If we have a valid config, it's a list which describe each data
# packets the rest being IOCS
cur_index = -len(_pkt)
for index, cls, params in config:
if cur_index == index:
return cls(_pkt, config=params, *args, **kargs)
# Not a data => IOCS packet
return PNIORealTimeIOxS(_pkt, *args, **kargs)
else:
# No config => Raw data which dissect the whole _pkt
return PNIORealTimeRawData(_pkt,
config={"length": len(_pkt)},
*args, **kargs
)
_PNIO_DS_FLAGS = [
"primary",
"redundancy",
"validData",
"reserved_1",
"run",
"no_problem",
"reserved_2",
"ignore",
]
class PNIORealTime(Packet):
"""PROFINET cyclic real-time"""
name = "PROFINET Real-Time"
fields_desc = [
NotionalLenField("len", None, length_from=lambda p, s: len(s)),
NotionalLenField("dataLen", None, length_from=lambda p, s: len(s[:-4].rstrip("\0"))),
LowerLayerBoundPacketListField("data", [], _pnio_rtc_guess_payload_class, length_from=lambda p: p.dataLen),
StrFixedLenField("padding", "", length_from=lambda p: p[PNIORealTime].padding_length()),
ShortField("cycleCounter", 0),
FlagsField("dataStatus", 0x35, 8, _PNIO_DS_FLAGS),
ByteField("transferStatus", 0)
]
overload_fields = {
ProfinetIO: {"frameID": 0x8000}, # RT_CLASS_1
}
def padding_length(self):
"""Compute the length of the padding need for the ethernet frame"""
fld, val = self.getfield_and_val("data")
# use the len field if available to define the padding length, eg for
# dissected packets
pkt_len = self.getfieldval("len")
if pkt_len is not None:
return max(0, pkt_len - len(fld.addfield(self, "", val)) - 4)
if isinstance(self.underlayer, ProfinetIO) and \
isinstance(self.underlayer.underlayer, UDP):
return max(0, 12 - len(fld.addfield(self, "", val)))
else:
return max(0, 40 - len(fld.addfield(self, "", val)))
@staticmethod
def analyse_data(packets):
"""Analyse the data to find heuristical properties and determine
location and type of data"""
loc = PNIORealTime.find_data(packets)
loc = PNIORealTime.analyse_profisafe(packets, loc)
pnio_update_config(loc)
return loc
@staticmethod
def find_data(packets):
"""Analyse a packet list to extract data offsets from packets data."""
# a dictionnary to count data offsets (ie != 0x80)
# It's formatted: {(src, dst): (total, [count for offset in len])}
heuristic = {}
# Counts possible data locations
# 0x80 are mainly IOxS and trailling 0x00s are just padding
for pkt in packets:
if PNIORealTime in pkt:
pdu = bytes(pkt[PNIORealTime])[:-4].rstrip("\0")
if (pkt.src, pkt.dst) not in heuristic:
heuristic[(pkt.src, pkt.dst)] = (0, [])
total, counts = heuristic[(pkt.src, pkt.dst)]
if len(counts) < len(pdu):
counts.extend([0 for _ in range(len(pdu) - len(counts))])
for i in range(len(pdu)):
if pdu[i] != "\x80":
counts[i] += 1
comm = (pkt.src, pkt.dst)
heuristic[comm] = (total + 1, counts)
# Determine data locations
locations = {}
for comm in heuristic:
total, counts = heuristic[comm]
length = len(counts)
loc = locations[comm] = []
start = None
for i in range(length):
if counts[i] > total / 2: # Data if more than half is != 0x80
if start is None:
start = i
else:
if start is not None:
loc.append((
start - length,
PNIORealTimeRawData,
{"length": i - start}
))
start = None
return locations
@staticmethod
def analyse_profisafe(packets, locations=None):
"""Analyse a packet list to find possible PROFISafe profils.
It's based on an heuristical analysis of each payload to try to find
CRC and control/status byte.
locations: possible data locations. If not provided, analyse_pn_rt will
be called beforehand. If not given, it calls in the same time
analyse_data which update the configuration of the data field"""
# get data locations and entropy of bytes
if not locations:
locations = PNIORealTime.find_data(packets)
entropies = PNIORealTime.data_entropy(packets, locations)
# Try to find at least 3 high entropy successive bytes (the CRC)
for comm in entropies:
entropy = dict(entropies[comm]) # Convert tuples to key => value
for i in range(len(locations[comm])):
# update each location with its value after profisafe analysis
locations[comm][i] = \
PNIORealTime.analyse_one_profisafe_location(
locations[comm][i], entropy
)
return locations
@staticmethod
def analyse_one_profisafe_location(location, entropy):
"""Analyse one PNIO RTC data location to find if its a PROFISafe
:param location: location to analyse, a tuple (start, type, config)
:param entropy: the entropy of each byte of the packet data
:returns: the configuration associated with the data
"""
start, klass, conf = location
if conf["length"] >= 4: # Minimal PROFISafe length
succ_count = 0
for j in range(start, start + conf["length"]):
# Limit for a CRC is set to 6 bit of entropy min
if j in entropy and entropy[j] >= 6:
succ_count += 1
else:
succ_count = 0
# PROFISafe profiles must end with at least 3 bytes of high entropy
if succ_count >= 3: # Possible profisafe CRC
return (
start,
Profisafe,
{"CRC": succ_count, "length": conf["length"]}
)
# Not a PROFISafe profile
return (start, klass, conf)
@staticmethod
def data_entropy(packets, locations=None):
"""Analyse a packet list to find the entropy of each data byte
locations: possible data locations. If not provided, analyse_pn_rt will
be called beforehand. If not given, it calls in the same time
analyse_data which update the configuration of the data field"""
if not locations:
locations = PNIORealTime.find_data(packets)
# Retrieve the entropy of each data byte, for each communication
entropies = {}
for comm in locations:
if len(locations[comm]) > 0: # Doesn't append empty data
entropies[comm] = []
comm_packets = []
# fetch all packets from the communication
for pkt in packets:
if PNIORealTime in pkt and (pkt.src, pkt.dst) == comm:
comm_packets.append(
bytes(pkt[PNIORealTime])[:-4].rstrip("\0")
)
# Get the entropy
for start, dummy, conf in locations[comm]:
for i in range(start, start + conf["length"]):
entropies[comm].append(
(i, entropy_of_byte(comm_packets, i))
)
return entropies
@staticmethod
def draw_entropy(packets, locations=None):
"""Plot the entropy of each data byte of PN RT communication"""
import matplotlib.pyplot as plt
import matplotlib.cm as cm
entropies = PNIORealTime.data_entropy(packets, locations)
rows = len(entropies)
cur_row = 1
for comm in entropies:
index = []
vals = []
for i, ent in entropies[comm]:
index.append(i)
vals.append(ent)
# Offsets the indexes to get the index from the beginning
offset = -min(index)
index = [i + offset for i in index]
plt.subplot(rows, 1, cur_row)
plt.bar(index, vals, 0.8, color="r")
plt.xticks([i + 0.4 for i in index], index)
plt.title("Entropy from %s to %s" % comm)
cur_row += 1
plt.ylabel("Shannon Entropy")
plt.xlabel("Byte offset") # x label only on the last row
plt.legend()
plt.tight_layout()
plt.show()
def entropy_of_byte(packets, position):
"""Compute the entropy of a byte at a given offset"""
counter = [0 for _ in range(256)]
# Count each byte a appearance
for pkt in packets:
if -position <= len(pkt): # position must be a negative index
counter[ord(pkt[position])] += 1
# Compute the Shannon entropy
entropy = 0
length = len(packets)
for count in counter:
if count > 0:
ratio = float(count) / length
entropy -= ratio * math.log(ratio, 2)
return entropy
###############
## PROFISafe ##
###############
class XVarBytesField(XByteField):
"""Variable length bytes field, from 0 to 8 bytes"""
__slots__ = ["length_from"]
def __init__(self, name, default, length=None, length_from=None):
self.length_from = length_from
if length:
self.length_from = lambda p, l=length: l
Field.__init__(self, name, default, "!Q")
def addfield(self, pkt, s, val):
length = self.length_from(pkt)
return s + struct.pack(self.fmt, self.i2m(pkt, val))[8-length:]
def getfield(self, pkt, s):
length = self.length_from(pkt)
val = struct.unpack(self.fmt, "\x00"*(8 - length) + s[:length])[0]
return s[length:], self.m2i(pkt, val)
class Profisafe(PNIORealTimeRawData):
"""PROFISafe profil to be encapsulated inside the PNRT.data list.
It's a configurable packet whose config includes a fix length, and a CRC
length. The config parameter must then be a dict {"length": X, "CRC": Y}.
"""
name = "PROFISafe"
fields_desc = [
StrFixedLenField("load", "", length_from=lambda p: p[Profisafe].data_length()),
XByteField("Control_Status", 0),
XVarBytesField("CRC", 0, length_from=lambda p: p[Profisafe].crc_length())
]
def data_length(self):
"""Return the length of the data"""
ret = self.length() - self.crc_length() - 1
return ret
def crc_length(self):
"""Return the length of the crc"""
return self._config["CRC"]

106
scapy/contrib/pnio_rtc.uts Normal file
View File

@ -0,0 +1,106 @@
% PNIO RTC layer test campaign
+ Syntax check
= Import the PNIO RTC layer
from scapy.contrib.pnio import *
from scapy.contrib.pnio_rtc import *
+ Check PNIORealTimeIOxS
= PNIORealTimeIOxS default values
str(PNIORealTimeIOxS()) == '\x80'
= Check no payload is dissected (only padding)
* In order for the PNIORealTime to dissect correctly all the data buffer, data field must strictly dissect what they know as being of themselves
p = PNIORealTimeIOxS('\x40\x01\x02')
p == PNIORealTimeIOxS(dataState='bad', instance='device') / conf.padding_layer('\x01\x02')
+ Check PNIORealTimeRawData
= PNIORealTimeRawData default values
str(PNIORealTimeRawData(config={'length': 5})) == '\x00\x00\x00\x00\x00'
= PNIORealTimeRawData must always be the same configured length
str(PNIORealTimeRawData(load='ABC', config={'length': 5})) == 'ABC\x00\x00'
= PNIORealTimeRawData may be truncated
str(PNIORealTimeRawData(load='ABCDEF', config={'length': 5})) == 'ABCDE'
= Check that the dissected payload is an PNIORealTimeIOxS (IOPS)
p = PNIORealTimeRawData('ABCDE\x80\x01\x02', config={'length': 5})
p == PNIORealTimeRawData(load='ABCDE', config={'length': 5}) / PNIORealTimeIOxS() / conf.padding_layer('\x01\x02')
= PNIORealTimeRawData is capable of dissected uncomplete packets
p = PNIORealTimeRawData('ABC', config={'length': 5})
p == PNIORealTimeRawData(load='ABC', config={'length': 5})
+ Check Profisafe
= Profisafe default values
str(Profisafe(config={'length': 7, 'CRC': 3})) == '\0\0\0\0\0\0\0'
= Profisafe must always be the same configured length
str(Profisafe(load='AB', config={'length': 7, 'CRC': 3})) == 'AB\0\0\0\0\0'
= Profisafe load may be truncated
str(Profisafe(load='ABCDEF', config={'length': 7, 'CRC': 3})) == 'ABC\0\0\0\0'
= Check that the dissected payload is an PNIORealTimeIOxS (IOPS)
p = Profisafe('ABC\x20\x12\x34\x56\x80\x01\x02', config={'length': 7, 'CRC': 3})
p == Profisafe(load='ABC', Control_Status=0x20, CRC=0x123456, config={'length': 7, 'CRC': 3}) / PNIORealTimeIOxS() / conf.padding_layer('\x01\x02')
= Profisafe with a CRC-32
str(Profisafe(load='ABC', Control_Status=0x33, CRC=0x12345678, config={'length': 8, 'CRC': 4})) == 'ABC\x33\x12\x34\x56\x78'
= Profisafe is capable of dissected uncomplete packets
p = Profisafe('AB', config={'length': 7, 'CRC': 3})
p == Profisafe(load='AB', Control_Status=0, CRC=0)
+ Check PNIORealTime layer
= PNIORealTime default values
str(PNIORealTime()) == '\0' * 40 + '\0\0\x35\0'
= PNIORealTime default values under an UDP packet
str(UDP(sport=0x1234) / ProfinetIO(frameID=0x8002) / PNIORealTime()) == '12348892001a00008002'.decode('hex') + '\0' * 12 + '\0\0\x35\0'
= PNIORealTime simple packet
* a simple data packet with a raw profinet data and its IOPS, an IOCS and a Profisafe data and its IOPS. 15B data length, 1B padding (20 - 15 -4)
str(PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3,
data=[
PNIORealTimeRawData(load='\x01\x02\x03\x04', config={'length': 5}) / PNIORealTimeIOxS(),
PNIORealTimeIOxS(dataState='bad'),
Profisafe(load='\x05\x06', Control_Status=0x20, CRC=0x12345678, config={'length': 7, 'CRC': 4}) / PNIORealTimeIOxS()
]
)) == '0102030400800005062012345678800012342603'.decode('hex')
= PNIORealTime dissects to PNIORealTimeRawData when no config is available
p = PNIORealTime('0102030400800005062012345678800012342603'.decode('hex'))
p == PNIORealTime(len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3, padding='\0',
data=[
PNIORealTimeRawData(load='010203040080000506201234567880'.decode('hex'))
]
)
= PNIORealTime dissection is configurable
* Usually, the configuration is not given manually, but using PNIORealTime.analyse_data() on a list of Packets which analyses and updates the configuration
pnio_update_config({
('06:07:08:09:0a:0b', '00:01:02:03:04:05'): [
(-15, PNIORealTimeRawData, {'length': 5}),
(-8, Profisafe, {'length': 7, 'CRC': 4}),
]
})
p = Ether('000102030405060708090a0b889280020102030400800005062012345678800012342603'.decode('hex'))
p == Ether(dst='00:01:02:03:04:05', src='06:07:08:09:0a:0b') / ProfinetIO(frameID=0x8002) / PNIORealTime(
len=20, dataLen=15, cycleCounter=0x1234, dataStatus='redundancy+validData+no_problem', transferStatus=3, padding='\0',
data=[
PNIORealTimeRawData(load='\x01\x02\x03\x04\0', config={'length': 5}) / PNIORealTimeIOxS(),
PNIORealTimeIOxS(dataState='bad'),
Profisafe(load='\x05\x06', Control_Status=0x20, CRC=0x12345678, config={'length': 7, 'CRC': 4}) / PNIORealTimeIOxS()
]
)