- removed 4 byte IP string autorecognition. Never used and broken for 4 byte names

- added "islist" flag to fields to distinguish a list value from a list of values
- changed TCP options from dict to list to preserve order and redundancy
- added conf.except_filter, to have every command ignore your own traffic (BPF filter)
- worked in progress for nmap OS fingerprint. Added PU test. Fixed other tests.
- added nmap_sig2txt() to transform a signature to its text form, suitable for nmap base
This commit is contained in:
pbi 2003-04-24 10:47:49 +00:00
parent e2f65cb919
commit c4e8a38ce6
1 changed files with 124 additions and 30 deletions

154
scapy.py
View File

@ -2,7 +2,7 @@
#############################################################################
## ##
## scapy.py --- Low-Level network scanner ##
## scapy.py --- Interactive packet manipulation tool ##
## see http://www.cartel-securite.net/pbiondi/scapy.html ##
## for more informations ##
## ##
@ -22,6 +22,14 @@
#
# $Log: scapy.py,v $
# Revision 0.9.11.3 2003/04/24 12:47:49 pbi
# - removed 4 byte IP string autorecognition. Never used and broken for 4 byte names
# - added "islist" flag to fields to distinguish a list value from a list of values
# - changed TCP options from dict to list to preserve order and redundancy
# - added conf.except_filter, to have every command ignore your own traffic (BPF filter)
# - worked in progress for nmap OS fingerprint. Added PU test. Fixed other tests.
# - added nmap_sig2txt() to transform a signature to its text form, suitable for nmap base
#
# Revision 0.9.11.2 2003/04/23 21:23:30 pbi
# - small fixes in init_queso()
# - experimental support of nmap fingerprinting (not complete yet)
@ -184,7 +192,7 @@
from __future__ import generators
RCSID="$Id: scapy.py,v 0.9.11.2 2003/04/23 21:23:30 pbi Exp $"
RCSID="$Id: scapy.py,v 0.9.11.3 2003/04/24 12:47:49 pbi Exp $"
VERSION = RCSID.split()[2]+"beta"
@ -814,6 +822,7 @@ class Net(Gen):
############
class Field:
islist=0
def __init__(self, name, default, fmt="H"):
self.name = name
self.fmt = "!"+fmt
@ -856,6 +865,7 @@ class Field:
class MACField(Field):
def __init__(self, name, default):
Field.__init__(self, name, default, "6s")
@ -948,15 +958,13 @@ class IPField(Field):
except socket.error:
x = Net(x)
return x
def i2m(self, pkt, x):
return socket.inet_aton(x)
def m2i(self, pkt, x):
return socket.inet_ntoa(x)
def any2i(self, pkt, x):
if type(x) is str and len(x) == 4:
x = self.m2i(pkt, x)
# if type(x) is str and len(x) == 4:
# x = self.m2i(pkt, x)
return self.h2i(pkt,x)
def i2repr(self, pkt, x):
return self.i2h(pkt, x)
@ -1210,6 +1218,7 @@ TCPOptions = (
} )
class TCPOptionsField(StrField):
islist=1
def getfield(self, pkt, s):
opsz = (pkt.dataofs-5)*4
if opsz < 0:
@ -1217,13 +1226,13 @@ class TCPOptionsField(StrField):
opsz = 0
return s[opsz:],self.m2i(pkt,s[:opsz])
def m2i(self, pkt, x):
opt = {}
opt = []
while x:
onum = ord(x[0])
if onum == 0:
break
if onum == 1:
opt["NOP"] = ()
opt.append(("NOP",None))
x=x[1:]
continue
olen = ord(x[1])
@ -1234,16 +1243,15 @@ class TCPOptionsField(StrField):
oval = struct.unpack(ofmt, oval)
if len(oval) == 1:
oval = oval[0]
opt[oname] = oval
opt.append((oname, oval))
else:
opt[onum] = oval
opt.append((onum, oval))
x = x[olen:]
return opt
def i2m(self, pkt, x):
opt = ""
for oname in x:
oval = x[oname]
for oname,oval in x:
if type(oname) is str:
if oname == "NOP":
opt += "\x01"
@ -1633,7 +1641,10 @@ class Packet(Gen):
eltname = todo.pop()
elt = self.__getattr__(eltname)
if not isinstance(elt, Gen):
elt = SetGen(elt)
if self.fieldtype[eltname].islist:
elt = SetGen([elt])
else:
elt = SetGen(elt)
for e in elt:
done[eltname]=e
for x in loop(todo[:], done):
@ -2074,7 +2085,7 @@ class TCP(Packet):
if not ((self.sport == other.dport) and
(self.dport == other.sport)):
return 0
if (abs(other.seq-self.ack) >= 2):
if (abs(other.seq-self.ack) > 2):
return 0
return 1
@ -2390,6 +2401,11 @@ class L3PacketSocket(SuperSocket):
self.type = type
self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))
self.ins.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2**30)
if conf.except_filter:
if filter:
filter = "(%s) and not (%s)" % (filter, conf.except_filter)
else:
filter = "not (%s)" % conf.except_filter
if filter is not None:
attach_filter(self.ins, filter)
self.outs = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))
@ -2448,6 +2464,11 @@ class L2Socket(SuperSocket):
if iface is None:
iface = conf.iff
self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))
if conf.except_filter:
if filter:
filter = "(%s) and not (%s)" % (filter, conf.except_filter)
else:
filter = "not (%s)" % conf.except_filter
if filter is not None:
attach_filter(self.ins, filter)
self.ins.bind((iface, type))
@ -2472,6 +2493,11 @@ class L2ListenSocket(SuperSocket):
self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))
if iface is not None:
self.ins.bind((iface, type))
if conf.except_filter:
if filter:
filter = "(%s) and not (%s)" % (filter, conf.except_filter)
else:
filter = "not (%s)" % conf.except_filter
if filter is not None:
attach_filter(self.ins, filter)
if promisc is None:
@ -2520,6 +2546,11 @@ if DNET and PCAP:
if iface is None:
iface = "any"
self.ins.open_live(iface, 1600, 0, 100)
if conf.except_filter:
if filter:
filter = "(%s) and not (%s)" % (filter, conf.except_filter)
else:
filter = "not (%s)" % conf.except_filter
if filter:
self.ins.setfilter(filter, 0, 0)
def send(self, x):
@ -2545,6 +2576,11 @@ if DNET and PCAP:
iface = conf.iff
self.ins = pcap.pcapObject()
self.ins.open_live(iface, 1600, 0, 100)
if conf.except_filter:
if filter:
filter = "(%s) and not (%s)" % (filter, conf.except_filter)
else:
filter = "not (%s)" % conf.except_filter
if filter:
self.ins.setfilter(filter, 0, 0)
self.outs = dnet.eth(iface)
@ -2572,6 +2608,11 @@ if PCAP:
promisc = conf.sniff_promisc
self.promisc = promisc
self.ins.open_live(iface, 1600, self.promisc, 100)
if conf.except_filter:
if filter:
filter = "(%s) and not (%s)" % (filter, conf.except_filter)
else:
filter = "not (%s)" % conf.except_filter
if filter:
self.ins.setfilter(filter, 0, 0)
@ -3092,7 +3133,7 @@ def TCPflags2str(f):
s=""
for i in range(len(fl)):
if f & 1:
s += fl[i]
s = fl[i]+s
f >>= 1
return s
@ -3104,7 +3145,7 @@ def nmap_packet_sig(pkt):
r["W"] = "%X" % pkt.window
r["ACK"] = pkt.ack==2 and "S++" or pkt.ack==1 and "S" or "O"
r["Flags"] = TCPflags2str(pkt.payload.flags)
r["Ops"] = "".join(map(lambda x: x[0],pkt.payload.options.keys()))
r["Ops"] = "".join(map(lambda x: x[0][0],pkt.payload.options))
else:
r["Resp"] = "N"
return r
@ -3112,19 +3153,20 @@ def nmap_packet_sig(pkt):
def nmap_match_one_sig(seen, ref):
c = 0
for k in seen.keys():
if ref.has_key(k) and seen[k] in ref[k].split("|"):
c += 1
if ref.has_key(k):
if seen[k] in ref[k].split("|"):
c += 1
return 1.0*c/len(seen.keys())
def nmap_sig(target, oport=80, cport=81):
def nmap_sig(target, oport=80, cport=81, ucport=1):
res = {}
tcpopt = { "WScale": 10,
"NOP":(),
"MSS": 256,
"Timestamp" : (123,0) }
tcpopt = [ ("WScale", 10),
("NOP",None),
("MSS", 256),
("Timestamp",(123,0)) ]
tests = { "T1": IP(dst=target)/TCP(seq=1, dport=oport, options=tcpopt, flags="CS"),
"T2": IP(dst=target)/TCP(seq=1, dport=oport, options=tcpopt, flags=0),
"T3": IP(dst=target)/TCP(seq=1, dport=oport, options=tcpopt, flags="SFUP"),
@ -3134,25 +3176,75 @@ def nmap_sig(target, oport=80, cport=81):
"T7": IP(dst=target)/TCP(seq=1, dport=cport, options=tcpopt, flags="FPU") }
for t in tests.keys():
T = sr1(tests[t], timeout=2)
T = sr1(tests[t], timeout=2, verbose=2)
if T is not None and T.haslayer(ICMP):
warning("Test %s answered by an ICMP" % t)
T=None
res[t] = nmap_packet_sig(T)
# Assemble then dissect to calculate checksums
PU = IP(str(IP(dst=target)/UDP(dport=ucport)/(300*"i")))
T = sr1(PU, verbose=2, timeout=2)
r={}
if T is None:
r["Resp"] = "N"
else:
r["DF"] = (T.flags & 2) and "Y" or "N"
r["TOS"] = "%X" % T.tos
r["IPLEN"] = "%X" % T.len
r["RIPTL"] = "%X" % T.payload.payload.len
r["RID"] = PU.id == T.payload.payload.id and "E" or "F"
r["RIPCK"] = PU.chksum == T.getlayer(IPerror).chksum and "E" or T.getlayer(IPerror).chksum == 0 and "0" or "F"
r["UCK"] = PU.payload.chksum == T.getlayer(UDPerror).chksum and "E" or T.getlayer(UDPerror).chksum ==0 and "0" or "F"
r["ULEN"] = "%X" % T.getlayer(UDPerror).len
r["DAT"] = T.getlayer(Raw) is None and "E" or PU.getlayer(Raw).load == T.getlayer(Raw).load and "E" or "F"
res["PU"] = r
return res
def nmap_fp(target, oport=80, cport=81):
sigs = nmap_sig(target, oport, cport)
def nmap_search(sigs):
guess = 0,[]
for os,fp in nmap_base:
c = 0
c = 0.0
for t in sigs.keys():
c += nmap_match_one_sig(sigs[t], fp[t])
if t in fp:
c += nmap_match_one_sig(sigs[t], fp[t])
c /= len(sigs.keys())
if c > guess[0]:
guess = c,[ os ]
elif c == guess[0]:
guess[1].append(os)
return guess
def nmap_fp(target, oport=80, cport=81):
sigs = nmap_sig(target, oport, cport)
return nmap_search(sigs)
def nmap_sig2txt(sig):
torder = ["TSeq","T1","T2","T3","T4","T5","T6","T7","PU"]
korder = ["Class", "gcd", "SI", "IPID", "TS",
"Resp", "DF", "W", "ACK", "Flags", "Ops",
"TOS", "IPLEN", "RIPTL", "RID", "RIPCK", "UCK", "ULEN", "DAT" ]
txt=[]
for i in sig.keys():
if i not in torder:
torder.append(i)
for t in torder:
sl = sig.get(t)
if sl is None:
continue
s = []
for k in korder:
v = sl.get(k)
if v is None:
continue
s.append("%s=%s"%(k,v))
txt.append("%s(%s)" % (t, "%".join(s)))
return "\n".join(txt)
@ -3444,6 +3536,7 @@ sniff_promisc : default mode for sniff()
filter : bpf filter added to every sniffing socket to exclude traffic from analysis
histfile : history file
padding : include padding in desassembled packets
except_filter : BPF filter for packets to ignore
"""
session = ""
stealth = "not implemented"
@ -3460,6 +3553,7 @@ padding : include padding in desassembled packets
p0f_base ="/etc/p0f.fp"
queso_base ="/etc/queso.conf"
nmap_base ="/usr/share/nmap/nmap-os-fingerprints"
except_filter = ""
conf=Conf()