Tazmen sniffer protocol layer (#1004)

* add TaZmen Sniffer Protocol (TZSP) layer #1000

* add tests for TZSP layer #1000

* cleanups from PR reviews #1000

* fix ranges used in state tags #1000

* add some corner case tests #1000

* add support for unknown tag type #1000

use type TZSPTagUnknown if tag type is not known.

+ related tests

* fixing a nasty grammar bug #1000

* add YesNoByteField #1000

+ tests

* use YesNoByteField instead of generated dicts for conditional tags #1000

* use orb instead of unpack for accessing payload bytes #1000
This commit is contained in:
hecke 2018-02-23 12:56:46 +01:00 committed by Guillaume Valadon
parent d2201f1b32
commit d8b24c7138
4 changed files with 1330 additions and 0 deletions

486
scapy/contrib/tzsp.py Normal file
View File

@ -0,0 +1,486 @@
#! /usr/bin/env python
#
# scapy.contrib.description = TZSP
# scapy.contrib.status = loads
"""
TZSP - TaZmen Sniffer Protocol
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:author: Thomas Tannhaeuser, hecke@naberius.de
:license: GPLv2
This module is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This module is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
:description:
This module provides Scapy layers for the TZSP protocol.
references:
- https://en.wikipedia.org/wiki/TZSP
- https://web.archive.org/web/20050404125022/http://www.networkchemistry.com/support/appnotes/an001_tzsp.html
:NOTES:
- to allow Scapy to dissect this layer automatically, you need to bind the TZSP layer to UDP using
the default TZSP port (0x9090), e.g.
bind_layers(UDP, TZSP, sport=TZSP_PORT_DEFAULT)
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
- packet format definition from www.networkchemistry.com is different from the one given by wikipedia
- seems Wireshark implements the wikipedia protocol version (didn't dive into their code)
- observed (miss)behavior of Wireshark (2.2.6)
- fails to decode RSSI & SNR using short values - only one byte taken
- SNR is labeled as silence
- WlanRadioHdrSerial is labeled as Sensor MAC
- doesn't know the packet count tag (40 / 0x28)
"""
from scapy.compat import orb
from scapy.contrib.avs import AVSWLANHeader
from scapy.error import warning, Scapy_Exception
from scapy.fields import ByteField, ShortEnumField, IntField, FieldLenField, YesNoByteField
from scapy.layers.dot11 import Packet, Dot11, PrismHeader
from scapy.layers.l2 import Ether
from scapy.fields import StrLenField, ByteEnumField, ShortField, XStrLenField
from scapy.modules.six.moves import range
from scapy.packet import Raw
TZSP_PORT_DEFAULT = 0x9090
class TZSP(Packet):
TYPE_RX_PACKET = 0x00
TYPE_TX_PACKET = 0x01
TYPE_CONFIG = 0x03
TYPE_KEEPALIVE = TYPE_NULL = 0x04
TYPE_PORT = 0x05
TYPES = {
TYPE_RX_PACKET: 'RX_PACKET',
TYPE_TX_PACKET: 'TX_PACKET',
TYPE_CONFIG: 'CONFIG',
TYPE_NULL: 'KEEPALIVE/NULL',
TYPE_PORT: 'PORT',
}
ENCAPSULATED_ETHERNET = 0x01
ENCAPSULATED_IEEE_802_11 = 0x12
ENCAPSULATED_PRISM_HEADER = 0x77
ENCAPSULATED_WLAN_AVS = 0x7f
ENCAPSULATED_PROTOCOLS = {
ENCAPSULATED_ETHERNET: 'ETHERNET',
ENCAPSULATED_IEEE_802_11: 'IEEE 802.11',
ENCAPSULATED_PRISM_HEADER: 'PRISM HEADER',
ENCAPSULATED_WLAN_AVS: 'WLAN AVS'
}
ENCAPSULATED_PROTOCOL_CLASSES = {
ENCAPSULATED_ETHERNET: Ether,
ENCAPSULATED_IEEE_802_11: Dot11,
ENCAPSULATED_PRISM_HEADER: PrismHeader,
ENCAPSULATED_WLAN_AVS: AVSWLANHeader
}
fields_desc = [
ByteField('version', 0x01),
ByteEnumField('type', TYPE_RX_PACKET, TYPES),
ShortEnumField('encapsulated_protocol', ENCAPSULATED_ETHERNET, ENCAPSULATED_PROTOCOLS)
]
def get_encapsulated_payload_class(self):
"""
get the class that holds the encapsulated payload of the TZSP packet
:return: class representing the payload, Raw() on error
"""
try:
return TZSP.ENCAPSULATED_PROTOCOL_CLASSES[self.encapsulated_protocol]
except KeyError:
warning(
'unknown or invalid encapsulation type (%i) - returning payload as raw()' % self.encapsulated_protocol)
return Raw
def guess_payload_class(self, payload):
if self.type == TZSP.TYPE_KEEPALIVE:
if len(payload):
warning('payload (%i bytes) in KEEPALIVE/NULL packet' % len(payload))
return Raw
else:
return _tzsp_guess_next_tag(payload)
def get_encapsulated_payload(self):
has_encapsulated_data = self.type == TZSP.TYPE_RX_PACKET or self.type == TZSP.TYPE_TX_PACKET
if has_encapsulated_data:
end_tag_lyr = self.payload.getlayer(TZSPTagEnd)
if end_tag_lyr:
return end_tag_lyr.payload
else:
return None
def _tzsp_handle_unknown_tag(payload, tag_type):
payload_len = len(payload)
if payload_len < 2:
warning('invalid or unknown tag type (%i) and too short packet - treat remaining data as Raw' % tag_type)
return Raw
tag_data_length = orb(payload[1])
tag_data_fits_in_payload = (tag_data_length + 2) <= payload_len
if not tag_data_fits_in_payload:
warning('invalid or unknown tag type (%i) and too short packet - treat remaining data as Raw' % tag_type)
return Raw
warning('invalid or unknown tag type (%i)' % tag_type)
return TZSPTagUnknown
def _tzsp_guess_next_tag(payload):
"""
:return: class representing the next tag, Raw on error, None on missing payload
"""
if not payload:
warning('missing payload')
return None
tag_type = orb(payload[0])
try:
tag_class_definition = _TZSP_TAG_CLASSES[tag_type]
except KeyError:
return _tzsp_handle_unknown_tag(payload, tag_type)
if type(tag_class_definition) is not dict:
return tag_class_definition
try:
length = orb(payload[1])
except IndexError:
length = None
if not length:
warning('no tag length given - packet to short')
return Raw
try:
return tag_class_definition[length]
except KeyError:
warning('invalid tag length {} for tag type {}'.format(length, tag_type))
return Raw
class _TZSPTag(Packet):
TAG_TYPE_PADDING = 0x00
TAG_TYPE_END = 0x01
TAG_TYPE_RAW_RSSI = 0x0a
TAG_TYPE_SNR = 0x0b
TAG_TYPE_DATA_RATE = 0x0c
TAG_TYPE_TIMESTAMP = 0x0d
TAG_TYPE_CONTENTION_FREE = 0x0f
TAG_TYPE_DECRYPTED = 0x10
TAG_TYPE_FCS_ERROR = 0x11
TAG_TYPE_RX_CHANNEL = 0x12
TAG_TYPE_PACKET_COUNT = 0x28
TAG_TYPE_RX_FRAME_LENGTH = 0x29
TAG_TYPE_WLAN_RADIO_HDR_SERIAL = 0x3c
TAG_TYPES = {
TAG_TYPE_PADDING: 'PADDING',
TAG_TYPE_END: 'END',
TAG_TYPE_RAW_RSSI: 'RAW_RSSI',
TAG_TYPE_SNR: 'SNR',
TAG_TYPE_DATA_RATE: 'DATA_RATE',
TAG_TYPE_TIMESTAMP: 'TIMESTAMP',
TAG_TYPE_CONTENTION_FREE: 'CONTENTION_FREE',
TAG_TYPE_DECRYPTED: 'DECRYPTED',
TAG_TYPE_FCS_ERROR: 'FCS_ERROR',
TAG_TYPE_RX_CHANNEL: 'RX_CHANNEL',
TAG_TYPE_PACKET_COUNT: 'PACKET_COUNT',
TAG_TYPE_RX_FRAME_LENGTH: 'RX_FRAME_LENGTH',
TAG_TYPE_WLAN_RADIO_HDR_SERIAL: 'WLAN_RADIO_HDR_SERIAL'
}
def guess_payload_class(self, payload):
return _tzsp_guess_next_tag(payload)
class TZSPStructureException(Scapy_Exception):
pass
class TZSPTagPadding(_TZSPTag):
"""
padding tag (should be ignored)
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_PADDING, _TZSPTag.TAG_TYPES),
]
class TZSPTagEnd(Packet):
"""
last tag
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_END, _TZSPTag.TAG_TYPES),
]
def guess_payload_class(self, payload):
"""
the type of the payload encapsulation is given be the outer TZSP layers attribute encapsulation_protocol
"""
under_layer = self.underlayer
tzsp_header = None
while under_layer:
if isinstance(under_layer, TZSP):
tzsp_header = under_layer
break
under_layer = under_layer.underlayer
if tzsp_header:
return tzsp_header.get_encapsulated_payload_class()
else:
raise TZSPStructureException('missing parent TZSP header')
class TZSPTagRawRSSIByte(_TZSPTag):
"""
relative received signal strength - signed byte value
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_RAW_RSSI, _TZSPTag.TAG_TYPES),
ByteField('len', 1),
ByteField('raw_rssi', 0)
]
class TZSPTagRawRSSIShort(_TZSPTag):
"""
relative received signal strength - signed short value
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_RAW_RSSI, _TZSPTag.TAG_TYPES),
ByteField('len', 2),
ShortField('raw_rssi', 0)
]
class TZSPTagSNRByte(_TZSPTag):
"""
signal noise ratio - signed byte value
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_SNR, _TZSPTag.TAG_TYPES),
ByteField('len', 1),
ByteField('snr', 0)
]
class TZSPTagSNRShort(_TZSPTag):
"""
signal noise ratio - signed short value
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_SNR, _TZSPTag.TAG_TYPES),
ByteField('len', 2),
ShortField('snr', 0)
]
class TZSPTagDataRate(_TZSPTag):
"""
wireless link data rate
"""
DATA_RATE_UNKNOWN = 0x00
DATA_RATE_1 = 0x02
DATA_RATE_2 = 0x04
DATA_RATE_5_5 = 0x0B
DATA_RATE_6 = 0x0C
DATA_RATE_9 = 0x12
DATA_RATE_11 = 0x16
DATA_RATE_12 = 0x18
DATA_RATE_18 = 0x24
DATA_RATE_22 = 0x2C
DATA_RATE_24 = 0x30
DATA_RATE_33 = 0x42
DATA_RATE_36 = 0x48
DATA_RATE_48 = 0x60
DATA_RATE_54 = 0x6C
DATA_RATE_LEGACY_1 = 0x0A
DATA_RATE_LEGACY_2 = 0x14
DATA_RATE_LEGACY_5_5 = 0x37
DATA_RATE_LEGACY_11 = 0x6E
DATA_RATES = {
DATA_RATE_UNKNOWN: 'unknown',
DATA_RATE_1: '1 MB/s',
DATA_RATE_2: '2 MB/s',
DATA_RATE_5_5: '5.5 MB/s',
DATA_RATE_6: '6 MB/s',
DATA_RATE_9: '9 MB/s',
DATA_RATE_11: '11 MB/s',
DATA_RATE_12: '12 MB/s',
DATA_RATE_18: '18 MB/s',
DATA_RATE_22: '22 MB/s',
DATA_RATE_24: '24 MB/s',
DATA_RATE_33: '33 MB/s',
DATA_RATE_36: '36 MB/s',
DATA_RATE_48: '48 MB/s',
DATA_RATE_54: '54 MB/s',
DATA_RATE_LEGACY_1: '1 MB/s (legacy)',
DATA_RATE_LEGACY_2: '2 MB/s (legacy)',
DATA_RATE_LEGACY_5_5: '5.5 MB/s (legacy)',
DATA_RATE_LEGACY_11: '11 MB/s (legacy)',
}
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_DATA_RATE, _TZSPTag.TAG_TYPES),
ByteField('len', 1),
ByteEnumField('data_rate', DATA_RATE_UNKNOWN, DATA_RATES)
]
class TZSPTagTimestamp(_TZSPTag):
"""
MAC receive timestamp
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_TIMESTAMP, _TZSPTag.TAG_TYPES),
ByteField('len', 4),
IntField('timestamp', 0)
]
class TZSPTagContentionFree(_TZSPTag):
"""
packet received in contention free period
"""
NO = 0x00
YES = 0x01
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_CONTENTION_FREE, _TZSPTag.TAG_TYPES),
ByteField('len', 1),
YesNoByteField('contention_free', NO)
]
class TZSPTagDecrypted(_TZSPTag):
"""
packet was decrypted
"""
YES = 0x00
NO = 0x01
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_DECRYPTED, _TZSPTag.TAG_TYPES),
ByteField('len', 1),
YesNoByteField('decrypted', NO, config={'yes': YES, 'no': (NO, 0xff)})
]
class TZSPTagError(_TZSPTag):
"""
frame checksum error
"""
NO = 0x00
YES = 0x01
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_FCS_ERROR, _TZSPTag.TAG_TYPES),
ByteField('len', 1),
YesNoByteField('fcs_error', NO, config={'no': NO, 'yes': YES, 'reserved': (YES + 1, 0xff)})
]
class TZSPTagRXChannel(_TZSPTag):
"""
channel the sensor was on while receiving the frame
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_RX_CHANNEL, _TZSPTag.TAG_TYPES),
ByteField('len', 1),
ByteField('rx_channel', 0)
]
class TZSPTagPacketCount(_TZSPTag):
"""
packet counter
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_PACKET_COUNT, _TZSPTag.TAG_TYPES),
ByteField('len', 4),
IntField('packet_count', 0)
]
class TZSPTagRXFrameLength(_TZSPTag):
"""
received packet length
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_RX_FRAME_LENGTH, _TZSPTag.TAG_TYPES),
ByteField('len', 2),
ShortField('rx_frame_length', 0)
]
class TZSPTagWlanRadioHdrSerial(_TZSPTag):
"""
(vendor specific) unique capture device (sensor/AP) identifier
"""
fields_desc = [
ByteEnumField('type', _TZSPTag.TAG_TYPE_WLAN_RADIO_HDR_SERIAL, _TZSPTag.TAG_TYPES),
FieldLenField('len', None, length_of='sensor_id', fmt='b'),
StrLenField('sensor_id', '', length_from=lambda pkt:pkt.len)
]
class TZSPTagUnknown(_TZSPTag):
"""
unknown tag type dummy
"""
fields_desc = [
ByteField('type', 0xff),
FieldLenField('len', None, length_of='data', fmt='b'),
XStrLenField('data', '', length_from=lambda pkt: pkt.len)
]
_TZSP_TAG_CLASSES = {
_TZSPTag.TAG_TYPE_PADDING: TZSPTagPadding,
_TZSPTag.TAG_TYPE_END: TZSPTagEnd,
_TZSPTag.TAG_TYPE_RAW_RSSI: {1: TZSPTagRawRSSIByte, 2: TZSPTagRawRSSIShort},
_TZSPTag.TAG_TYPE_SNR: {1: TZSPTagSNRByte, 2: TZSPTagSNRShort},
_TZSPTag.TAG_TYPE_DATA_RATE: TZSPTagDataRate,
_TZSPTag.TAG_TYPE_TIMESTAMP: TZSPTagTimestamp,
_TZSPTag.TAG_TYPE_CONTENTION_FREE: TZSPTagContentionFree,
_TZSPTag.TAG_TYPE_DECRYPTED: TZSPTagDecrypted,
_TZSPTag.TAG_TYPE_FCS_ERROR: TZSPTagError,
_TZSPTag.TAG_TYPE_RX_CHANNEL: TZSPTagRXChannel,
_TZSPTag.TAG_TYPE_PACKET_COUNT: TZSPTagPacketCount,
_TZSPTag.TAG_TYPE_RX_FRAME_LENGTH: TZSPTagRXFrameLength,
_TZSPTag.TAG_TYPE_WLAN_RADIO_HDR_SERIAL: TZSPTagWlanRadioHdrSerial
}

