mirror of https://github.com/secdev/scapy.git
Merge pull request #439 from X-Cli/HeterPLF
Add next_cls_cb attribute to PacketListField
This commit is contained in:
commit
ecfe6dd96f
|
@ -435,15 +435,74 @@ class PacketLenField(PacketField):
|
|||
|
||||
|
||||
class PacketListField(PacketField):
|
||||
__slots__ = ["count_from", "length_from"]
|
||||
""" PacketListField represents a series of Packet instances that might occur right in the middle of another Packet
|
||||
field list.
|
||||
This field type may also be used to indicate that a series of Packet instances have a sibling semantic instead of
|
||||
a parent/child relationship (i.e. a stack of layers).
|
||||
"""
|
||||
__slots__ = ["count_from", "length_from", "next_cls_cb"]
|
||||
islist = 1
|
||||
def __init__(self, name, default, cls, count_from=None, length_from=None):
|
||||
def __init__(self, name, default, cls=None, count_from=None, length_from=None, next_cls_cb=None):
|
||||
""" The number of Packet instances that are dissected by this field can be parametrized using one of three
|
||||
different mechanisms/parameters:
|
||||
* count_from: a callback that returns the number of Packet instances to dissect. The callback prototype is:
|
||||
count_from(pkt:Packet) -> int
|
||||
* length_from: a callback that returns the number of bytes that must be dissected by this field. The
|
||||
callback prototype is:
|
||||
length_from(pkt:Packet) -> int
|
||||
* next_cls_cb: a callback that enables a Scapy developer to dynamically discover if another Packet instance
|
||||
should be dissected or not. See below for this callback prototype.
|
||||
|
||||
The bytes that are not consumed during the dissection of this field are passed to the next field of the current
|
||||
packet.
|
||||
|
||||
For the serialization of such a field, the list of Packets that are contained in a PacketListField can be
|
||||
heterogeneous and is unrestricted.
|
||||
|
||||
The type of the Packet instances that are dissected with this field is specified or discovered using one of the
|
||||
following mechanism:
|
||||
* the cls parameter may contain a callable that returns an instance of the dissected Packet. This
|
||||
may either be a reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py) to generate an
|
||||
homogeneous PacketListField or a function deciding the type of the Packet instance
|
||||
(e.g. _CDPGuessAddrRecord in contrib/cdp.py)
|
||||
* the cls parameter may contain a class object with a defined "dispatch_hook" classmethod. That
|
||||
method must return a Packet instance. The dispatch_hook callmethod must implement the following prototype:
|
||||
dispatch_hook(cls, _pkt:Optional[Packet], *args, **kargs) -> Packet_metaclass
|
||||
The _pkt parameter may contain a reference to the packet instance containing the PacketListField that is
|
||||
being dissected.
|
||||
* the next_cls_cb parameter may contain a callable whose prototype is:
|
||||
cbk(pkt:Packet, lst:List[Packet], cur:Optional[Packet], remain:str) -> Optional[Packet_metaclass]
|
||||
The pkt argument contains a reference to the Packet instance containing the PacketListField that is
|
||||
being dissected. The lst argument is the list of all Packet instances that were previously parsed during
|
||||
the current PacketListField dissection, save for the very last Packet instance. The cur argument
|
||||
contains a reference to that very last parsed Packet instance. The remain argument contains the bytes
|
||||
that may still be consumed by the current PacketListField dissection operation. This callback returns
|
||||
either the type of the next Packet to dissect or None to indicate that no more Packet are to be
|
||||
dissected.
|
||||
These four arguments allows a variety of dynamic discovery of the number of Packet to dissect and of the
|
||||
type of each one of these Packets, including: type determination based on current Packet instances or
|
||||
its underlayers, continuation based on the previously parsed Packet instances within that
|
||||
PacketListField, continuation based on a look-ahead on the bytes to be dissected...
|
||||
|
||||
The cls and next_cls_cb parameters are semantically exclusive, although one could specify both. If both are
|
||||
specified, cls is silently ignored. The same is true for count_from and next_cls_cb.
|
||||
length_from and next_cls_cb are compatible and the dissection will end, whichever of the two stop conditions
|
||||
comes first.
|
||||
|
||||
@param name: the name of the field
|
||||
@param default: the default value of this field; generally an empty Python list
|
||||
@param cls: either a callable returning a Packet instance or a class object defining a dispatch_hook class
|
||||
method
|
||||
@param count_from: a callback returning the number of Packet instances to dissect
|
||||
@param length_from: a callback returning the number of bytes to dissect
|
||||
@param next_cls_cb: a callback returning either None or the type of the next Packet to dissect.
|
||||
"""
|
||||
if default is None:
|
||||
default = [] # Create a new list for each instance
|
||||
PacketField.__init__(self, name, default, cls)
|
||||
self.count_from = count_from
|
||||
self.length_from = length_from
|
||||
|
||||
self.next_cls_cb = next_cls_cb
|
||||
|
||||
def any2i(self, pkt, x):
|
||||
if not isinstance(x, list):
|
||||
|
@ -462,11 +521,14 @@ class PacketListField(PacketField):
|
|||
else:
|
||||
return [p if isinstance(p, bytes) else p.copy() for p in x]
|
||||
def getfield(self, pkt, s):
|
||||
c = l = None
|
||||
c = l = cls = None
|
||||
if self.length_from is not None:
|
||||
l = self.length_from(pkt)
|
||||
elif self.count_from is not None:
|
||||
c = self.count_from(pkt)
|
||||
if self.next_cls_cb is not None:
|
||||
cls = self.next_cls_cb(pkt, [], None, s)
|
||||
c = 1
|
||||
|
||||
lst = []
|
||||
ret = b""
|
||||
|
@ -479,6 +541,9 @@ class PacketListField(PacketField):
|
|||
break
|
||||
c -= 1
|
||||
try:
|
||||
if cls is not None:
|
||||
p = cls(remain)
|
||||
else:
|
||||
p = self.m2i(pkt, remain)
|
||||
except Exception:
|
||||
if conf.debug_dissector:
|
||||
|
@ -490,6 +555,10 @@ class PacketListField(PacketField):
|
|||
pad = p[conf.padding_layer]
|
||||
remain = pad.load
|
||||
del(pad.underlayer.payload)
|
||||
if self.next_cls_cb is not None:
|
||||
cls = self.next_cls_cb(pkt, lst, p, remain)
|
||||
if cls is not None:
|
||||
c += 1
|
||||
else:
|
||||
remain = b""
|
||||
lst.append(p)
|
||||
|
|
190
test/fields.uts
190
test/fields.uts
|
@ -328,6 +328,196 @@ assert( str(a) == str(b) )
|
|||
assert TCPOptionsField("test", "").getfield(TCP(dataofs=0), "") == ('', [])
|
||||
|
||||
|
||||
############
|
||||
############
|
||||
+ PacketListField tests
|
||||
|
||||
= Create a layer
|
||||
~ field lengthfield
|
||||
class TestPLF(Packet):
|
||||
name="test"
|
||||
fields_desc=[ FieldLenField("len", None, count_of="plist"),
|
||||
PacketListField("plist", None, IP, count_from=lambda pkt:pkt.len) ]
|
||||
|
||||
= Test the PacketListField assembly
|
||||
~ field lengthfield
|
||||
x=TestPLF()
|
||||
str(x)
|
||||
_ == "\x00\x00"
|
||||
|
||||
= Test the PacketListField assembly 2
|
||||
~ field lengthfield
|
||||
x=TestPLF()
|
||||
x.plist=[IP()/TCP(), IP()/UDP()]
|
||||
str(x)
|
||||
_.startswith('\x00\x02E')
|
||||
|
||||
= Test disassembly
|
||||
~ field lengthfield
|
||||
x=TestPLF(plist=[IP()/TCP(seq=1234567), IP()/UDP()])
|
||||
TestPLF(str(x))
|
||||
_.show()
|
||||
IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 1234567
|
||||
|
||||
= Nested PacketListField
|
||||
~ field lengthfield
|
||||
y=IP()/TCP(seq=111111)/TestPLF(plist=[IP()/TCP(seq=222222),IP()/UDP()])
|
||||
TestPLF(plist=[y,IP()/TCP(seq=333333)])
|
||||
_.show()
|
||||
IP in _ and TCP in _ and UDP in _ and _[TCP].seq == 111111 and _[TCP:2].seq==222222 and _[TCP:3].seq == 333333
|
||||
|
||||
= Complex packet
|
||||
~ field lengthfield ccc
|
||||
class TestPkt(Packet):
|
||||
fields_desc = [ ByteField("f1",65),
|
||||
ShortField("f2",0x4244) ]
|
||||
def extract_padding(self, p):
|
||||
return "", p
|
||||
|
||||
class TestPLF2(Packet):
|
||||
fields_desc = [ FieldLenField("len1", None, count_of="plist",fmt="H", adjust=lambda pkt,x:x+2),
|
||||
FieldLenField("len2", None, length_of="plist",fmt="I", adjust=lambda pkt,x:(x+1)/2),
|
||||
PacketListField("plist", None, TestPkt, length_from=lambda x:(x.len2*2)/3*3) ]
|
||||
|
||||
a=TestPLF2()
|
||||
str(a)
|
||||
assert( _ == "\x00\x02\x00\x00\x00\x00" )
|
||||
|
||||
a.plist=[TestPkt(),TestPkt(f1=100)]
|
||||
str(a)
|
||||
assert(_ == '\x00\x04\x00\x00\x00\x03ABDdBD')
|
||||
|
||||
a /= "123456"
|
||||
b = TestPLF2(str(a))
|
||||
b.show()
|
||||
assert(b.len1 == 4 and b.len2 == 3)
|
||||
assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244)
|
||||
assert(b[TestPkt:2].f1 == 100)
|
||||
assert(Raw in b and b[Raw].load == "123456")
|
||||
|
||||
a.plist.append(TestPkt(f1=200))
|
||||
b = TestPLF2(str(a))
|
||||
b.show()
|
||||
assert(b.len1 == 5 and b.len2 == 5)
|
||||
assert(b[TestPkt].f1 == 65 and b[TestPkt].f2 == 0x4244)
|
||||
assert(b[TestPkt:2].f1 == 100)
|
||||
assert(b[TestPkt:3].f1 == 200)
|
||||
assert(b.getlayer(TestPkt,4) is None)
|
||||
assert(Raw in b and b[Raw].load == "123456")
|
||||
hexdiff(a,b)
|
||||
assert( str(a) == str(b) )
|
||||
|
||||
= Create layers for heterogeneous PacketListField
|
||||
~ field lengthfield
|
||||
TestPLFH1 = type('TestPLFH1', (Packet,), {
|
||||
'name': 'test1',
|
||||
'fields_desc': [ByteField('data', 0)],
|
||||
'guess_payload_class': lambda self, p: conf.padding_layer,
|
||||
}
|
||||
)
|
||||
TestPLFH2 = type('TestPLFH2', (Packet,), {
|
||||
'name': 'test2',
|
||||
'fields_desc': [ShortField('data', 0)],
|
||||
'guess_payload_class': lambda self, p: conf.padding_layer,
|
||||
}
|
||||
)
|
||||
class TestPLFH3(Packet):
|
||||
name = 'test3'
|
||||
fields_desc = [
|
||||
PacketListField(
|
||||
'data', [],
|
||||
next_cls_cb=lambda pkt, lst, p, remain: pkt.detect_next_packet(lst, p, remain)
|
||||
)
|
||||
]
|
||||
def detect_next_packet(self, lst, p, remain):
|
||||
if len(remain) < 3:
|
||||
return None
|
||||
if isinstance(p, type(None)):
|
||||
return TestPLFH1
|
||||
if p.data & 3 == 1:
|
||||
return TestPLFH1
|
||||
if p.data & 3 == 2:
|
||||
return TestPLFH2
|
||||
return None
|
||||
|
||||
= Test heterogeneous PacketListField
|
||||
~ field lengthfield
|
||||
|
||||
p = TestPLFH3('\x02\x01\x01\xc1\x02\x80\x04toto')
|
||||
assert(isinstance(p.data[0], TestPLFH1))
|
||||
assert(p.data[0].data == 0x2)
|
||||
assert(isinstance(p.data[1], TestPLFH2))
|
||||
assert(p.data[1].data == 0x101)
|
||||
assert(isinstance(p.data[2], TestPLFH1))
|
||||
assert(p.data[2].data == 0xc1)
|
||||
assert(isinstance(p.data[3], TestPLFH1))
|
||||
assert(p.data[3].data == 0x2)
|
||||
assert(isinstance(p.data[4], TestPLFH2))
|
||||
assert(p.data[4].data == 0x8004)
|
||||
assert(isinstance(p.payload, conf.raw_layer))
|
||||
assert(p.payload.load == 'toto')
|
||||
|
||||
p = TestPLFH3('\x02\x01\x01\xc1\x02\x80\x02to')
|
||||
assert(isinstance(p.data[0], TestPLFH1))
|
||||
assert(p.data[0].data == 0x2)
|
||||
assert(isinstance(p.data[1], TestPLFH2))
|
||||
assert(p.data[1].data == 0x101)
|
||||
assert(isinstance(p.data[2], TestPLFH1))
|
||||
assert(p.data[2].data == 0xc1)
|
||||
assert(isinstance(p.data[3], TestPLFH1))
|
||||
assert(p.data[3].data == 0x2)
|
||||
assert(isinstance(p.data[4], TestPLFH2))
|
||||
assert(p.data[4].data == 0x8002)
|
||||
assert(isinstance(p.payload, conf.raw_layer))
|
||||
assert(p.payload.load == 'to')
|
||||
|
||||
= Create layers for heterogeneous PacketListField with memory
|
||||
~ field lengthfield
|
||||
TestPLFH4 = type('TestPLFH4', (Packet,), {
|
||||
'name': 'test4',
|
||||
'fields_desc': [ByteField('data', 0)],
|
||||
'guess_payload_class': lambda self, p: conf.padding_layer,
|
||||
}
|
||||
)
|
||||
TestPLFH5 = type('TestPLFH5', (Packet,), {
|
||||
'name': 'test5',
|
||||
'fields_desc': [ShortField('data', 0)],
|
||||
'guess_payload_class': lambda self, p: conf.padding_layer,
|
||||
}
|
||||
)
|
||||
class TestPLFH6(Packet):
|
||||
__slots__ = ['_memory']
|
||||
name = 'test6'
|
||||
fields_desc = [
|
||||
PacketListField(
|
||||
'data', [],
|
||||
next_cls_cb=lambda pkt, lst, p, remain: pkt.detect_next_packet(lst, p, remain)
|
||||
)
|
||||
]
|
||||
def detect_next_packet(self, lst, p, remain):
|
||||
if isinstance(p, type(None)):
|
||||
self._memory = [TestPLFH4] * 3 + [TestPLFH5]
|
||||
try:
|
||||
return self._memory.pop(0)
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
= Test heterogeneous PacketListField with memory
|
||||
~ field lengthfield
|
||||
|
||||
p = TestPLFH6('\x01\x02\x03\xc1\x02toto')
|
||||
assert(isinstance(p.data[0], TestPLFH4))
|
||||
assert(p.data[0].data == 0x1)
|
||||
assert(isinstance(p.data[1], TestPLFH4))
|
||||
assert(p.data[1].data == 0x2)
|
||||
assert(isinstance(p.data[2], TestPLFH4))
|
||||
assert(p.data[2].data == 0x3)
|
||||
assert(isinstance(p.data[3], TestPLFH5))
|
||||
assert(p.data[3].data == 0xc102)
|
||||
assert(isinstance(p.payload, conf.raw_layer))
|
||||
assert(p.payload.load == 'toto')
|
||||
|
||||
|
||||
############
|
||||
############
|
||||
+ Tests on MultiFlagsField
|
||||
|
|
Loading…
Reference in New Issue