Add p0f tests + PY3 fixes

This commit is contained in:
gpotter2 2018-01-21 00:19:13 +01:00
parent aa484a7166
commit d1ebdb9246
2 changed files with 71 additions and 8 deletions

View File

@ -17,6 +17,7 @@ import random
from scapy.data import KnowledgeBase
from scapy.config import conf
from scapy.compat import raw
from scapy.layers.inet import IP, TCP, TCPOptions
from scapy.packet import NoPayload, Packet
from scapy.error import warning, Scapy_Exception, log_runtime
@ -83,10 +84,16 @@ class p0fKnowledgeBase(KnowledgeBase):
self.base = None
f.close()
p0f_kdb = p0fKnowledgeBase(conf.p0f_base)
p0fa_kdb = p0fKnowledgeBase(conf.p0fa_base)
p0fr_kdb = p0fKnowledgeBase(conf.p0fr_base)
p0fo_kdb = p0fKnowledgeBase(conf.p0fo_base)
p0f_kdb, p0fa_kdb, p0fr_kdb, p0fo_kdb = None, None, None, None
def p0f_load_knowledgebases():
global p0f_kdb, p0fa_kdb, p0fr_kdb, p0fo_kdb
p0f_kdb = p0fKnowledgeBase(conf.p0f_base)
p0fa_kdb = p0fKnowledgeBase(conf.p0fa_base)
p0fr_kdb = p0fKnowledgeBase(conf.p0fr_base)
p0fo_kdb = p0fKnowledgeBase(conf.p0fo_base)
p0f_load_knowledgebases()
def p0f_selectdb(flags):
# tested flags: S, R, A
@ -107,7 +114,7 @@ def p0f_selectdb(flags):
def packet2p0f(pkt):
pkt = pkt.copy()
pkt = pkt.__class__(str(pkt))
pkt = pkt.__class__(raw(pkt))
while pkt.haslayer(IP) and pkt.haslayer(TCP):
pkt = pkt.getlayer(IP)
if isinstance(pkt.payload, TCP):
@ -127,7 +134,6 @@ def packet2p0f(pkt):
#ttl=t[t.index(pkt.ttl)+1]
ttl = pkt.ttl
df = (pkt.flags & 2) / 2
ss = len(pkt)
# from p0f/config.h : PACKET_BIG = 100
if ss > 100:
@ -243,7 +249,7 @@ def packet2p0f(pkt):
if qq == "":
qq = "."
return (db, (win, ttl, df, ss, ooo, qq))
return (db, (win, ttl, pkt.flags.DF, ss, ooo, qq))
def p0f_correl(x,y):
d = 0
@ -297,6 +303,7 @@ p0f(packet) -> accuracy, [list of guesses]
return r
def prnp0f(pkt):
"""Calls p0f and returns a user-friendly output"""
# we should print which DB we use
try:
r = p0f(pkt)
@ -351,7 +358,7 @@ specified (as a tuple), we use the signature.
For now, only TCP Syn packets are supported.
Some specifications of the p0f.fp file are not (yet) implemented."""
pkt = pkt.copy()
#pkt = pkt.__class__(str(pkt))
#pkt = pkt.__class__(raw(pkt))
while pkt.haslayer(IP) and pkt.haslayer(TCP):
pkt = pkt.getlayer(IP)
if isinstance(pkt.payload, TCP):

View File

@ -10,6 +10,48 @@
= Module loading
load_module('p0f')
= Fetch database
from __future__ import print_function
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
def _load_database(file):
for i in range(10):
try:
open(file, 'wb').write(urlopen('https://raw.githubusercontent.com/p0f/p0f/4b4d1f384abebbb9b1b25b8f3c6df5ad7ab365f7/' + file).read())
break
except:
raise
pass
_load_database("p0f.fp")
conf.p0f_base = "p0f.fp"
_load_database("p0fa.fp")
conf.p0fa_base = "p0fa.fp"
_load_database("p0fr.fp")
conf.p0fr_base = "p0fr.fp"
_load_database("p0fo.fp")
conf.p0fo_base = "p0fo.fp"
p0f_load_knowledgebases()
############
############
+ Default tests
= Test p0f
pkt = Ether(b'\x14\x0cv\x8f\xfe(\xd0P\x99V\xdd\xf9\x08\x00E\x00\x0045+@\x00\x80\x06\x00\x00\xc0\xa8\x00w(M\xe2\xf9\xda\xcb\x01\xbbcc\xdd\x1e\x00\x00\x00\x00\x80\x02\xfa\xf0\xcc\x8c\x00\x00\x02\x04\x05\xb4\x01\x03\x03\x08\x01\x01\x04\x02')
assert p0f(pkt) == [('@Windows', 'XP/2000 (RFC1323+, w+, tstamp-)', 0)]
= Test prnp0f
with ContextManagerCaptureOutput() as cmco:
prnp0f(pkt)
assert cmco.get_output() == '192.168.0.119:56011 - @Windows XP/2000 (RFC1323+, w+, tstamp-)\n -> 40.77.226.249:https (S) (distance 0)\n'
############
############
@ -60,3 +102,17 @@ sig = ('S4', 64, 1, 60, 'M*,T', 'T', 'Phony Sys', '1.0')
opts = [('Timestamp', (54321, 0))]
pkt = p0f_impersonate(IP()/TCP(options=opts), signature=sig)
assert pkt.payload.options[1][1][1] > 0
+ Clear temp files
= Remove fp files
def _rem(f):
try:
os.remove(f)
except:
pass
_rem("p0f.fp")
_rem("p0fa.fp")
_rem("p0fr.fp")
_rem("p0fo.fp")