653
scapy/contrib/tzsp.uts Normal file
View File

@ -0,0 +1,653 @@
% TZSP test campaign
#
# execute test:
# > test/run_tests -P "load_contrib('tzsp')" -t scapy/contrib/tzsp.uts
#
+ Basic layer handling
= build basic TZSP frames
== basic TZSP header - keepalive
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \
IP(src='1.1.1.1', dst='2.2.2.2')/ \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \
TZSP(type=TZSP.TYPE_KEEPALIVE, encapsulated_protocol=0)
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_KEEPALIVE)
assert(not tzsp_lyr.payload)
== basic TZSP header - keepalive + ignored end tag
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \
IP(src='1.1.1.1', dst='2.2.2.2')/ \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \
TZSP(type=TZSP.TYPE_KEEPALIVE, encapsulated_protocol=0)/ \
TZSPTagEnd()
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_KEEPALIVE)
assert(tzsp_lyr.guess_payload_class(tzsp_lyr.payload) is scapy.packet.Raw)
== basic TZSP header with RX Packet and EndTag
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_end = tzsp_lyr.payload
assert(tzsp_tag_end.type == 1)
encapsulated_payload = tzsp_lyr.get_encapsulated_payload()
encapsulated_ether_lyr = encapsulated_payload.getlayer(Ether)
assert(encapsulated_ether_lyr.src == '00:03:03:03:03:03')
== basic TZSP header with RX Packet and Padding
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagPadding() / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_padding = tzsp_lyr.payload
assert(tzsp_tag_padding.type == 0)
tzsp_tag_end = tzsp_tag_padding.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and RAWRSSI (byte, short)
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagRawRSSIByte(raw_rssi=42) / \
TZSPTagRawRSSIShort(raw_rssi=12345) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_raw_rssi_byte = tzsp_lyr.payload
assert(tzsp_tag_raw_rssi_byte.type == 10)
assert(tzsp_tag_raw_rssi_byte.raw_rssi == 42)
tzsp_tag_raw_rssi_short = tzsp_tag_raw_rssi_byte.payload
assert(tzsp_tag_raw_rssi_short.type == 10)
assert(tzsp_tag_raw_rssi_short.raw_rssi == 12345)
tzsp_tag_end = tzsp_tag_raw_rssi_short.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and SNR (byte, short)
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagSNRByte(snr=23) / \
TZSPTagSNRShort(snr=54321) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_snr_byte = tzsp_lyr.payload
assert(tzsp_tag_snr_byte.type == 11)
assert(tzsp_tag_snr_byte.len == 1)
assert(tzsp_tag_snr_byte.snr == 23)
tzsp_tag_snr_short = tzsp_tag_snr_byte.payload
assert(tzsp_tag_snr_short.type == 11)
assert(tzsp_tag_snr_short.len == 2)
assert(tzsp_tag_snr_short.snr == 54321)
tzsp_tag_end = tzsp_tag_snr_short.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and DATA Rate
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagDataRate(data_rate=TZSPTagDataRate.DATA_RATE_33) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_data_rate = tzsp_lyr.payload
assert(tzsp_tag_data_rate.type == 12)
assert(tzsp_tag_data_rate.len == 1)
assert(tzsp_tag_data_rate.data_rate == TZSPTagDataRate.DATA_RATE_33)
tzsp_tag_end = tzsp_tag_data_rate.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and Timestamp
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagTimestamp(timestamp=0x11223344) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_timestamp = tzsp_lyr.payload
assert(tzsp_tag_timestamp.type == 13)
assert(tzsp_tag_timestamp.len == 4)
assert(tzsp_tag_timestamp.timestamp == 0x11223344)
tzsp_tag_end = tzsp_tag_timestamp.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and ContentionFree
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagContentionFree(contention_free=TZSPTagContentionFree.NO) / \
TZSPTagContentionFree(contention_free=TZSPTagContentionFree.YES) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_contention_free_no = tzsp_lyr.payload
assert(tzsp_tag_contention_free_no.type == 15)
assert(tzsp_tag_contention_free_no.len == 1)
assert(tzsp_tag_contention_free_no.contention_free == TZSPTagContentionFree.NO)
tzsp_tag_contention_free_yes = tzsp_tag_contention_free_no.payload
assert(tzsp_tag_contention_free_yes.type == 15)
assert(tzsp_tag_contention_free_yes.len == 1)
assert(tzsp_tag_contention_free_yes.contention_free == TZSPTagContentionFree.YES)
tzsp_tag_end = tzsp_tag_contention_free_yes.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and Decrypted
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagDecrypted(decrypted=TZSPTagDecrypted.NO) / \
TZSPTagDecrypted(decrypted=TZSPTagDecrypted.YES) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_decrypted_no = tzsp_lyr.payload
assert(tzsp_tag_decrypted_no.type == 16)
assert(tzsp_tag_decrypted_no.len == 1)
assert(tzsp_tag_decrypted_no.decrypted == TZSPTagDecrypted.NO)
tzsp_tag_decrypted_yes= tzsp_tag_decrypted_no.payload
assert(tzsp_tag_decrypted_yes.type == 16)
assert(tzsp_tag_decrypted_yes.len == 1)
assert(tzsp_tag_decrypted_yes.decrypted == TZSPTagDecrypted.YES)
tzsp_tag_end = tzsp_tag_decrypted_yes.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and FCS error
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagError(fcs_error=TZSPTagError.NO) / \
TZSPTagError(fcs_error=TZSPTagError.YES) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_error_no = tzsp_lyr.payload
assert(tzsp_tag_error_no.type == 17)
assert(tzsp_tag_error_no.len == 1)
assert(tzsp_tag_error_no.fcs_error == TZSPTagError.NO)
tzsp_tag_error_yes = tzsp_tag_error_no.payload
assert(tzsp_tag_error_yes.type == 17)
assert(tzsp_tag_error_yes.len == 1)
assert(tzsp_tag_error_yes.fcs_error == TZSPTagError.YES)
tzsp_tag_end = tzsp_tag_error_yes.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and RXChannel
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagRXChannel(rx_channel=123) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_rx_channel = tzsp_lyr.payload
assert(tzsp_tag_rx_channel.type == 18)
assert(tzsp_tag_rx_channel.len == 1)
assert(tzsp_tag_rx_channel.rx_channel == 123)
tzsp_tag_end = tzsp_tag_rx_channel.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and Packet count
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagPacketCount(packet_count=0x44332211) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_packet_count = tzsp_lyr.payload
assert(tzsp_tag_packet_count.type == 40)
assert(tzsp_tag_packet_count.len == 4)
assert(tzsp_tag_packet_count.packet_count == 0x44332211)
tzsp_tag_end = tzsp_tag_packet_count.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and RXFrameLength
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagRXFrameLength(rx_frame_length=0xbad0) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_frame_length = tzsp_lyr.payload
assert(tzsp_tag_frame_length.type == 41)
assert(tzsp_tag_frame_length.len == 2)
assert(tzsp_tag_frame_length.rx_frame_length == 0xbad0)
tzsp_tag_end = tzsp_tag_frame_length.payload
assert(tzsp_tag_end.type == 1)
== basic TZSP header with RX Packet and WLAN RADIO HDR SERIAL
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
SENSOR_ID = '1E:AT:DE:AD:BE:EF'
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagWlanRadioHdrSerial(sensor_id=SENSOR_ID) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET)
tzsp_tag_sensor_id = tzsp_lyr.payload
assert(tzsp_tag_sensor_id.type == 60)
assert(tzsp_tag_sensor_id.len == len(SENSOR_ID))
assert(tzsp_tag_sensor_id.sensor_id == SENSOR_ID)
tzsp_tag_end = tzsp_tag_sensor_id.payload
assert(tzsp_tag_end.type == 1)
== handling of unknown tag
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
SENSOR_ID = '1E:AT:DE:AD:BE:EF'
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagUnknown(len=6, data=b'\x06\x05\x04\x03\x02\x01') / \
TZSPTagWlanRadioHdrSerial(sensor_id=SENSOR_ID) / \
TZSPTagEnd() / \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \
Raw('foobar')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr)
tzsp_tag_unknown = tzsp_lyr.payload
assert(type(tzsp_tag_unknown) is TZSPTagUnknown)
= all layers stacked
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \
IP(src='1.1.1.1', dst='2.2.2.2')/ \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \
TZSP()/ \
TZSPTagRawRSSIByte(raw_rssi=12)/ \
TZSPTagRawRSSIShort(raw_rssi=1234)/ \
TZSPTagSNRByte(snr=12)/ \
TZSPTagSNRShort(snr=1234)/ \
TZSPTagDataRate(data_rate = TZSPTagDataRate.DATA_RATE_54)/ \
TZSPTagTimestamp(timestamp=12345)/ \
TZSPTagContentionFree(contention_free = TZSPTagContentionFree.NO)/ \
TZSPTagContentionFree(contention_free = TZSPTagContentionFree.YES)/ \
TZSPTagDecrypted(decrypted=TZSPTagDecrypted.NO)/ \
TZSPTagDecrypted(decrypted=TZSPTagDecrypted.YES)/ \
TZSPTagError(fcs_error = TZSPTagError.YES)/ \
TZSPTagError(fcs_error = TZSPTagError.NO)/ \
TZSPTagRXChannel(rx_channel = 42)/ \
TZSPTagPacketCount(packet_count = 987654)/ \
TZSPTagRXFrameLength(rx_frame_length = 0x0bad)/ \
TZSPTagWlanRadioHdrSerial(sensor_id = 'foobar')/ \
TZSPTagPadding()/ \
TZSPTagEnd()/ \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04')/ \
ARP()
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
tzsp_raw_rssi_byte_lyr = tzsp_lyr.payload
assert(tzsp_raw_rssi_byte_lyr.type == 10)
tzsp_tag_raw_rssi_short = tzsp_raw_rssi_byte_lyr.payload
assert(tzsp_tag_raw_rssi_short.type == 10)
tzsp_tag_snr_byte = tzsp_tag_raw_rssi_short.payload
assert(tzsp_tag_snr_byte.type == 11)
tzsp_tag_snr_short = tzsp_tag_snr_byte.payload
assert(tzsp_tag_snr_short.type == 11)
tzsp_tag_data_rate = tzsp_tag_snr_short.payload
assert(tzsp_tag_data_rate.type == 12)
tzsp_tag_timestamp = tzsp_tag_data_rate.payload
assert(tzsp_tag_timestamp.type == 13)
tzsp_tag_contention_free_no = tzsp_tag_timestamp.payload
assert(tzsp_tag_contention_free_no.type == 15)
tzsp_tag_contention_free_yes = tzsp_tag_contention_free_no.payload
assert(tzsp_tag_contention_free_yes.type == 15)
tzsp_tag_decrypted_no = tzsp_tag_contention_free_yes.payload
assert(tzsp_tag_decrypted_no.type == 16)
tzsp_tag_decrypted_yes = tzsp_tag_decrypted_no.payload
assert(tzsp_tag_decrypted_yes.type == 16)
tzsp_tag_error_yes = tzsp_tag_decrypted_yes.payload
assert(tzsp_tag_error_yes.type == 17)
tzsp_tag_error_no = tzsp_tag_error_yes.payload
assert(tzsp_tag_error_no.type == 17)
tzsp_tag_rx_channel = tzsp_tag_error_no.payload
assert(tzsp_tag_rx_channel.type == 18)
tzsp_tag_packet_count = tzsp_tag_rx_channel.payload
assert(tzsp_tag_packet_count.type == 40)
tzsp_tag_frame_length = tzsp_tag_packet_count.payload
assert(tzsp_tag_frame_length.type == 41)
tzsp_tag_sensor_id = tzsp_tag_frame_length.payload
assert(tzsp_tag_sensor_id.type == 60)
tzsp_tag_padding = tzsp_tag_sensor_id.payload
assert(tzsp_tag_padding.type == 0)
tzsp_tag_end = tzsp_tag_padding.payload
assert(tzsp_tag_end.type == 1)
encapsulated_payload = tzsp_tag_end.payload
encapsulated_ether_lyr = encapsulated_payload.getlayer(Ether)
assert(encapsulated_ether_lyr.src == '00:03:03:03:03:03')
+ corner cases
= state tags value range
== TZSPTagContentionFree
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \
IP(src='1.1.1.1', dst='2.2.2.2')/ \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \
TZSP()/ \
TZSPTagContentionFree(contention_free = 0xff)/ \
TZSPTagEnd()
frm = frm.build()
frm = Ether(frm)
tzsp_tag_contention_free = frm.getlayer(TZSPTagContentionFree)
assert(tzsp_tag_contention_free)
tzsp_tag_contention_free_attr = tzsp_tag_contention_free.get_field('contention_free')
assert(tzsp_tag_contention_free_attr)
symb_str = tzsp_tag_contention_free_attr.i2repr(tzsp_tag_contention_free, tzsp_tag_contention_free.contention_free)
assert(symb_str == 'yes')
== TZSPTagError
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \
IP(src='1.1.1.1', dst='2.2.2.2')/ \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \
TZSP()/ \
TZSPTagError(fcs_error=TZSPTagError.NO)/ \
TZSPTagEnd()
frm = frm.build()
frm = Ether(frm)
tzsp_tag_error = frm.getlayer(TZSPTagError)
assert(tzsp_tag_error)
tzsp_tag_error_attr = tzsp_tag_error.get_field('fcs_error')
assert(tzsp_tag_error_attr)
symb_str = tzsp_tag_error_attr.i2repr(tzsp_tag_error, tzsp_tag_error.fcs_error)
assert(symb_str == 'no')
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \
IP(src='1.1.1.1', dst='2.2.2.2')/ \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \
TZSP()/ \
TZSPTagError(fcs_error=TZSPTagError.YES + 1)/ \
TZSPTagEnd()
frm = frm.build()
frm = Ether(frm)
tzsp_tag_error = frm.getlayer(TZSPTagError)
assert(tzsp_tag_error)
tzsp_tag_error_attr = tzsp_tag_error.get_field('fcs_error')
assert(tzsp_tag_error_attr)
symb_str = tzsp_tag_error_attr.i2repr(tzsp_tag_error, tzsp_tag_error.fcs_error)
assert(symb_str == 'reserved')
== missing TZSP header before end tag
frm = TZSPTagEnd()/ \
Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04')/ \
ARP()
frm = frm.build()
try:
frm = TZSPTagEnd(frm)
assert False
except TZSPStructureException:
pass
== invalid length field for given tag
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
TZSPTagRawRSSIByte(len=3) / \
TZSPTagEnd()
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(type(tzsp_lyr.payload) is Raw )
== handling of unknown tag - payload to short
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
SENSOR_ID = '1E:AT:DE:AD:BE:EF'
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
Raw(b'\xff\x0a\x01\x02\x03\x04\x05')
frm = frm.build()
frm = Ether(frm)
frm.show()
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr)
raw_lyr = tzsp_lyr.payload
assert(type(raw_lyr) is Raw)
assert(raw_lyr.load == b'\xff\x0a\x01\x02\x03\x04\x05')
== handling of unknown tag - no payload after tag type
bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT)
SENSOR_ID = '1E:AT:DE:AD:BE:EF'
frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \
IP(src='1.1.1.1', dst='2.2.2.2') / \
UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \
TZSP() / \
Raw(b'\xff')
frm = frm.build()
frm = Ether(frm)
tzsp_lyr = frm.getlayer(TZSP)
assert(tzsp_lyr)
raw_lyr = tzsp_lyr.payload
assert(type(raw_lyr) is Raw)
assert(raw_lyr.load == b'\xff')

