From ddc4359b86642cb24a88228be598d19fee2496fb Mon Sep 17 00:00:00 2001 From: Pierre LALET Date: Fri, 28 Apr 2017 23:17:55 +0200 Subject: [PATCH] Cleanup Nmap module, add tests --- scapy/modules/nmap.py | 253 +++++++++++++++++++++--------------------- test/nmap.uts | 32 ++++++ 2 files changed, 160 insertions(+), 125 deletions(-) create mode 100644 test/nmap.uts diff --git a/scapy/modules/nmap.py b/scapy/modules/nmap.py index 0eeafdbda..b879af7cb 100644 --- a/scapy/modules/nmap.py +++ b/scapy/modules/nmap.py @@ -3,24 +3,35 @@ ## Copyright (C) Philippe Biondi ## This program is published under a GPLv2 license -""" -Clone of Nmap's first generation OS fingerprinting. +"""Clone of Nmap's first generation OS fingerprinting. + +This code works with the first-generation OS detection and +nmap-os-fingerprints, which has been removed from Nmap on November 3, +2007 (https://github.com/nmap/nmap/commit/50c49819), which means it is +outdated. + +To get the last published version of this outdated fingerprint +database, you can fetch it from +. + """ import os +import re from scapy.data import KnowledgeBase from scapy.config import conf from scapy.arch import WINDOWS from scapy.error import warning from scapy.layers.inet import IP, TCP, UDP, ICMP, UDPerror, IPerror +from scapy.packet import NoPayload from scapy.sendrecv import sr if WINDOWS: - conf.nmap_base=os.environ["ProgramFiles"] + "\\nmap\\nmap-os-fingerprints" + conf.nmap_base = os.environ["ProgramFiles"] + "\\nmap\\nmap-os-fingerprints" else: - conf.nmap_base ="/usr/share/nmap/nmap-os-fingerprints" + conf.nmap_base = "/usr/share/nmap/nmap-os-fingerprints" ###################### @@ -28,159 +39,159 @@ else: ###################### +_NMAP_LINE = re.compile('^([^\\(]*)\\(([^\\)]*)\\)$') + + class NmapKnowledgeBase(KnowledgeBase): + """A KnowledgeBase specialized in Nmap first-generation OS +fingerprints database. Loads from conf.nmap_base when self.filename is +None. + + """ def lazy_init(self): try: - f=open(self.filename) - except IOError: + fdesc = open(conf.nmap_base + if self.filename is None else + self.filename) + except (IOError, TypeError): + warning("Cannot open nmap database [%s]" % self.filename) return self.base = [] name = None + sig = {} try: - for l in f: - l = l.strip() - if not l or l[0] == "#": + for line in fdesc: + line = line.split('#', 1)[0].strip() + if not line: continue - if l[:12] == "Fingerprint ": + if line.startswith("Fingerprint "): if name is not None: - self.base.append((name,sig)) - name = l[12:].strip() - sig={} - p = self.base + self.base.append((name, sig)) + name = line[12:].strip() + sig = {} continue - elif l[:6] == "Class ": + if line.startswith("Class "): continue - op = l.find("(") - cl = l.find(")") - if op < 0 or cl < 0: - warning("error reading nmap os fp base file") + line = _NMAP_LINE.search(line) + if line is None: continue - test = l[:op] - s = map(lambda x: x.split("="), l[op+1:cl].split("%")) - si = {} - for n,v in s: - si[n] = v - sig[test]=si + test, values = line.groups() + sig[test] = dict(val.split('=', 1) for val in + (values.split('%') if values else [])) if name is not None: - self.base.append((name,sig)) - except: + self.base.append((name, sig)) + except Exception: self.base = None - warning("Can't read nmap database [%s](new nmap version ?)" % self.filename) - f.close() + warning("Cannot read nmap database [%s](new nmap version ?)" % + self.filename) + fdesc.close() -nmap_kdb = NmapKnowledgeBase(conf.nmap_base) -def TCPflags2str(f): - fl="FSRPAUEC" - s="" - for fli in fl: - if f & 1: - s = fli + s - f >>= 1 - return s +nmap_kdb = NmapKnowledgeBase(None) + def nmap_tcppacket_sig(pkt): - r = {} + res = {} if pkt is not None: -# r["Resp"] = "Y" - r["DF"] = (pkt.flags & 2) and "Y" or "N" - r["W"] = "%X" % pkt.window - r["ACK"] = pkt.ack==2 and "S++" or pkt.ack==1 and "S" or "O" - r["Flags"] = TCPflags2str(pkt.payload.flags) - r["Ops"] = "".join(map(lambda x: x[0][0],pkt.payload.options)) + res["DF"] = "Y" if pkt.flags.DF else "N" + res["W"] = "%X" % pkt.window + res["ACK"] = "S++" if pkt.ack == 2 else "S" if pkt.ack == 1 else "O" + res["Flags"] = pkt[TCP].flags.flagrepr()[::-1] + res["Ops"] = "".join(x[0][0] for x in pkt[TCP].options) else: - r["Resp"] = "N" - return r + res["Resp"] = "N" + return res -def nmap_udppacket_sig(S,T): - r={} - if T is None: - r["Resp"] = "N" +def nmap_udppacket_sig(snd, rcv): + res = {} + if rcv is None: + res["Resp"] = "N" else: - r["DF"] = (T.flags & 2) and "Y" or "N" - r["TOS"] = "%X" % T.tos - r["IPLEN"] = "%X" % T.len - r["RIPTL"] = "%X" % T.payload.payload.len - r["RID"] = S.id == T.payload.payload.id and "E" or "F" - r["RIPCK"] = S.chksum == T.getlayer(IPerror).chksum and "E" or T.getlayer(IPerror).chksum == 0 and "0" or "F" - r["UCK"] = S.payload.chksum == T.getlayer(UDPerror).chksum and "E" or T.getlayer(UDPerror).chksum ==0 and "0" or "F" - r["ULEN"] = "%X" % T.getlayer(UDPerror).len - r["DAT"] = T.getlayer(conf.raw_layer) is None and "E" or S.getlayer(conf.raw_layer).load == T.getlayer(conf.raw_layer).load and "E" or "F" - return r - + res["DF"] = "Y" if rcv.flags.DF else "N" + res["TOS"] = "%X" % rcv.tos + res["IPLEN"] = "%X" % rcv.len + res["RIPTL"] = "%X" % rcv.payload.payload.len + res["RID"] = "E" if snd.id == rcv[IPerror].id else "F" + res["RIPCK"] = "E" if snd.chksum == rcv[IPerror].chksum else ( + "0" if rcv[IPerror].chksum == 0 else "F" + ) + res["UCK"] = "E" if snd.payload.chksum == rcv[UDPerror].chksum else ( + "0" if rcv[UDPerror].chksum == 0 else "F" + ) + res["ULEN"] = "%X" % rcv[UDPerror].len + res["DAT"] = "E" if ( + isinstance(rcv[UDPerror].payload, NoPayload) or + str(rcv[UDPerror].payload) == str(snd[UDP].payload) + ) else "F" + return res def nmap_match_one_sig(seen, ref): - c = 0 - for k, v in seen.iteritems(): - if k in ref: - if v in ref[k].split("|"): - c += 1 - if c == 0 and seen.get("Resp") == "N": + cnt = sum(val in ref.get(key, "").split("|") + for key, val in seen.iteritems()) + if cnt == 0 and seen.get("Resp") == "N": return 0.7 - else: - return float(c) / len(seen) + return float(cnt) / len(seen) def nmap_sig(target, oport=80, cport=81, ucport=1): res = {} - tcpopt = [ ("WScale", 10), - ("NOP",None), - ("MSS", 256), - ("Timestamp",(123,0)) ] - tests = [ IP(dst=target, id=1)/TCP(seq=1, sport=5001, dport=oport, options=tcpopt, flags="CS"), - IP(dst=target, id=1)/TCP(seq=1, sport=5002, dport=oport, options=tcpopt, flags=0), - IP(dst=target, id=1)/TCP(seq=1, sport=5003, dport=oport, options=tcpopt, flags="SFUP"), - IP(dst=target, id=1)/TCP(seq=1, sport=5004, dport=oport, options=tcpopt, flags="A"), - IP(dst=target, id=1)/TCP(seq=1, sport=5005, dport=cport, options=tcpopt, flags="S"), - IP(dst=target, id=1)/TCP(seq=1, sport=5006, dport=cport, options=tcpopt, flags="A"), - IP(dst=target, id=1)/TCP(seq=1, sport=5007, dport=cport, options=tcpopt, flags="FPU"), - IP(str(IP(dst=target)/UDP(sport=5008,dport=ucport)/(300*"i"))) ] + tcpopt = [("WScale", 10), + ("NOP", None), + ("MSS", 256), + ("Timestamp", (123, 0))] + tests = [ + IP(dst=target, id=1) / + TCP(seq=1, sport=5001 + i, dport=oport if i < 4 else cport, + options=tcpopt, flags=flags) + for i, flags in enumerate(["CS", "", "SFUP", "A", "S", "A", "FPU"]) + ] + tests.append(IP(dst=target)/UDP(sport=5008, dport=ucport)/(300 * "i")) ans, unans = sr(tests, timeout=2) - ans += map(lambda x: (x,None), unans) + ans.extend((x, None) for x in unans) - for S,T in ans: - if S.sport == 5008: - res["PU"] = nmap_udppacket_sig(S,T) + for snd, rcv in ans: + if snd.sport == 5008: + res["PU"] = nmap_udppacket_sig(snd, rcv) else: - t = "T%i" % (S.sport-5000) - if T is not None and T.haslayer(ICMP): - warning("Test %s answered by an ICMP" % t) - T=None - res[t] = nmap_tcppacket_sig(T) + test = "T%i" % (snd.sport - 5000) + if rcv is not None and ICMP in rcv: + warning("Test %s answered by an ICMP" % test) + rcv = None + res[test] = nmap_tcppacket_sig(rcv) return res def nmap_probes2sig(tests): - tests=tests.copy() + tests = tests.copy() res = {} if "PU" in tests: res["PU"] = nmap_udppacket_sig(*tests["PU"]) - del(tests["PU"]) + del tests["PU"] for k in tests: res[k] = nmap_tcppacket_sig(tests[k]) return res - + def nmap_search(sigs): - guess = 0,[] - for os,fp in nmap_kdb.get_base(): - c = 0.0 - for t, v in sigs.itervalues(): - if t in fp: - c += nmap_match_one_sig(v, fp[t]) - c /= len(sigs) - if c > guess[0]: - guess = c,[ os ] - elif c == guess[0]: - guess[1].append(os) + guess = 0, [] + for osval, fprint in nmap_kdb.get_base(): + score = 0.0 + for test, values in fprint.iteritems(): + if test in sigs: + score += nmap_match_one_sig(sigs[test], values) + score /= len(sigs) + if score > guess[0]: + guess = score, [osval] + elif score == guess[0]: + guess[1].append(osval) return guess - - + + @conf.commands.register def nmap_fp(target, oport=80, cport=81): """nmap fingerprinting @@ -188,31 +199,23 @@ nmap_fp(target, [oport=80,] [cport=81,]) -> list of best guesses with accuracy """ sigs = nmap_sig(target, oport, cport) return nmap_search(sigs) - + @conf.commands.register def nmap_sig2txt(sig): - torder = ["TSeq","T1","T2","T3","T4","T5","T6","T7","PU"] + torder = ["TSeq", "T1", "T2", "T3", "T4", "T5", "T6", "T7", "PU"] korder = ["Class", "gcd", "SI", "IPID", "TS", "Resp", "DF", "W", "ACK", "Flags", "Ops", - "TOS", "IPLEN", "RIPTL", "RID", "RIPCK", "UCK", "ULEN", "DAT" ] - txt=[] + "TOS", "IPLEN", "RIPTL", "RID", "RIPCK", "UCK", "ULEN", "DAT"] + txt = [] for i in sig: if i not in torder: torder.append(i) - for t in torder: - sl = sig.get(t) - if sl is None: + for test in torder: + testsig = sig.get(test) + if testsig is None: continue - s = [] - for k in korder: - v = sl.get(k) - if v is None: - continue - s.append("%s=%s"%(k,v)) - txt.append("%s(%s)" % (t, "%".join(s))) + txt.append("%s(%s)" % (test, "%".join( + "%s=%s" % (key, testsig[key]) for key in korder if key in testsig + ))) return "\n".join(txt) - - - - diff --git a/test/nmap.uts b/test/nmap.uts new file mode 100644 index 000000000..7dfcadc43 --- /dev/null +++ b/test/nmap.uts @@ -0,0 +1,32 @@ +% Regression tests for Scapy Nmap module + +~ nmap + +############ +############ ++ Basic Nmap OS fingerprints tests + += Module loading +load_module('nmap') + += Fetch database +import urllib +open('nmap-os-fingerprints', 'w').write(urllib.urlopen('https://raw.githubusercontent.com/nmap/nmap/9efe1892/nmap-os-fingerprints').read()) +conf.nmap_base = 'nmap-os-fingerprints' + += Database loading +assert len(nmap_kdb.get_base()) > 100 + += fingerprint test: www.secdev.org +~ netaccess +score, fprint = nmap_fp('www.secdev.org') +print score, fprint +assert score > 0.5 +assert fprint + += fingerprint test: gateway +~ netaccess +score, fprint = nmap_fp(conf.route.route('0.0.0.0')[2]) +print score, fprint +assert score > 0.5 +assert fprint