Cleanup Nmap module, add tests

This commit is contained in:
Pierre LALET 2017-04-28 23:17:55 +02:00
parent 359b09301f
commit ddc4359b86
2 changed files with 160 additions and 125 deletions

View File

@ -3,24 +3,35 @@
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license
"""
Clone of Nmap's first generation OS fingerprinting.
"""Clone of Nmap's first generation OS fingerprinting.
This code works with the first-generation OS detection and
nmap-os-fingerprints, which has been removed from Nmap on November 3,
2007 (https://github.com/nmap/nmap/commit/50c49819), which means it is
outdated.
To get the last published version of this outdated fingerprint
database, you can fetch it from
<https://raw.githubusercontent.com/nmap/nmap/9efe1892/nmap-os-fingerprints>.
"""
import os
import re
from scapy.data import KnowledgeBase
from scapy.config import conf
from scapy.arch import WINDOWS
from scapy.error import warning
from scapy.layers.inet import IP, TCP, UDP, ICMP, UDPerror, IPerror
from scapy.packet import NoPayload
from scapy.sendrecv import sr
if WINDOWS:
conf.nmap_base=os.environ["ProgramFiles"] + "\\nmap\\nmap-os-fingerprints"
conf.nmap_base = os.environ["ProgramFiles"] + "\\nmap\\nmap-os-fingerprints"
else:
conf.nmap_base ="/usr/share/nmap/nmap-os-fingerprints"
conf.nmap_base = "/usr/share/nmap/nmap-os-fingerprints"
######################
@ -28,159 +39,159 @@ else:
######################
_NMAP_LINE = re.compile('^([^\\(]*)\\(([^\\)]*)\\)$')
class NmapKnowledgeBase(KnowledgeBase):
"""A KnowledgeBase specialized in Nmap first-generation OS
fingerprints database. Loads from conf.nmap_base when self.filename is
None.
"""
def lazy_init(self):
try:
f=open(self.filename)
except IOError:
fdesc = open(conf.nmap_base
if self.filename is None else
self.filename)
except (IOError, TypeError):
warning("Cannot open nmap database [%s]" % self.filename)
return
self.base = []
name = None
sig = {}
try:
for l in f:
l = l.strip()
if not l or l[0] == "#":
for line in fdesc:
line = line.split('#', 1)[0].strip()
if not line:
continue
if l[:12] == "Fingerprint ":
if line.startswith("Fingerprint "):
if name is not None:
self.base.append((name,sig))
name = l[12:].strip()
sig={}
p = self.base
self.base.append((name, sig))
name = line[12:].strip()
sig = {}
continue
elif l[:6] == "Class ":
if line.startswith("Class "):
continue
op = l.find("(")
cl = l.find(")")
if op < 0 or cl < 0:
warning("error reading nmap os fp base file")
line = _NMAP_LINE.search(line)
if line is None:
continue
test = l[:op]
s = map(lambda x: x.split("="), l[op+1:cl].split("%"))
si = {}
for n,v in s:
si[n] = v
sig[test]=si
test, values = line.groups()
sig[test] = dict(val.split('=', 1) for val in
(values.split('%') if values else []))
if name is not None:
self.base.append((name,sig))
except:
self.base.append((name, sig))
except Exception:
self.base = None
warning("Can't read nmap database [%s](new nmap version ?)" % self.filename)
f.close()
warning("Cannot read nmap database [%s](new nmap version ?)" %
self.filename)
fdesc.close()
nmap_kdb = NmapKnowledgeBase(conf.nmap_base)
def TCPflags2str(f):
fl="FSRPAUEC"
s=""
for fli in fl:
if f & 1:
s = fli + s
f >>= 1
return s
nmap_kdb = NmapKnowledgeBase(None)
def nmap_tcppacket_sig(pkt):
r = {}
res = {}
if pkt is not None:
# r["Resp"] = "Y"
r["DF"] = (pkt.flags & 2) and "Y" or "N"
r["W"] = "%X" % pkt.window
r["ACK"] = pkt.ack==2 and "S++" or pkt.ack==1 and "S" or "O"
r["Flags"] = TCPflags2str(pkt.payload.flags)
r["Ops"] = "".join(map(lambda x: x[0][0],pkt.payload.options))
res["DF"] = "Y" if pkt.flags.DF else "N"
res["W"] = "%X" % pkt.window
res["ACK"] = "S++" if pkt.ack == 2 else "S" if pkt.ack == 1 else "O"
res["Flags"] = pkt[TCP].flags.flagrepr()[::-1]
res["Ops"] = "".join(x[0][0] for x in pkt[TCP].options)
else:
r["Resp"] = "N"
return r
res["Resp"] = "N"
return res
def nmap_udppacket_sig(S,T):
r={}
if T is None:
r["Resp"] = "N"
def nmap_udppacket_sig(snd, rcv):
res = {}
if rcv is None:
res["Resp"] = "N"
else:
r["DF"] = (T.flags & 2) and "Y" or "N"
r["TOS"] = "%X" % T.tos
r["IPLEN"] = "%X" % T.len
r["RIPTL"] = "%X" % T.payload.payload.len
r["RID"] = S.id == T.payload.payload.id and "E" or "F"
r["RIPCK"] = S.chksum == T.getlayer(IPerror).chksum and "E" or T.getlayer(IPerror).chksum == 0 and "0" or "F"
r["UCK"] = S.payload.chksum == T.getlayer(UDPerror).chksum and "E" or T.getlayer(UDPerror).chksum ==0 and "0" or "F"
r["ULEN"] = "%X" % T.getlayer(UDPerror).len
r["DAT"] = T.getlayer(conf.raw_layer) is None and "E" or S.getlayer(conf.raw_layer).load == T.getlayer(conf.raw_layer).load and "E" or "F"
return r
res["DF"] = "Y" if rcv.flags.DF else "N"
res["TOS"] = "%X" % rcv.tos
res["IPLEN"] = "%X" % rcv.len
res["RIPTL"] = "%X" % rcv.payload.payload.len
res["RID"] = "E" if snd.id == rcv[IPerror].id else "F"
res["RIPCK"] = "E" if snd.chksum == rcv[IPerror].chksum else (
"0" if rcv[IPerror].chksum == 0 else "F"
)
res["UCK"] = "E" if snd.payload.chksum == rcv[UDPerror].chksum else (
"0" if rcv[UDPerror].chksum == 0 else "F"
)
res["ULEN"] = "%X" % rcv[UDPerror].len
res["DAT"] = "E" if (
isinstance(rcv[UDPerror].payload, NoPayload) or
str(rcv[UDPerror].payload) == str(snd[UDP].payload)
) else "F"
return res
def nmap_match_one_sig(seen, ref):
c = 0
for k, v in seen.iteritems():
if k in ref:
if v in ref[k].split("|"):
c += 1
if c == 0 and seen.get("Resp") == "N":
cnt = sum(val in ref.get(key, "").split("|")
for key, val in seen.iteritems())
if cnt == 0 and seen.get("Resp") == "N":
return 0.7
else:
return float(c) / len(seen)
return float(cnt) / len(seen)
def nmap_sig(target, oport=80, cport=81, ucport=1):
res = {}
tcpopt = [ ("WScale", 10),
("NOP",None),
("MSS", 256),
("Timestamp",(123,0)) ]
tests = [ IP(dst=target, id=1)/TCP(seq=1, sport=5001, dport=oport, options=tcpopt, flags="CS"),
IP(dst=target, id=1)/TCP(seq=1, sport=5002, dport=oport, options=tcpopt, flags=0),
IP(dst=target, id=1)/TCP(seq=1, sport=5003, dport=oport, options=tcpopt, flags="SFUP"),
IP(dst=target, id=1)/TCP(seq=1, sport=5004, dport=oport, options=tcpopt, flags="A"),
IP(dst=target, id=1)/TCP(seq=1, sport=5005, dport=cport, options=tcpopt, flags="S"),
IP(dst=target, id=1)/TCP(seq=1, sport=5006, dport=cport, options=tcpopt, flags="A"),
IP(dst=target, id=1)/TCP(seq=1, sport=5007, dport=cport, options=tcpopt, flags="FPU"),
IP(str(IP(dst=target)/UDP(sport=5008,dport=ucport)/(300*"i"))) ]
tcpopt = [("WScale", 10),
("NOP", None),
("MSS", 256),
("Timestamp", (123, 0))]
tests = [
IP(dst=target, id=1) /
TCP(seq=1, sport=5001 + i, dport=oport if i < 4 else cport,
options=tcpopt, flags=flags)
for i, flags in enumerate(["CS", "", "SFUP", "A", "S", "A", "FPU"])
]
tests.append(IP(dst=target)/UDP(sport=5008, dport=ucport)/(300 * "i"))
ans, unans = sr(tests, timeout=2)
ans += map(lambda x: (x,None), unans)
ans.extend((x, None) for x in unans)
for S,T in ans:
if S.sport == 5008:
res["PU"] = nmap_udppacket_sig(S,T)
for snd, rcv in ans:
if snd.sport == 5008:
res["PU"] = nmap_udppacket_sig(snd, rcv)
else:
t = "T%i" % (S.sport-5000)
if T is not None and T.haslayer(ICMP):
warning("Test %s answered by an ICMP" % t)
T=None
res[t] = nmap_tcppacket_sig(T)
test = "T%i" % (snd.sport - 5000)
if rcv is not None and ICMP in rcv:
warning("Test %s answered by an ICMP" % test)
rcv = None
res[test] = nmap_tcppacket_sig(rcv)
return res
def nmap_probes2sig(tests):
tests=tests.copy()
tests = tests.copy()
res = {}
if "PU" in tests:
res["PU"] = nmap_udppacket_sig(*tests["PU"])
del(tests["PU"])
del tests["PU"]
for k in tests:
res[k] = nmap_tcppacket_sig(tests[k])
return res
def nmap_search(sigs):
guess = 0,[]
for os,fp in nmap_kdb.get_base():
c = 0.0
for t, v in sigs.itervalues():
if t in fp:
c += nmap_match_one_sig(v, fp[t])
c /= len(sigs)
if c > guess[0]:
guess = c,[ os ]
elif c == guess[0]:
guess[1].append(os)
guess = 0, []
for osval, fprint in nmap_kdb.get_base():
score = 0.0
for test, values in fprint.iteritems():
if test in sigs:
score += nmap_match_one_sig(sigs[test], values)
score /= len(sigs)
if score > guess[0]:
guess = score, [osval]
elif score == guess[0]:
guess[1].append(osval)
return guess
@conf.commands.register
def nmap_fp(target, oport=80, cport=81):
"""nmap fingerprinting
@ -188,31 +199,23 @@ nmap_fp(target, [oport=80,] [cport=81,]) -> list of best guesses with accuracy
"""
sigs = nmap_sig(target, oport, cport)
return nmap_search(sigs)
@conf.commands.register
def nmap_sig2txt(sig):
torder = ["TSeq","T1","T2","T3","T4","T5","T6","T7","PU"]
torder = ["TSeq", "T1", "T2", "T3", "T4", "T5", "T6", "T7", "PU"]
korder = ["Class", "gcd", "SI", "IPID", "TS",
"Resp", "DF", "W", "ACK", "Flags", "Ops",
"TOS", "IPLEN", "RIPTL", "RID", "RIPCK", "UCK", "ULEN", "DAT" ]
txt=[]
"TOS", "IPLEN", "RIPTL", "RID", "RIPCK", "UCK", "ULEN", "DAT"]
txt = []
for i in sig:
if i not in torder:
torder.append(i)
for t in torder:
sl = sig.get(t)
if sl is None:
for test in torder:
testsig = sig.get(test)
if testsig is None:
continue
s = []
for k in korder:
v = sl.get(k)
if v is None:
continue
s.append("%s=%s"%(k,v))
txt.append("%s(%s)" % (t, "%".join(s)))
txt.append("%s(%s)" % (test, "%".join(
"%s=%s" % (key, testsig[key]) for key in korder if key in testsig
)))
return "\n".join(txt)

32
test/nmap.uts Normal file
View File

@ -0,0 +1,32 @@
% Regression tests for Scapy Nmap module
~ nmap
############
############
+ Basic Nmap OS fingerprints tests
= Module loading
load_module('nmap')
= Fetch database
import urllib
open('nmap-os-fingerprints', 'w').write(urllib.urlopen('https://raw.githubusercontent.com/nmap/nmap/9efe1892/nmap-os-fingerprints').read())
conf.nmap_base = 'nmap-os-fingerprints'
= Database loading
assert len(nmap_kdb.get_base()) > 100
= fingerprint test: www.secdev.org
~ netaccess
score, fprint = nmap_fp('www.secdev.org')
print score, fprint
assert score > 0.5
assert fprint
= fingerprint test: gateway
~ netaccess
score, fprint = nmap_fp(conf.route.route('0.0.0.0')[2])
print score, fprint
assert score > 0.5
assert fprint