diff --git a/scapy/fields.py b/scapy/fields.py index fc41b7600..2bfe605b9 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -435,15 +435,74 @@ class PacketLenField(PacketField): class PacketListField(PacketField): - __slots__ = ["count_from", "length_from"] + """ PacketListField represents a series of Packet instances that might occur right in the middle of another Packet + field list. + This field type may also be used to indicate that a series of Packet instances have a sibling semantic instead of + a parent/child relationship (i.e. a stack of layers). + """ + __slots__ = ["count_from", "length_from", "next_cls_cb"] islist = 1 - def __init__(self, name, default, cls, count_from=None, length_from=None): + def __init__(self, name, default, cls=None, count_from=None, length_from=None, next_cls_cb=None): + """ The number of Packet instances that are dissected by this field can be parametrized using one of three + different mechanisms/parameters: + * count_from: a callback that returns the number of Packet instances to dissect. The callback prototype is: + count_from(pkt:Packet) -> int + * length_from: a callback that returns the number of bytes that must be dissected by this field. The + callback prototype is: + length_from(pkt:Packet) -> int + * next_cls_cb: a callback that enables a Scapy developer to dynamically discover if another Packet instance + should be dissected or not. See below for this callback prototype. + + The bytes that are not consumed during the dissection of this field are passed to the next field of the current + packet. + + For the serialization of such a field, the list of Packets that are contained in a PacketListField can be + heterogeneous and is unrestricted. + + The type of the Packet instances that are dissected with this field is specified or discovered using one of the + following mechanism: + * the cls parameter may contain a callable that returns an instance of the dissected Packet. This + may either be a reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py) to generate an + homogeneous PacketListField or a function deciding the type of the Packet instance + (e.g. _CDPGuessAddrRecord in contrib/cdp.py) + * the cls parameter may contain a class object with a defined "dispatch_hook" classmethod. That + method must return a Packet instance. The dispatch_hook callmethod must implement the following prototype: + dispatch_hook(cls, _pkt:Optional[Packet], *args, **kargs) -> Packet_metaclass + The _pkt parameter may contain a reference to the packet instance containing the PacketListField that is + being dissected. + * the next_cls_cb parameter may contain a callable whose prototype is: + cbk(pkt:Packet, lst:List[Packet], cur:Optional[Packet], remain:str) -> Optional[Packet_metaclass] + The pkt argument contains a reference to the Packet instance containing the PacketListField that is + being dissected. The lst argument is the list of all Packet instances that were previously parsed during + the current PacketListField dissection, save for the very last Packet instance. The cur argument + contains a reference to that very last parsed Packet instance. The remain argument contains the bytes + that may still be consumed by the current PacketListField dissection operation. This callback returns + either the type of the next Packet to dissect or None to indicate that no more Packet are to be + dissected. + These four arguments allows a variety of dynamic discovery of the number of Packet to dissect and of the + type of each one of these Packets, including: type determination based on current Packet instances or + its underlayers, continuation based on the previously parsed Packet instances within that + PacketListField, continuation based on a look-ahead on the bytes to be dissected... + + The cls and next_cls_cb parameters are semantically exclusive, although one could specify both. If both are + specified, cls is silently ignored. The same is true for count_from and next_cls_cb. + length_from and next_cls_cb are compatible and the dissection will end, whichever of the two stop conditions + comes first. + + @param name: the name of the field + @param default: the default value of this field; generally an empty Python list + @param cls: either a callable returning a Packet instance or a class object defining a dispatch_hook class + method + @param count_from: a callback returning the number of Packet instances to dissect + @param length_from: a callback returning the number of bytes to dissect + @param next_cls_cb: a callback returning either None or the type of the next Packet to dissect. + """ if default is None: default = [] # Create a new list for each instance PacketField.__init__(self, name, default, cls) self.count_from = count_from self.length_from = length_from - + self.next_cls_cb = next_cls_cb def any2i(self, pkt, x): if not isinstance(x, list): @@ -462,11 +521,14 @@ class PacketListField(PacketField): else: return [p if isinstance(p, bytes) else p.copy() for p in x] def getfield(self, pkt, s): - c = l = None + c = l = cls = None if self.length_from is not None: l = self.length_from(pkt) elif self.count_from is not None: c = self.count_from(pkt) + if self.next_cls_cb is not None: + cls = self.next_cls_cb(pkt, [], None, s) + c = 1 lst = [] ret = b"" @@ -479,7 +541,10 @@ class PacketListField(PacketField): break c -= 1 try: - p = self.m2i(pkt,remain) + if cls is not None: + p = cls(remain) + else: + p = self.m2i(pkt, remain) except Exception: if conf.debug_dissector: raise @@ -490,6 +555,10 @@ class PacketListField(PacketField): pad = p[conf.padding_layer] remain = pad.load del(pad.underlayer.payload) + if self.next_cls_cb is not None: + cls = self.next_cls_cb(pkt, lst, p, remain) + if cls is not None: + c += 1 else: remain = b"" lst.append(p) diff --git a/test/fields.uts b/test/fields.uts index 815d36768..80bc45951 100644 --- a/test/fields.uts +++ b/test/fields.uts @@ -328,6 +328,196 @@ assert( str(a) == str(b) ) assert TCPOptionsField("test", "").getfield(TCP(dataofs=0), "") == ('', []) +############ +############ ++ PacketListField tests + += Create a layer +~ field lengthfield +class TestPLF(Packet): + name="test" + fields_desc=[ FieldLenField("len", None, count_of="plist"), + PacketListField("plist", None, IP, count_from=lambda pkt:pkt.len) ] + += Test the PacketListField assembly +~ field lengthfield +x=TestPLF() +str(x) +_ == "\x00\x00" + += Test the PacketListField assembly 2 +~ field lengthfield +x=TestPLF() +x.plist=[IP()/TCP(), IP()/UDP()] +str(x) +_.startswith('\x00\x02E') + += Test disassembly +~ field lengthfield +x=TestPLF(plist=[IP()/TCP(seq=1234567), IP()/UDP()]) +TestPLF(str(x)) +_.show() +IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 1234567 + += Nested PacketListField +~ field lengthfield +y=IP()/TCP(seq=111111)/TestPLF(plist=[IP()/TCP(seq=222222),IP()/UDP()]) +TestPLF(plist=[y,IP()/TCP(seq=333333)]) +_.show() +IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 111111 and _[TCP:2].seq==222222 and _[TCP:3].seq == 333333 + += Complex packet +~ field lengthfield ccc +class TestPkt(Packet): + fields_desc = [ ByteField("f1",65), + ShortField("f2",0x4244) ] + def extract_padding(self, p): + return "", p + +class TestPLF2(Packet): + fields_desc = [ FieldLenField("len1", None, count_of="plist",fmt="H", adjust=lambda pkt,x:x+2), + FieldLenField("len2", None, length_of="plist",fmt="I", adjust=lambda pkt,x:(x+1)/2), + PacketListField("plist", None, TestPkt, length_from=lambda x:(x.len2*2)/3*3) ] + +a=TestPLF2() +str(a) +assert( _ == "\x00\x02\x00\x00\x00\x00" ) + +a.plist=[TestPkt(),TestPkt(f1=100)] +str(a) +assert(_ == '\x00\x04\x00\x00\x00\x03ABDdBD') + +a /= "123456" +b = TestPLF2(str(a)) +b.show() +assert(b.len1 == 4 and b.len2 == 3) +assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244) +assert(b[TestPkt:2].f1 == 100) +assert(Raw in b and b[Raw].load == "123456") + +a.plist.append(TestPkt(f1=200)) +b = TestPLF2(str(a)) +b.show() +assert(b.len1 == 5 and b.len2 == 5) +assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244) +assert(b[TestPkt:2].f1 == 100) +assert(b[TestPkt:3].f1 == 200) +assert(b.getlayer(TestPkt,4) is None) +assert(Raw in b and b[Raw].load == "123456") +hexdiff(a,b) +assert( str(a) == str(b) ) + += Create layers for heterogeneous PacketListField +~ field lengthfield +TestPLFH1 = type('TestPLFH1', (Packet,), { + 'name': 'test1', + 'fields_desc': [ByteField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +TestPLFH2 = type('TestPLFH2', (Packet,), { + 'name': 'test2', + 'fields_desc': [ShortField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +class TestPLFH3(Packet): + name = 'test3' + fields_desc = [ + PacketListField( + 'data', [], + next_cls_cb=lambda pkt, lst, p, remain: pkt.detect_next_packet(lst, p, remain) + ) + ] + def detect_next_packet(self, lst, p, remain): + if len(remain) < 3: + return None + if isinstance(p, type(None)): + return TestPLFH1 + if p.data & 3 == 1: + return TestPLFH1 + if p.data & 3 == 2: + return TestPLFH2 + return None + += Test heterogeneous PacketListField +~ field lengthfield + +p = TestPLFH3('\x02\x01\x01\xc1\x02\x80\x04toto') +assert(isinstance(p.data[0], TestPLFH1)) +assert(p.data[0].data == 0x2) +assert(isinstance(p.data[1], TestPLFH2)) +assert(p.data[1].data == 0x101) +assert(isinstance(p.data[2], TestPLFH1)) +assert(p.data[2].data == 0xc1) +assert(isinstance(p.data[3], TestPLFH1)) +assert(p.data[3].data == 0x2) +assert(isinstance(p.data[4], TestPLFH2)) +assert(p.data[4].data == 0x8004) +assert(isinstance(p.payload, conf.raw_layer)) +assert(p.payload.load == 'toto') + +p = TestPLFH3('\x02\x01\x01\xc1\x02\x80\x02to') +assert(isinstance(p.data[0], TestPLFH1)) +assert(p.data[0].data == 0x2) +assert(isinstance(p.data[1], TestPLFH2)) +assert(p.data[1].data == 0x101) +assert(isinstance(p.data[2], TestPLFH1)) +assert(p.data[2].data == 0xc1) +assert(isinstance(p.data[3], TestPLFH1)) +assert(p.data[3].data == 0x2) +assert(isinstance(p.data[4], TestPLFH2)) +assert(p.data[4].data == 0x8002) +assert(isinstance(p.payload, conf.raw_layer)) +assert(p.payload.load == 'to') + += Create layers for heterogeneous PacketListField with memory +~ field lengthfield +TestPLFH4 = type('TestPLFH4', (Packet,), { + 'name': 'test4', + 'fields_desc': [ByteField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +TestPLFH5 = type('TestPLFH5', (Packet,), { + 'name': 'test5', + 'fields_desc': [ShortField('data', 0)], + 'guess_payload_class': lambda self, p: conf.padding_layer, + } +) +class TestPLFH6(Packet): + __slots__ = ['_memory'] + name = 'test6' + fields_desc = [ + PacketListField( + 'data', [], + next_cls_cb=lambda pkt, lst, p, remain: pkt.detect_next_packet(lst, p, remain) + ) + ] + def detect_next_packet(self, lst, p, remain): + if isinstance(p, type(None)): + self._memory = [TestPLFH4] * 3 + [TestPLFH5] + try: + return self._memory.pop(0) + except IndexError: + return None + += Test heterogeneous PacketListField with memory +~ field lengthfield + +p = TestPLFH6('\x01\x02\x03\xc1\x02toto') +assert(isinstance(p.data[0], TestPLFH4)) +assert(p.data[0].data == 0x1) +assert(isinstance(p.data[1], TestPLFH4)) +assert(p.data[1].data == 0x2) +assert(isinstance(p.data[2], TestPLFH4)) +assert(p.data[2].data == 0x3) +assert(isinstance(p.data[3], TestPLFH5)) +assert(p.data[3].data == 0xc102) +assert(isinstance(p.payload, conf.raw_layer)) +assert(p.payload.load == 'toto') + + ############ ############ + Tests on MultiFlagsField