From d8b24c7138f2f683163bb7fdf407e9263cd1da95 Mon Sep 17 00:00:00 2001 From: hecke Date: Fri, 23 Feb 2018 12:56:46 +0100 Subject: [PATCH] Tazmen sniffer protocol layer (#1004) * add TaZmen Sniffer Protocol (TZSP) layer #1000 * add tests for TZSP layer #1000 * cleanups from PR reviews #1000 * fix ranges used in state tags #1000 * add some corner case tests #1000 * add support for unknown tag type #1000 use type TZSPTagUnknown if tag type is not known. + related tests * fixing a nasty grammar bug #1000 * add YesNoByteField #1000 + tests * use YesNoByteField instead of generated dicts for conditional tags #1000 * use orb instead of unpack for accessing payload bytes #1000 --- scapy/contrib/tzsp.py | 486 ++++++++++++++++++++++++++++++ scapy/contrib/tzsp.uts | 653 +++++++++++++++++++++++++++++++++++++++++ scapy/fields.py | 112 +++++++ test/fields.uts | 79 +++++ 4 files changed, 1330 insertions(+) create mode 100644 scapy/contrib/tzsp.py create mode 100644 scapy/contrib/tzsp.uts diff --git a/scapy/contrib/tzsp.py b/scapy/contrib/tzsp.py new file mode 100644 index 000000000..c1cd0cf98 --- /dev/null +++ b/scapy/contrib/tzsp.py @@ -0,0 +1,486 @@ +#! /usr/bin/env python +# +# scapy.contrib.description = TZSP +# scapy.contrib.status = loads + +""" + TZSP - TaZmen Sniffer Protocol + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + :author: Thomas Tannhaeuser, hecke@naberius.de + :license: GPLv2 + + This module is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; either version 2 + of the License, or (at your option) any later version. + + This module is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + :description: + + This module provides Scapy layers for the TZSP protocol. + + references: + - https://en.wikipedia.org/wiki/TZSP + - https://web.archive.org/web/20050404125022/http://www.networkchemistry.com/support/appnotes/an001_tzsp.html + + :NOTES: + - to allow Scapy to dissect this layer automatically, you need to bind the TZSP layer to UDP using + the default TZSP port (0x9090), e.g. + + bind_layers(UDP, TZSP, sport=TZSP_PORT_DEFAULT) + bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + + - packet format definition from www.networkchemistry.com is different from the one given by wikipedia + - seems Wireshark implements the wikipedia protocol version (didn't dive into their code) + - observed (miss)behavior of Wireshark (2.2.6) + - fails to decode RSSI & SNR using short values - only one byte taken + - SNR is labeled as silence + - WlanRadioHdrSerial is labeled as Sensor MAC + - doesn't know the packet count tag (40 / 0x28) + +""" +from scapy.compat import orb +from scapy.contrib.avs import AVSWLANHeader +from scapy.error import warning, Scapy_Exception +from scapy.fields import ByteField, ShortEnumField, IntField, FieldLenField, YesNoByteField +from scapy.layers.dot11 import Packet, Dot11, PrismHeader +from scapy.layers.l2 import Ether +from scapy.fields import StrLenField, ByteEnumField, ShortField, XStrLenField +from scapy.modules.six.moves import range +from scapy.packet import Raw + + +TZSP_PORT_DEFAULT = 0x9090 + + +class TZSP(Packet): + TYPE_RX_PACKET = 0x00 + TYPE_TX_PACKET = 0x01 + TYPE_CONFIG = 0x03 + TYPE_KEEPALIVE = TYPE_NULL = 0x04 + TYPE_PORT = 0x05 + + TYPES = { + TYPE_RX_PACKET: 'RX_PACKET', + TYPE_TX_PACKET: 'TX_PACKET', + TYPE_CONFIG: 'CONFIG', + TYPE_NULL: 'KEEPALIVE/NULL', + TYPE_PORT: 'PORT', + } + + ENCAPSULATED_ETHERNET = 0x01 + ENCAPSULATED_IEEE_802_11 = 0x12 + ENCAPSULATED_PRISM_HEADER = 0x77 + ENCAPSULATED_WLAN_AVS = 0x7f + + ENCAPSULATED_PROTOCOLS = { + ENCAPSULATED_ETHERNET: 'ETHERNET', + ENCAPSULATED_IEEE_802_11: 'IEEE 802.11', + ENCAPSULATED_PRISM_HEADER: 'PRISM HEADER', + ENCAPSULATED_WLAN_AVS: 'WLAN AVS' + } + + ENCAPSULATED_PROTOCOL_CLASSES = { + ENCAPSULATED_ETHERNET: Ether, + ENCAPSULATED_IEEE_802_11: Dot11, + ENCAPSULATED_PRISM_HEADER: PrismHeader, + ENCAPSULATED_WLAN_AVS: AVSWLANHeader + } + + fields_desc = [ + ByteField('version', 0x01), + ByteEnumField('type', TYPE_RX_PACKET, TYPES), + ShortEnumField('encapsulated_protocol', ENCAPSULATED_ETHERNET, ENCAPSULATED_PROTOCOLS) + ] + + def get_encapsulated_payload_class(self): + """ + get the class that holds the encapsulated payload of the TZSP packet + :return: class representing the payload, Raw() on error + """ + + try: + return TZSP.ENCAPSULATED_PROTOCOL_CLASSES[self.encapsulated_protocol] + except KeyError: + warning( + 'unknown or invalid encapsulation type (%i) - returning payload as raw()' % self.encapsulated_protocol) + return Raw + + def guess_payload_class(self, payload): + if self.type == TZSP.TYPE_KEEPALIVE: + if len(payload): + warning('payload (%i bytes) in KEEPALIVE/NULL packet' % len(payload)) + return Raw + else: + return _tzsp_guess_next_tag(payload) + + def get_encapsulated_payload(self): + + has_encapsulated_data = self.type == TZSP.TYPE_RX_PACKET or self.type == TZSP.TYPE_TX_PACKET + + if has_encapsulated_data: + end_tag_lyr = self.payload.getlayer(TZSPTagEnd) + if end_tag_lyr: + return end_tag_lyr.payload + else: + return None + + +def _tzsp_handle_unknown_tag(payload, tag_type): + + payload_len = len(payload) + + if payload_len < 2: + warning('invalid or unknown tag type (%i) and too short packet - treat remaining data as Raw' % tag_type) + return Raw + + tag_data_length = orb(payload[1]) + + tag_data_fits_in_payload = (tag_data_length + 2) <= payload_len + if not tag_data_fits_in_payload: + warning('invalid or unknown tag type (%i) and too short packet - treat remaining data as Raw' % tag_type) + return Raw + + warning('invalid or unknown tag type (%i)' % tag_type) + + return TZSPTagUnknown + + +def _tzsp_guess_next_tag(payload): + """ + :return: class representing the next tag, Raw on error, None on missing payload + """ + + if not payload: + warning('missing payload') + return None + + tag_type = orb(payload[0]) + + try: + tag_class_definition = _TZSP_TAG_CLASSES[tag_type] + + except KeyError: + + return _tzsp_handle_unknown_tag(payload, tag_type) + + if type(tag_class_definition) is not dict: + return tag_class_definition + + try: + length = orb(payload[1]) + except IndexError: + length = None + + if not length: + warning('no tag length given - packet to short') + return Raw + + try: + return tag_class_definition[length] + except KeyError: + warning('invalid tag length {} for tag type {}'.format(length, tag_type)) + return Raw + + +class _TZSPTag(Packet): + TAG_TYPE_PADDING = 0x00 + TAG_TYPE_END = 0x01 + TAG_TYPE_RAW_RSSI = 0x0a + TAG_TYPE_SNR = 0x0b + TAG_TYPE_DATA_RATE = 0x0c + TAG_TYPE_TIMESTAMP = 0x0d + TAG_TYPE_CONTENTION_FREE = 0x0f + TAG_TYPE_DECRYPTED = 0x10 + TAG_TYPE_FCS_ERROR = 0x11 + TAG_TYPE_RX_CHANNEL = 0x12 + TAG_TYPE_PACKET_COUNT = 0x28 + TAG_TYPE_RX_FRAME_LENGTH = 0x29 + TAG_TYPE_WLAN_RADIO_HDR_SERIAL = 0x3c + + TAG_TYPES = { + TAG_TYPE_PADDING: 'PADDING', + TAG_TYPE_END: 'END', + TAG_TYPE_RAW_RSSI: 'RAW_RSSI', + TAG_TYPE_SNR: 'SNR', + TAG_TYPE_DATA_RATE: 'DATA_RATE', + TAG_TYPE_TIMESTAMP: 'TIMESTAMP', + TAG_TYPE_CONTENTION_FREE: 'CONTENTION_FREE', + TAG_TYPE_DECRYPTED: 'DECRYPTED', + TAG_TYPE_FCS_ERROR: 'FCS_ERROR', + TAG_TYPE_RX_CHANNEL: 'RX_CHANNEL', + TAG_TYPE_PACKET_COUNT: 'PACKET_COUNT', + TAG_TYPE_RX_FRAME_LENGTH: 'RX_FRAME_LENGTH', + TAG_TYPE_WLAN_RADIO_HDR_SERIAL: 'WLAN_RADIO_HDR_SERIAL' + } + + def guess_payload_class(self, payload): + return _tzsp_guess_next_tag(payload) + + +class TZSPStructureException(Scapy_Exception): + pass + + +class TZSPTagPadding(_TZSPTag): + """ + padding tag (should be ignored) + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_PADDING, _TZSPTag.TAG_TYPES), + ] + + +class TZSPTagEnd(Packet): + """ + last tag + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_END, _TZSPTag.TAG_TYPES), + ] + + def guess_payload_class(self, payload): + """ + the type of the payload encapsulation is given be the outer TZSP layers attribute encapsulation_protocol + """ + + under_layer = self.underlayer + tzsp_header = None + + while under_layer: + if isinstance(under_layer, TZSP): + tzsp_header = under_layer + break + under_layer = under_layer.underlayer + + if tzsp_header: + + return tzsp_header.get_encapsulated_payload_class() + else: + raise TZSPStructureException('missing parent TZSP header') + + +class TZSPTagRawRSSIByte(_TZSPTag): + """ + relative received signal strength - signed byte value + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_RAW_RSSI, _TZSPTag.TAG_TYPES), + ByteField('len', 1), + ByteField('raw_rssi', 0) + ] + + +class TZSPTagRawRSSIShort(_TZSPTag): + """ + relative received signal strength - signed short value + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_RAW_RSSI, _TZSPTag.TAG_TYPES), + ByteField('len', 2), + ShortField('raw_rssi', 0) + ] + + +class TZSPTagSNRByte(_TZSPTag): + """ + signal noise ratio - signed byte value + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_SNR, _TZSPTag.TAG_TYPES), + ByteField('len', 1), + ByteField('snr', 0) + ] + + +class TZSPTagSNRShort(_TZSPTag): + """ + signal noise ratio - signed short value + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_SNR, _TZSPTag.TAG_TYPES), + ByteField('len', 2), + ShortField('snr', 0) + ] + + +class TZSPTagDataRate(_TZSPTag): + """ + wireless link data rate + """ + DATA_RATE_UNKNOWN = 0x00 + DATA_RATE_1 = 0x02 + DATA_RATE_2 = 0x04 + DATA_RATE_5_5 = 0x0B + DATA_RATE_6 = 0x0C + DATA_RATE_9 = 0x12 + DATA_RATE_11 = 0x16 + DATA_RATE_12 = 0x18 + DATA_RATE_18 = 0x24 + DATA_RATE_22 = 0x2C + DATA_RATE_24 = 0x30 + DATA_RATE_33 = 0x42 + DATA_RATE_36 = 0x48 + DATA_RATE_48 = 0x60 + DATA_RATE_54 = 0x6C + DATA_RATE_LEGACY_1 = 0x0A + DATA_RATE_LEGACY_2 = 0x14 + DATA_RATE_LEGACY_5_5 = 0x37 + DATA_RATE_LEGACY_11 = 0x6E + + DATA_RATES = { + DATA_RATE_UNKNOWN: 'unknown', + DATA_RATE_1: '1 MB/s', + DATA_RATE_2: '2 MB/s', + DATA_RATE_5_5: '5.5 MB/s', + DATA_RATE_6: '6 MB/s', + DATA_RATE_9: '9 MB/s', + DATA_RATE_11: '11 MB/s', + DATA_RATE_12: '12 MB/s', + DATA_RATE_18: '18 MB/s', + DATA_RATE_22: '22 MB/s', + DATA_RATE_24: '24 MB/s', + DATA_RATE_33: '33 MB/s', + DATA_RATE_36: '36 MB/s', + DATA_RATE_48: '48 MB/s', + DATA_RATE_54: '54 MB/s', + DATA_RATE_LEGACY_1: '1 MB/s (legacy)', + DATA_RATE_LEGACY_2: '2 MB/s (legacy)', + DATA_RATE_LEGACY_5_5: '5.5 MB/s (legacy)', + DATA_RATE_LEGACY_11: '11 MB/s (legacy)', + } + + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_DATA_RATE, _TZSPTag.TAG_TYPES), + ByteField('len', 1), + ByteEnumField('data_rate', DATA_RATE_UNKNOWN, DATA_RATES) + ] + + +class TZSPTagTimestamp(_TZSPTag): + """ + MAC receive timestamp + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_TIMESTAMP, _TZSPTag.TAG_TYPES), + ByteField('len', 4), + IntField('timestamp', 0) + ] + + +class TZSPTagContentionFree(_TZSPTag): + """ + packet received in contention free period + """ + NO = 0x00 + YES = 0x01 + + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_CONTENTION_FREE, _TZSPTag.TAG_TYPES), + ByteField('len', 1), + YesNoByteField('contention_free', NO) + ] + + +class TZSPTagDecrypted(_TZSPTag): + """ + packet was decrypted + """ + YES = 0x00 + NO = 0x01 + + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_DECRYPTED, _TZSPTag.TAG_TYPES), + ByteField('len', 1), + YesNoByteField('decrypted', NO, config={'yes': YES, 'no': (NO, 0xff)}) + ] + + +class TZSPTagError(_TZSPTag): + """ + frame checksum error + """ + NO = 0x00 + YES = 0x01 + + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_FCS_ERROR, _TZSPTag.TAG_TYPES), + ByteField('len', 1), + YesNoByteField('fcs_error', NO, config={'no': NO, 'yes': YES, 'reserved': (YES + 1, 0xff)}) + ] + + +class TZSPTagRXChannel(_TZSPTag): + """ + channel the sensor was on while receiving the frame + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_RX_CHANNEL, _TZSPTag.TAG_TYPES), + ByteField('len', 1), + ByteField('rx_channel', 0) + ] + + +class TZSPTagPacketCount(_TZSPTag): + """ + packet counter + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_PACKET_COUNT, _TZSPTag.TAG_TYPES), + ByteField('len', 4), + IntField('packet_count', 0) + ] + + +class TZSPTagRXFrameLength(_TZSPTag): + """ + received packet length + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_RX_FRAME_LENGTH, _TZSPTag.TAG_TYPES), + ByteField('len', 2), + ShortField('rx_frame_length', 0) + ] + + +class TZSPTagWlanRadioHdrSerial(_TZSPTag): + """ + (vendor specific) unique capture device (sensor/AP) identifier + """ + fields_desc = [ + ByteEnumField('type', _TZSPTag.TAG_TYPE_WLAN_RADIO_HDR_SERIAL, _TZSPTag.TAG_TYPES), + FieldLenField('len', None, length_of='sensor_id', fmt='b'), + StrLenField('sensor_id', '', length_from=lambda pkt:pkt.len) + ] + + +class TZSPTagUnknown(_TZSPTag): + """ + unknown tag type dummy + """ + fields_desc = [ + ByteField('type', 0xff), + FieldLenField('len', None, length_of='data', fmt='b'), + XStrLenField('data', '', length_from=lambda pkt: pkt.len) + ] + +_TZSP_TAG_CLASSES = { + _TZSPTag.TAG_TYPE_PADDING: TZSPTagPadding, + _TZSPTag.TAG_TYPE_END: TZSPTagEnd, + _TZSPTag.TAG_TYPE_RAW_RSSI: {1: TZSPTagRawRSSIByte, 2: TZSPTagRawRSSIShort}, + _TZSPTag.TAG_TYPE_SNR: {1: TZSPTagSNRByte, 2: TZSPTagSNRShort}, + _TZSPTag.TAG_TYPE_DATA_RATE: TZSPTagDataRate, + _TZSPTag.TAG_TYPE_TIMESTAMP: TZSPTagTimestamp, + _TZSPTag.TAG_TYPE_CONTENTION_FREE: TZSPTagContentionFree, + _TZSPTag.TAG_TYPE_DECRYPTED: TZSPTagDecrypted, + _TZSPTag.TAG_TYPE_FCS_ERROR: TZSPTagError, + _TZSPTag.TAG_TYPE_RX_CHANNEL: TZSPTagRXChannel, + _TZSPTag.TAG_TYPE_PACKET_COUNT: TZSPTagPacketCount, + _TZSPTag.TAG_TYPE_RX_FRAME_LENGTH: TZSPTagRXFrameLength, + _TZSPTag.TAG_TYPE_WLAN_RADIO_HDR_SERIAL: TZSPTagWlanRadioHdrSerial +} diff --git a/scapy/contrib/tzsp.uts b/scapy/contrib/tzsp.uts new file mode 100644 index 000000000..fbe3f970e --- /dev/null +++ b/scapy/contrib/tzsp.uts @@ -0,0 +1,653 @@ +% TZSP test campaign + +# +# execute test: +# > test/run_tests -P "load_contrib('tzsp')" -t scapy/contrib/tzsp.uts +# + ++ Basic layer handling + += build basic TZSP frames + +== basic TZSP header - keepalive + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \ + IP(src='1.1.1.1', dst='2.2.2.2')/ \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \ + TZSP(type=TZSP.TYPE_KEEPALIVE, encapsulated_protocol=0) + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_KEEPALIVE) +assert(not tzsp_lyr.payload) + +== basic TZSP header - keepalive + ignored end tag + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \ + IP(src='1.1.1.1', dst='2.2.2.2')/ \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \ + TZSP(type=TZSP.TYPE_KEEPALIVE, encapsulated_protocol=0)/ \ + TZSPTagEnd() + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_KEEPALIVE) +assert(tzsp_lyr.guess_payload_class(tzsp_lyr.payload) is scapy.packet.Raw) + +== basic TZSP header with RX Packet and EndTag + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_end = tzsp_lyr.payload +assert(tzsp_tag_end.type == 1) + +encapsulated_payload = tzsp_lyr.get_encapsulated_payload() +encapsulated_ether_lyr = encapsulated_payload.getlayer(Ether) +assert(encapsulated_ether_lyr.src == '00:03:03:03:03:03') + +== basic TZSP header with RX Packet and Padding + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagPadding() / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_padding = tzsp_lyr.payload +assert(tzsp_tag_padding.type == 0) + +tzsp_tag_end = tzsp_tag_padding.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and RAWRSSI (byte, short) + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagRawRSSIByte(raw_rssi=42) / \ + TZSPTagRawRSSIShort(raw_rssi=12345) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_raw_rssi_byte = tzsp_lyr.payload +assert(tzsp_tag_raw_rssi_byte.type == 10) +assert(tzsp_tag_raw_rssi_byte.raw_rssi == 42) + +tzsp_tag_raw_rssi_short = tzsp_tag_raw_rssi_byte.payload +assert(tzsp_tag_raw_rssi_short.type == 10) +assert(tzsp_tag_raw_rssi_short.raw_rssi == 12345) + +tzsp_tag_end = tzsp_tag_raw_rssi_short.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and SNR (byte, short) + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagSNRByte(snr=23) / \ + TZSPTagSNRShort(snr=54321) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_snr_byte = tzsp_lyr.payload +assert(tzsp_tag_snr_byte.type == 11) +assert(tzsp_tag_snr_byte.len == 1) +assert(tzsp_tag_snr_byte.snr == 23) + +tzsp_tag_snr_short = tzsp_tag_snr_byte.payload +assert(tzsp_tag_snr_short.type == 11) +assert(tzsp_tag_snr_short.len == 2) +assert(tzsp_tag_snr_short.snr == 54321) + +tzsp_tag_end = tzsp_tag_snr_short.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and DATA Rate + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagDataRate(data_rate=TZSPTagDataRate.DATA_RATE_33) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_data_rate = tzsp_lyr.payload +assert(tzsp_tag_data_rate.type == 12) +assert(tzsp_tag_data_rate.len == 1) +assert(tzsp_tag_data_rate.data_rate == TZSPTagDataRate.DATA_RATE_33) + +tzsp_tag_end = tzsp_tag_data_rate.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and Timestamp + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagTimestamp(timestamp=0x11223344) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_timestamp = tzsp_lyr.payload +assert(tzsp_tag_timestamp.type == 13) +assert(tzsp_tag_timestamp.len == 4) +assert(tzsp_tag_timestamp.timestamp == 0x11223344) + +tzsp_tag_end = tzsp_tag_timestamp.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and ContentionFree + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagContentionFree(contention_free=TZSPTagContentionFree.NO) / \ + TZSPTagContentionFree(contention_free=TZSPTagContentionFree.YES) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_contention_free_no = tzsp_lyr.payload +assert(tzsp_tag_contention_free_no.type == 15) +assert(tzsp_tag_contention_free_no.len == 1) +assert(tzsp_tag_contention_free_no.contention_free == TZSPTagContentionFree.NO) + +tzsp_tag_contention_free_yes = tzsp_tag_contention_free_no.payload +assert(tzsp_tag_contention_free_yes.type == 15) +assert(tzsp_tag_contention_free_yes.len == 1) +assert(tzsp_tag_contention_free_yes.contention_free == TZSPTagContentionFree.YES) + +tzsp_tag_end = tzsp_tag_contention_free_yes.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and Decrypted + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagDecrypted(decrypted=TZSPTagDecrypted.NO) / \ + TZSPTagDecrypted(decrypted=TZSPTagDecrypted.YES) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_decrypted_no = tzsp_lyr.payload +assert(tzsp_tag_decrypted_no.type == 16) +assert(tzsp_tag_decrypted_no.len == 1) +assert(tzsp_tag_decrypted_no.decrypted == TZSPTagDecrypted.NO) + +tzsp_tag_decrypted_yes= tzsp_tag_decrypted_no.payload +assert(tzsp_tag_decrypted_yes.type == 16) +assert(tzsp_tag_decrypted_yes.len == 1) +assert(tzsp_tag_decrypted_yes.decrypted == TZSPTagDecrypted.YES) + +tzsp_tag_end = tzsp_tag_decrypted_yes.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and FCS error + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagError(fcs_error=TZSPTagError.NO) / \ + TZSPTagError(fcs_error=TZSPTagError.YES) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_error_no = tzsp_lyr.payload +assert(tzsp_tag_error_no.type == 17) +assert(tzsp_tag_error_no.len == 1) +assert(tzsp_tag_error_no.fcs_error == TZSPTagError.NO) + +tzsp_tag_error_yes = tzsp_tag_error_no.payload +assert(tzsp_tag_error_yes.type == 17) +assert(tzsp_tag_error_yes.len == 1) +assert(tzsp_tag_error_yes.fcs_error == TZSPTagError.YES) + +tzsp_tag_end = tzsp_tag_error_yes.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and RXChannel + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagRXChannel(rx_channel=123) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_rx_channel = tzsp_lyr.payload +assert(tzsp_tag_rx_channel.type == 18) +assert(tzsp_tag_rx_channel.len == 1) +assert(tzsp_tag_rx_channel.rx_channel == 123) + +tzsp_tag_end = tzsp_tag_rx_channel.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and Packet count + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagPacketCount(packet_count=0x44332211) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_packet_count = tzsp_lyr.payload +assert(tzsp_tag_packet_count.type == 40) +assert(tzsp_tag_packet_count.len == 4) +assert(tzsp_tag_packet_count.packet_count == 0x44332211) + +tzsp_tag_end = tzsp_tag_packet_count.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and RXFrameLength + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagRXFrameLength(rx_frame_length=0xbad0) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_frame_length = tzsp_lyr.payload +assert(tzsp_tag_frame_length.type == 41) +assert(tzsp_tag_frame_length.len == 2) +assert(tzsp_tag_frame_length.rx_frame_length == 0xbad0) + +tzsp_tag_end = tzsp_tag_frame_length.payload +assert(tzsp_tag_end.type == 1) + +== basic TZSP header with RX Packet and WLAN RADIO HDR SERIAL + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +SENSOR_ID = '1E:AT:DE:AD:BE:EF' + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagWlanRadioHdrSerial(sensor_id=SENSOR_ID) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr.type == TZSP.TYPE_RX_PACKET) + +tzsp_tag_sensor_id = tzsp_lyr.payload +assert(tzsp_tag_sensor_id.type == 60) +assert(tzsp_tag_sensor_id.len == len(SENSOR_ID)) +assert(tzsp_tag_sensor_id.sensor_id == SENSOR_ID) + +tzsp_tag_end = tzsp_tag_sensor_id.payload +assert(tzsp_tag_end.type == 1) + +== handling of unknown tag + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +SENSOR_ID = '1E:AT:DE:AD:BE:EF' + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagUnknown(len=6, data=b'\x06\x05\x04\x03\x02\x01') / \ + TZSPTagWlanRadioHdrSerial(sensor_id=SENSOR_ID) / \ + TZSPTagEnd() / \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04') / \ + Raw('foobar') + +frm = frm.build() +frm = Ether(frm) + +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr) +tzsp_tag_unknown = tzsp_lyr.payload +assert(type(tzsp_tag_unknown) is TZSPTagUnknown) + += all layers stacked + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \ + IP(src='1.1.1.1', dst='2.2.2.2')/ \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \ + TZSP()/ \ + TZSPTagRawRSSIByte(raw_rssi=12)/ \ + TZSPTagRawRSSIShort(raw_rssi=1234)/ \ + TZSPTagSNRByte(snr=12)/ \ + TZSPTagSNRShort(snr=1234)/ \ + TZSPTagDataRate(data_rate = TZSPTagDataRate.DATA_RATE_54)/ \ + TZSPTagTimestamp(timestamp=12345)/ \ + TZSPTagContentionFree(contention_free = TZSPTagContentionFree.NO)/ \ + TZSPTagContentionFree(contention_free = TZSPTagContentionFree.YES)/ \ + TZSPTagDecrypted(decrypted=TZSPTagDecrypted.NO)/ \ + TZSPTagDecrypted(decrypted=TZSPTagDecrypted.YES)/ \ + TZSPTagError(fcs_error = TZSPTagError.YES)/ \ + TZSPTagError(fcs_error = TZSPTagError.NO)/ \ + TZSPTagRXChannel(rx_channel = 42)/ \ + TZSPTagPacketCount(packet_count = 987654)/ \ + TZSPTagRXFrameLength(rx_frame_length = 0x0bad)/ \ + TZSPTagWlanRadioHdrSerial(sensor_id = 'foobar')/ \ + TZSPTagPadding()/ \ + TZSPTagEnd()/ \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04')/ \ + ARP() + +frm = frm.build() +frm = Ether(frm) + +tzsp_lyr = frm.getlayer(TZSP) + +tzsp_raw_rssi_byte_lyr = tzsp_lyr.payload +assert(tzsp_raw_rssi_byte_lyr.type == 10) + +tzsp_tag_raw_rssi_short = tzsp_raw_rssi_byte_lyr.payload +assert(tzsp_tag_raw_rssi_short.type == 10) + +tzsp_tag_snr_byte = tzsp_tag_raw_rssi_short.payload +assert(tzsp_tag_snr_byte.type == 11) + +tzsp_tag_snr_short = tzsp_tag_snr_byte.payload +assert(tzsp_tag_snr_short.type == 11) + +tzsp_tag_data_rate = tzsp_tag_snr_short.payload +assert(tzsp_tag_data_rate.type == 12) + +tzsp_tag_timestamp = tzsp_tag_data_rate.payload +assert(tzsp_tag_timestamp.type == 13) + +tzsp_tag_contention_free_no = tzsp_tag_timestamp.payload +assert(tzsp_tag_contention_free_no.type == 15) + +tzsp_tag_contention_free_yes = tzsp_tag_contention_free_no.payload +assert(tzsp_tag_contention_free_yes.type == 15) + +tzsp_tag_decrypted_no = tzsp_tag_contention_free_yes.payload +assert(tzsp_tag_decrypted_no.type == 16) + +tzsp_tag_decrypted_yes = tzsp_tag_decrypted_no.payload +assert(tzsp_tag_decrypted_yes.type == 16) + +tzsp_tag_error_yes = tzsp_tag_decrypted_yes.payload +assert(tzsp_tag_error_yes.type == 17) + +tzsp_tag_error_no = tzsp_tag_error_yes.payload +assert(tzsp_tag_error_no.type == 17) + +tzsp_tag_rx_channel = tzsp_tag_error_no.payload +assert(tzsp_tag_rx_channel.type == 18) + +tzsp_tag_packet_count = tzsp_tag_rx_channel.payload +assert(tzsp_tag_packet_count.type == 40) + +tzsp_tag_frame_length = tzsp_tag_packet_count.payload +assert(tzsp_tag_frame_length.type == 41) + +tzsp_tag_sensor_id = tzsp_tag_frame_length.payload +assert(tzsp_tag_sensor_id.type == 60) + +tzsp_tag_padding = tzsp_tag_sensor_id.payload +assert(tzsp_tag_padding.type == 0) + +tzsp_tag_end = tzsp_tag_padding.payload +assert(tzsp_tag_end.type == 1) + +encapsulated_payload = tzsp_tag_end.payload +encapsulated_ether_lyr = encapsulated_payload.getlayer(Ether) +assert(encapsulated_ether_lyr.src == '00:03:03:03:03:03') + ++ corner cases + += state tags value range + +== TZSPTagContentionFree + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \ + IP(src='1.1.1.1', dst='2.2.2.2')/ \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \ + TZSP()/ \ + TZSPTagContentionFree(contention_free = 0xff)/ \ + TZSPTagEnd() + +frm = frm.build() +frm = Ether(frm) +tzsp_tag_contention_free = frm.getlayer(TZSPTagContentionFree) +assert(tzsp_tag_contention_free) +tzsp_tag_contention_free_attr = tzsp_tag_contention_free.get_field('contention_free') +assert(tzsp_tag_contention_free_attr) +symb_str = tzsp_tag_contention_free_attr.i2repr(tzsp_tag_contention_free, tzsp_tag_contention_free.contention_free) +assert(symb_str == 'yes') + +== TZSPTagError + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \ + IP(src='1.1.1.1', dst='2.2.2.2')/ \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \ + TZSP()/ \ + TZSPTagError(fcs_error=TZSPTagError.NO)/ \ + TZSPTagEnd() + +frm = frm.build() +frm = Ether(frm) +tzsp_tag_error = frm.getlayer(TZSPTagError) +assert(tzsp_tag_error) +tzsp_tag_error_attr = tzsp_tag_error.get_field('fcs_error') +assert(tzsp_tag_error_attr) +symb_str = tzsp_tag_error_attr.i2repr(tzsp_tag_error, tzsp_tag_error.fcs_error) +assert(symb_str == 'no') + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02')/ \ + IP(src='1.1.1.1', dst='2.2.2.2')/ \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT)/ \ + TZSP()/ \ + TZSPTagError(fcs_error=TZSPTagError.YES + 1)/ \ + TZSPTagEnd() + +frm = frm.build() +frm = Ether(frm) +tzsp_tag_error = frm.getlayer(TZSPTagError) +assert(tzsp_tag_error) +tzsp_tag_error_attr = tzsp_tag_error.get_field('fcs_error') +assert(tzsp_tag_error_attr) +symb_str = tzsp_tag_error_attr.i2repr(tzsp_tag_error, tzsp_tag_error.fcs_error) +assert(symb_str == 'reserved') + +== missing TZSP header before end tag + +frm = TZSPTagEnd()/ \ + Ether(src='00:03:03:03:03:03', dst='00:04:04:04:04:04')/ \ + ARP() + +frm = frm.build() +try: + frm = TZSPTagEnd(frm) + assert False +except TZSPStructureException: + pass + +== invalid length field for given tag + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + TZSPTagRawRSSIByte(len=3) / \ + TZSPTagEnd() + +frm = frm.build() +frm = Ether(frm) + +tzsp_lyr = frm.getlayer(TZSP) +assert(type(tzsp_lyr.payload) is Raw ) + +== handling of unknown tag - payload to short + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +SENSOR_ID = '1E:AT:DE:AD:BE:EF' + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + Raw(b'\xff\x0a\x01\x02\x03\x04\x05') + +frm = frm.build() +frm = Ether(frm) +frm.show() +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr) +raw_lyr = tzsp_lyr.payload +assert(type(raw_lyr) is Raw) +assert(raw_lyr.load == b'\xff\x0a\x01\x02\x03\x04\x05') + +== handling of unknown tag - no payload after tag type + +bind_layers(UDP, TZSP, dport=TZSP_PORT_DEFAULT) + +SENSOR_ID = '1E:AT:DE:AD:BE:EF' + +frm = Ether(src='00:01:01:01:01:01', dst='00:02:02:02:02:02') / \ + IP(src='1.1.1.1', dst='2.2.2.2') / \ + UDP(sport=12345, dport=TZSP_PORT_DEFAULT) / \ + TZSP() / \ + Raw(b'\xff') + +frm = frm.build() +frm = Ether(frm) + +tzsp_lyr = frm.getlayer(TZSP) +assert(tzsp_lyr) +raw_lyr = tzsp_lyr.payload +assert(type(raw_lyr) is Raw) +assert(raw_lyr.load == b'\xff') diff --git a/scapy/fields.py b/scapy/fields.py index c3dc9b30c..036f7cf3d 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -320,6 +320,118 @@ class SignedByteField(Field): def __init__(self, name, default): Field.__init__(self, name, default, "b") + +class FieldValueRangeException(Scapy_Exception): + pass + + +class FieldAttributeException(Scapy_Exception): + pass + + +class YesNoByteField(ByteField): + """ + byte based flag field that shows representation of its number based on a given association + + in its default configuration the following representation is generated: + x == 0 : 'no' + x != 0 : 'yes' + + in more sophisticated use-cases (e.g. yes/no/invalid) one can use the config attribute to configure + key-value, key-range and key-value-set associations that will be used to generate the values representation. + + a range is given by a tuple (, ) including the last value. a single-value tuple + is treated as scalar. + + a list defines a set of (probably non consecutive) values that should be associated to a given key. + + all values not associated with a key will be shown as number of type unsigned byte. + + config = { + 'no' : 0, + 'foo' : (1,22), + 'yes' : 23, + 'bar' : [24,25, 42, 48, 87, 253] + } + + generates the following representations: + + x == 0 : 'no' + x == 15: 'foo' + x == 23: 'yes' + x == 42: 'bar' + x == 43: 43 + + using the config attribute one could also revert the stock-yes-no-behavior: + + config = { + 'yes' : 0, + 'no' : (1,255) + } + + will generate the following value representation: + + x == 0 : 'yes' + x != 0 : 'no' + + """ + __slots__ = ['eval_fn'] + + def _build_config_representation(self, config): + assoc_table = dict() + for key in config: + value_spec = config[key] + + value_spec_type = type(value_spec) + + if value_spec_type is int: + if value_spec < 0 or value_spec > 255: + raise FieldValueRangeException('given field value {} invalid - ' + 'must be in range [0..255]'.format(value_spec)) + assoc_table[value_spec] = key + + elif value_spec_type is list: + for value in value_spec: + if value < 0 or value > 255: + raise FieldValueRangeException('given field value {} invalid - ' + 'must be in range [0..255]'.format(value)) + assoc_table[value] = key + + elif value_spec_type is tuple: + value_spec_len = len(value_spec) + if value_spec_len != 2: + raise FieldAttributeException('invalid length {} of given config item tuple {} - must be ' + '(, ).'.format(value_spec_len, value_spec)) + + value_range_start = value_spec[0] + if value_range_start < 0 or value_range_start > 255: + raise FieldValueRangeException('given field value {} invalid - ' + 'must be in range [0..255]'.format(value_range_start)) + + value_range_end = value_spec[1] + if value_range_end < 0 or value_range_end > 255: + raise FieldValueRangeException('given field value {} invalid - ' + 'must be in range [0..255]'.format(value_range_end)) + + for value in range(value_range_start, value_range_end + 1): + + assoc_table[value] = key + + self.eval_fn = lambda x: assoc_table[x] if x in assoc_table else x + + def __init__(self, name, default, config=None, *args, **kargs): + + if not config: + # this represents the common use case and therefore it is kept small + self.eval_fn = lambda x: 'no' if x == 0 else 'yes' + else: + self._build_config_representation(config) + ByteField.__init__(self, name, default, *args, **kargs) + + def i2repr(self, pkt, x): + return self.eval_fn(x) + + class ShortField(Field): def __init__(self, name, default): Field.__init__(self, name, default, "H") diff --git a/test/fields.uts b/test/fields.uts index f66b99445..90f5ee30c 100644 --- a/test/fields.uts +++ b/test/fields.uts @@ -1036,3 +1036,82 @@ try: assert(False) except (Scapy_Exception, IndexError): pass + ++ YesNoByteField + += default usage + +yn_bf = YesNoByteField('test', 0x00) +assert(yn_bf.i2repr(None, 0x00) == 'no') +assert(yn_bf.i2repr(None, 0x01) == 'yes') +assert(yn_bf.i2repr(None, 0x02) == 'yes') +assert(yn_bf.i2repr(None, 0xff) == 'yes') + += inverted yes - no (scalar config) +yn_bf = YesNoByteField('test', 0x00, config={'yes': 0x00, 'no': 0x01}) +assert(yn_bf.i2repr(None, 0x00) == 'yes') +assert(yn_bf.i2repr(None, 0x01) == 'no') +assert(yn_bf.i2repr(None, 0x02) == 2) +assert(yn_bf.i2repr(None, 0xff) == 255) + += inverted yes - no (range config) +yn_bf = YesNoByteField('test', 0x00, config={'yes': 0x00, 'no': (0x01, 0xff)}) +assert(yn_bf.i2repr(None, 0x00) == 'yes') +assert(yn_bf.i2repr(None, 0x01) == 'no') +assert(yn_bf.i2repr(None, 0x02) == 'no') +assert(yn_bf.i2repr(None, 0xff) == 'no') + += yes - no (using sets) +yn_bf = YesNoByteField('test', 0x00, config={'yes': [0x00, 0x02], 'no': [0x01, 0x04, 0xff]}) +assert(yn_bf.i2repr(None, 0x00) == 'yes') +assert(yn_bf.i2repr(None, 0x01) == 'no') +assert(yn_bf.i2repr(None, 0x02) == 'yes') +assert(yn_bf.i2repr(None, 0x03) == 3) +assert(yn_bf.i2repr(None, 0x04) == 'no') +assert(yn_bf.i2repr(None, 0x05) == 5) +assert(yn_bf.i2repr(None, 0xff) == 'no') + += yes, no and invalid +yn_bf = YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': 0x01, 'invalid': (0x02, 0xff)}) +assert(yn_bf.i2repr(None, 0x00) == 'no') +assert(yn_bf.i2repr(None, 0x01) == 'yes') +assert(yn_bf.i2repr(None, 0x02) == 'invalid') +assert(yn_bf.i2repr(None, 0xff) == 'invalid') + += invalid scalar spec + +try: + YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': 256}) + assert(False) +except FieldValueRangeException: + pass + += invalid range spec - invalid length + +try: + YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': (0x00, 0x02, 0x02)}) + assert(False) +except FieldAttributeException: + pass + += invalid range spec - invalid value + +try: + YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': (0x100, 0x01)}) + assert(False) +except FieldValueRangeException: + pass + +try: + YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': (0x00, 0x100)}) + assert(False) +except FieldValueRangeException: + pass + += invalid set spec - invalid value + +try: + YesNoByteField('test', 0x00, config={'no': 0x00, 'yes': [0x01, 0x101]}) + assert(False) +except FieldValueRangeException: + pass