View File

@ -320,6 +320,118 @@ class SignedByteField(Field):
def __init__(self, name, default):
Field.__init__(self, name, default, "b")
class FieldValueRangeException(Scapy_Exception):
pass
class FieldAttributeException(Scapy_Exception):
pass
class YesNoByteField(ByteField):
"""
byte based flag field that shows representation of its number based on a given association
in its default configuration the following representation is generated:
x == 0 : 'no'
x != 0 : 'yes'
in more sophisticated use-cases (e.g. yes/no/invalid) one can use the config attribute to configure
key-value, key-range and key-value-set associations that will be used to generate the values representation.
a range is given by a tuple (<first-val>, <last-value>) including the last value. a single-value tuple
is treated as scalar.
a list defines a set of (probably non consecutive) values that should be associated to a given key.
all values not associated with a key will be shown as number of type unsigned byte.
config = {
'no' : 0,
'foo' : (1,22),
'yes' : 23,
'bar' : [24,25, 42, 48, 87, 253]
}
generates the following representations:
x == 0 : 'no'
x == 15: 'foo'
x == 23: 'yes'
x == 42: 'bar'
x == 43: 43
using the config attribute one could also revert the stock-yes-no-behavior:
config = {
'yes' : 0,
'no' : (1,255)
}
will generate the following value representation:
x == 0 : 'yes'
x != 0 : 'no'
"""
__slots__ = ['eval_fn']
def _build_config_representation(self, config):
assoc_table = dict()
for key in config:
value_spec = config[key]
value_spec_type = type(value_spec)
if value_spec_type is int:
if value_spec < 0 or value_spec > 255:
raise FieldValueRangeException('given field value {} invalid - '
'must be in range [0..255]'.format(value_spec))
assoc_table[value_spec] = key
elif value_spec_type is list:
for value in value_spec:
if value < 0 or value > 255:
raise FieldValueRangeException('given field value {} invalid - '
'must be in range [0..255]'.format(value))
assoc_table[value] = key
elif value_spec_type is tuple:
value_spec_len = len(value_spec)
if value_spec_len != 2:
raise FieldAttributeException('invalid length {} of given config item tuple {} - must be '
'(<start-range>, <end-range>).'.format(value_spec_len, value_spec))
value_range_start = value_spec[0]
if value_range_start < 0 or value_range_start > 255:
raise FieldValueRangeException('given field value {} invalid - '
'must be in range [0..255]'.format(value_range_start))
value_range_end = value_spec[1]
if value_range_end < 0 or value_range_end > 255:
raise FieldValueRangeException('given field value {} invalid - '
'must be in range [0..255]'.format(value_range_end))
for value in range(value_range_start, value_range_end + 1):
assoc_table[value] = key
self.eval_fn = lambda x: assoc_table[x] if x in assoc_table else x
def __init__(self, name, default, config=None, *args, **kargs):
if not config:
# this represents the common use case and therefore it is kept small
self.eval_fn = lambda x: 'no' if x == 0 else 'yes'
else:
self._build_config_representation(config)
ByteField.__init__(self, name, default, *args, **kargs)
def i2repr(self, pkt, x):
return self.eval_fn(x)
class ShortField(Field):
def __init__(self, name, default):
Field.__init__(self, name, default, "H")

