From f30d24fab028f98d1ba89e8bc5d6b5b86b632de3 Mon Sep 17 00:00:00 2001 From: Florian Maury Date: Mon, 2 Jan 2017 16:26:56 +0100 Subject: [PATCH 1/2] Add next_cls_cb attribute to PacketListField - this feature adds the ability to have a PacketListField of heterogeneous Packet types with dynamic discovery of the next type. This discovery can be based on any elements including previously parsed packets, underlayers, remaining bytes (look ahead), and last parsed packet. - this feature also adds the ability to parse PacketListFields where neither the length nor the number of elements can be predicted before parsing. This could be done previously using a length_from callback that did significant peeks into the string to parse, but it felt clumsy. --- scapy/fields.py | 20 +++-- test/fields.uts | 190 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 5 deletions(-) diff --git a/scapy/fields.py b/scapy/fields.py index fc41b7600..a7b511a86 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -435,15 +435,15 @@ class PacketLenField(PacketField): class PacketListField(PacketField): - __slots__ = ["count_from", "length_from"] + __slots__ = ["count_from", "length_from", "next_cls_cb"] islist = 1 - def __init__(self, name, default, cls, count_from=None, length_from=None): + def __init__(self, name, default, cls=None, count_from=None, length_from=None, next_cls_cb=None): if default is None: default = [] # Create a new list for each instance PacketField.__init__(self, name, default, cls) self.count_from = count_from self.length_from = length_from - + self.next_cls_cb = next_cls_cb def any2i(self, pkt, x): if not isinstance(x, list): @@ -462,11 +462,14 @@ class PacketListField(PacketField): else: return [p if isinstance(p, bytes) else p.copy() for p in x] def getfield(self, pkt, s): - c = l = None + c = l = cls = None if self.length_from is not None: l = self.length_from(pkt) elif self.count_from is not None: c = self.count_from(pkt) + if self.next_cls_cb is not None: + cls = self.next_cls_cb(pkt, [], None, s) + c = 1 lst = [] ret = b"" @@ -479,7 +482,10 @@ class PacketListField(PacketField): break c -= 1 try: - p = self.m2i(pkt,remain) + if cls is not None: + p = cls(remain) + else: + p = self.m2i(pkt, remain) except Exception: if conf.debug_dissector: raise @@ -490,6 +496,10 @@ class PacketListField(PacketField): pad = p[conf.padding_layer] remain = pad.load del(pad.underlayer.payload) + if self.next_cls_cb is not None: + cls = self.next_cls_cb(pkt, lst, p, remain) + if cls is not None: + c += 1 else: remain = b"" lst.append(p) diff --git a/test/fields.uts b/test/fields.uts index 815d36768..80bc45951 100644 --- a/test/fields.uts +++ b/test/fields.uts @@ -328,6 +328,196 @@ assert( str(a) == str(b) ) assert TCPOptionsField("test", "").getfield(TCP(dataofs=0), "") == ('', []) +############ +############ ++ PacketListField tests + += Create a layer +~ field lengthfield +class TestPLF(Packet): + name="test" + fields_desc=[ FieldLenField("len", None, count_of="plist"), + PacketListField("plist", None, IP, count_from=lambda pkt:pkt.len) ] + += Test the PacketListField assembly +~ field lengthfield +x=TestPLF() +str(x) +_ == "\x00\x00" + += Test the PacketListField assembly 2 +~ field lengthfield +x=TestPLF() +x.plist=[IP()/TCP(), IP()/UDP()] +str(x) +_.startswith('\x00\x02E') + += Test disassembly +~ field lengthfield +x=TestPLF(plist=[IP()/TCP(seq=1234567), IP()/UDP()]) +TestPLF(str(x)) +_.show() +IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 1234567 + += Nested PacketListField +~ field lengthfield +y=IP()/TCP(seq=111111)/TestPLF(plist=[IP()/TCP(seq=222222),IP()/UDP()]) +TestPLF(plist=[y,IP()/TCP(seq=333333)]) +_.show() +IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 111111 and _[TCP:2].seq==222222 and _[TCP:3].seq == 333333 + += Complex packet +~ field lengthfield ccc +class TestPkt(Packet): + fields_desc = [ ByteField("f1",65), + ShortField("f2",0x4244) ] + def extract_padding(self, p): + return "", p + +class TestPLF2(Packet): + fields_desc = [ FieldLenField("len1", None, count_of="plist",fmt="H", adjust=lambda pkt,x:x+2), + FieldLenField("len2", None, length_of="plist",fmt="I", adjust=lambda pkt,x:(x+1)/2), + PacketListField("plist", None, TestPkt, length_from=lambda x:(x.len2*2)/3*3) ] + +a=TestPLF2() +str(a) +assert( _ == "\x00\x02\x00\x00\x00\x00" ) + +a.plist=[TestPkt(),TestPkt(f1=100)] +str(a) +assert(_ == '\x00\x04\x00\x00\x00\x03ABDdBD') + +a /= "123456" +b = TestPLF2(str(a)) +b.show() +assert(b.len1 == 4 and b.len2 == 3) +assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244) +assert(b[TestPkt:2].f1 == 100) +assert(Raw in b and b[Raw].load == "123456") + +a.plist.append(TestPkt(f1=200)) +b = TestPLF2(str(a)) +b.show() +assert(b.len1 == 5 and b.len2 == 5) +assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244) +assert(b[TestPkt:2].f1 == 100) +assert(b[TestPkt:3].f1 == 200) +assert(b.getlayer(TestPkt,4) is None) +assert(Raw in b and b[Raw].load == "123456") +hexdiff(a,b) +assert( str(a) == str(b) ) + += Create layers for heterogeneous PacketListField +~ field lengthfield +TestPLFH1 = type('TestPLFH1', (Packet,), { + 'name': 'test1', + 'fields_desc': [ByteField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +TestPLFH2 = type('TestPLFH2', (Packet,), { + 'name': 'test2', + 'fields_desc': [ShortField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +class TestPLFH3(Packet): + name = 'test3' + fields_desc = [ + PacketListField( + 'data', [], + next_cls_cb=lambda pkt, lst, p, remain: pkt.detect_next_packet(lst, p, remain) + ) + ] + def detect_next_packet(self, lst, p, remain): + if len(remain) < 3: + return None + if isinstance(p, type(None)): + return TestPLFH1 + if p.data & 3 == 1: + return TestPLFH1 + if p.data & 3 == 2: + return TestPLFH2 + return None + += Test heterogeneous PacketListField +~ field lengthfield + +p = TestPLFH3('\x02\x01\x01\xc1\x02\x80\x04toto') +assert(isinstance(p.data[0], TestPLFH1)) +assert(p.data[0].data == 0x2) +assert(isinstance(p.data[1], TestPLFH2)) +assert(p.data[1].data == 0x101) +assert(isinstance(p.data[2], TestPLFH1)) +assert(p.data[2].data == 0xc1) +assert(isinstance(p.data[3], TestPLFH1)) +assert(p.data[3].data == 0x2) +assert(isinstance(p.data[4], TestPLFH2)) +assert(p.data[4].data == 0x8004) +assert(isinstance(p.payload, conf.raw_layer)) +assert(p.payload.load == 'toto') + +p = TestPLFH3('\x02\x01\x01\xc1\x02\x80\x02to') +assert(isinstance(p.data[0], TestPLFH1)) +assert(p.data[0].data == 0x2) +assert(isinstance(p.data[1], TestPLFH2)) +assert(p.data[1].data == 0x101) +assert(isinstance(p.data[2], TestPLFH1)) +assert(p.data[2].data == 0xc1) +assert(isinstance(p.data[3], TestPLFH1)) +assert(p.data[3].data == 0x2) +assert(isinstance(p.data[4], TestPLFH2)) +assert(p.data[4].data == 0x8002) +assert(isinstance(p.payload, conf.raw_layer)) +assert(p.payload.load == 'to') + += Create layers for heterogeneous PacketListField with memory +~ field lengthfield +TestPLFH4 = type('TestPLFH4', (Packet,), { + 'name': 'test4', + 'fields_desc': [ByteField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +TestPLFH5 = type('TestPLFH5', (Packet,), { + 'name': 'test5', + 'fields_desc': [ShortField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +class TestPLFH6(Packet): + __slots__ = ['_memory'] + name = 'test6' + fields_desc = [ + PacketListField( + 'data', [], + next_cls_cb=lambda pkt, lst, p, remain: pkt.detect_next_packet(lst, p, remain) + ) + ] + def detect_next_packet(self, lst, p, remain): + if isinstance(p, type(None)): + self._memory = [TestPLFH4] * 3 + [TestPLFH5] + try: + return self._memory.pop(0) + except IndexError: + return None + += Test heterogeneous PacketListField with memory +~ field lengthfield + +p = TestPLFH6('\x01\x02\x03\xc1\x02toto') +assert(isinstance(p.data[0], TestPLFH4)) +assert(p.data[0].data == 0x1) +assert(isinstance(p.data[1], TestPLFH4)) +assert(p.data[1].data == 0x2) +assert(isinstance(p.data[2], TestPLFH4)) +assert(p.data[2].data == 0x3) +assert(isinstance(p.data[3], TestPLFH5)) +assert(p.data[3].data == 0xc102) +assert(isinstance(p.payload, conf.raw_layer)) +assert(p.payload.load == 'toto') + + ############ ############ + Tests on MultiFlagsField From bd1265b27a95c239a9c788f246e6cbbbe8afaa76 Mon Sep 17 00:00:00 2001 From: Florian Maury Date: Wed, 19 Jul 2017 15:17:51 +0200 Subject: [PATCH 2/2] Add documentation to PacketListField --- scapy/fields.py | 59 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/scapy/fields.py b/scapy/fields.py index a7b511a86..2bfe605b9 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -435,9 +435,68 @@ class PacketLenField(PacketField): class PacketListField(PacketField): + """ PacketListField represents a series of Packet instances that might occur right in the middle of another Packet + field list. + This field type may also be used to indicate that a series of Packet instances have a sibling semantic instead of + a parent/child relationship (i.e. a stack of layers). + """ __slots__ = ["count_from", "length_from", "next_cls_cb"] islist = 1 def __init__(self, name, default, cls=None, count_from=None, length_from=None, next_cls_cb=None): + """ The number of Packet instances that are dissected by this field can be parametrized using one of three + different mechanisms/parameters: + * count_from: a callback that returns the number of Packet instances to dissect. The callback prototype is: + count_from(pkt:Packet) -> int + * length_from: a callback that returns the number of bytes that must be dissected by this field. The + callback prototype is: + length_from(pkt:Packet) -> int + * next_cls_cb: a callback that enables a Scapy developer to dynamically discover if another Packet instance + should be dissected or not. See below for this callback prototype. + + The bytes that are not consumed during the dissection of this field are passed to the next field of the current + packet. + + For the serialization of such a field, the list of Packets that are contained in a PacketListField can be + heterogeneous and is unrestricted. + + The type of the Packet instances that are dissected with this field is specified or discovered using one of the + following mechanism: + * the cls parameter may contain a callable that returns an instance of the dissected Packet. This + may either be a reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py) to generate an + homogeneous PacketListField or a function deciding the type of the Packet instance + (e.g. _CDPGuessAddrRecord in contrib/cdp.py) + * the cls parameter may contain a class object with a defined "dispatch_hook" classmethod. That + method must return a Packet instance. The dispatch_hook callmethod must implement the following prototype: + dispatch_hook(cls, _pkt:Optional[Packet], *args, **kargs) -> Packet_metaclass + The _pkt parameter may contain a reference to the packet instance containing the PacketListField that is + being dissected. + * the next_cls_cb parameter may contain a callable whose prototype is: + cbk(pkt:Packet, lst:List[Packet], cur:Optional[Packet], remain:str) -> Optional[Packet_metaclass] + The pkt argument contains a reference to the Packet instance containing the PacketListField that is + being dissected. The lst argument is the list of all Packet instances that were previously parsed during + the current PacketListField dissection, save for the very last Packet instance. The cur argument + contains a reference to that very last parsed Packet instance. The remain argument contains the bytes + that may still be consumed by the current PacketListField dissection operation. This callback returns + either the type of the next Packet to dissect or None to indicate that no more Packet are to be + dissected. + These four arguments allows a variety of dynamic discovery of the number of Packet to dissect and of the + type of each one of these Packets, including: type determination based on current Packet instances or + its underlayers, continuation based on the previously parsed Packet instances within that + PacketListField, continuation based on a look-ahead on the bytes to be dissected... + + The cls and next_cls_cb parameters are semantically exclusive, although one could specify both. If both are + specified, cls is silently ignored. The same is true for count_from and next_cls_cb. + length_from and next_cls_cb are compatible and the dissection will end, whichever of the two stop conditions + comes first. + + @param name: the name of the field + @param default: the default value of this field; generally an empty Python list + @param cls: either a callable returning a Packet instance or a class object defining a dispatch_hook class + method + @param count_from: a callback returning the number of Packet instances to dissect + @param length_from: a callback returning the number of bytes to dissect + @param next_cls_cb: a callback returning either None or the type of the next Packet to dissect. + """ if default is None: default = [] # Create a new list for each instance PacketField.__init__(self, name, default, cls)