View File

@ -1036,3 +1036,82 @@ try:
assert(False)
except (Scapy_Exception, IndexError):
pass
+ YesNoByteField
= default usage
yn_bf = YesNoByteField('test', 0x00)
assert(yn_bf.i2repr(None, 0x00) == 'no')
assert(yn_bf.i2repr(None, 0x01) == 'yes')
assert(yn_bf.i2repr(None, 0x02) == 'yes')
assert(yn_bf.i2repr(None, 0xff) == 'yes')
= inverted yes - no (scalar config)
yn_bf = YesNoByteField('test', 0x00, config={'yes': 0x00, 'no': 0x01})
assert(yn_bf.i2repr(None, 0x00) == 'yes')
assert(yn_bf.i2repr(None, 0x01) == 'no')
assert(yn_bf.i2repr(None, 0x02) == 2)
assert(yn_bf.i2repr(None, 0xff) == 255)
= inverted yes - no (range config)
yn_bf = YesNoByteField('test', 0x00, config={'yes': 0x00, 'no': (0x01, 0xff)})
assert(yn_bf.i2repr(None, 0x00) == 'yes')
assert(yn_bf.i2repr(None, 0x01) == 'no')
assert(yn_bf.i2repr(None, 0x02) == 'no')
assert(yn_bf.i2repr(None, 0xff) == 'no')
= yes - no (using sets)
yn_bf = YesNoByteField('test', 0x00, config={'yes': [0x00, 0x02], 'no': [0x01, 0x04, 0xff]})
assert(yn_bf.i2repr(None, 0x00) == 'yes')
assert(yn_bf.i2repr(None, 0x01) == 'no')
assert(yn_bf.i2repr(None, 0x02) == 'yes')
assert(yn_bf.i2repr(None, 0x03) == 3)
assert(yn_bf.i2repr(None, 0x04) == 'no')
assert(yn_bf.i2repr(None, 0x05) == 5)
assert(yn_bf.i2repr(None, 0xff) == 'no')
= yes, no and invalid
yn_bf = YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': 0x01, 'invalid': (0x02, 0xff)})
assert(yn_bf.i2repr(None, 0x00) == 'no')
assert(yn_bf.i2repr(None, 0x01) == 'yes')
assert(yn_bf.i2repr(None, 0x02) == 'invalid')
assert(yn_bf.i2repr(None, 0xff) == 'invalid')
= invalid scalar spec
try:
YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': 256})
assert(False)
except FieldValueRangeException:
pass
= invalid range spec - invalid length
try:
YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': (0x00, 0x02, 0x02)})
assert(False)
except FieldAttributeException:
pass
= invalid range spec - invalid value
try:
YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': (0x100, 0x01)})
assert(False)
except FieldValueRangeException:
pass
try:
YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': (0x00, 0x100)})
assert(False)
except FieldValueRangeException:
pass
= invalid set spec - invalid value
try:
YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': [0x01, 0x101]})
assert(False)
except FieldValueRangeException:
pass