Better unit tests for

This commit is contained in:
Guillaume Valadon 2017-02-14 10:48:49 +01:00 committed by Guillaume Valadon
parent 4d8b461e69
commit caff8ac253
2 changed files with 184 additions and 22 deletions

View File

@ -70,12 +70,12 @@ def get_cls(name, fallback_cls):
conf.netcache.new_cache("in6_neighbor", 120)
def neighsol(addr, src, iface, timeout=1, chainCC=0):
Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
of the neighbor with specified IPv6 address addr. 'src' address is
used as source of the message. Message is sent on iface. By default,
timeout waiting for an answer is 1 second.
"""Sends an ICMPv6 Neighbor Solicitation message to get the MAC address of the neighbor with specified IPv6 address addr
'src' address is used as source of the message. Message is sent on iface.
By default, timeout waiting for an answer is 1 second.
If no answer is gathered, None is returned. Else, the answer is
returned (ethernet frame).
@ -92,9 +92,10 @@ def neighsol(addr, src, iface, timeout=1, chainCC=0):
return res
def getmacbyip6(ip6, chainCC=0):
Returns the mac address to be used for provided 'ip6' peer.
"""Returns the MAC address corresponding to an IPv6 address
neighborCache.get() method is used on instantiated neighbor cache.
Resolution mechanism is described in associated doc string.
@ -1972,8 +1973,9 @@ class NonceField(StrFixedLenField):
if default is None:
self.default = self.randval()
# Compute the NI group Address. Can take a FQDN as input parameter
def computeNIGroupAddr(name):
"""Compute the NI group Address. Can take a FQDN as input parameter"""
import md5
name = name.lower().split(".")[0]
record = chr(len(name))+name
@ -2557,7 +2559,6 @@ class MIP6OptMsgAuth(_MIP6OptAlign, Packet): # RFC 4285 (Sect. 5)
# in seconds relative to 0h on 1 January 1900. The integer part is in the
# first 32 bits and the fraction part in the last 32 bits.
class NTPTimestampField(LongField):
epoch = (1900, 1, 1, 0, 0, 0, 5, 1, 0)
def i2repr(self, pkt, x):
if x < ((50*31536000)<<32):
return "Some date a few decades ago (%d)" % x
@ -2981,6 +2982,12 @@ class AS_resolver6(AS_resolver_riswhois):
_, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)
if asn.startswith("AS"):
asn = int(asn[2:])
except ValueError:
return ip,asn,desc
class TracerouteResult6(TracerouteResult):
@ -3024,10 +3031,10 @@ class TracerouteResult6(TracerouteResult):
def graph(self, ASres=AS_resolver6(), **kargs):
TracerouteResult.graph(self, ASres=ASres, **kargs)
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
l4 = None, timeout=2, verbose=None, **kargs):
Instant TCP traceroute using IPv6 :
"""Instant TCP traceroute using IPv6
traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None
if verbose is None:

View File

@ -504,6 +504,7 @@ assert success
~ netaccess IP
* This test retries on failure because it often fails
ret = list()
success = False
for i in xrange(5):
@ -518,6 +519,26 @@ assert (len(ret) == 2)
all(x[1] == "AS15169" for x in ret)
= AS resolver - IPv6
~ netaccess IP
* This test retries on failure because it often fails
ret = list()
success = False
as_resolver6 = AS_resolver6()
for i in xrange(5):
ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444")
except socket.error:
success = True
assert (len(ret) == 2)
all(x[1] == 15169 for x in ret)
@ -1600,6 +1621,11 @@ str(HAO(optlen=9, hoa="2001::ffff")) == '\xc9\t \x01\x00\x00\x00\x00\x00\x00\x00
a=HAO('\xc9\t \x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff')
a.otype == 0xC9 and a.optlen == 9 and a.hoa == "2001::ffff"
= HAO - hashret
p = IPv6()/IPv6ExtHdrDestOpt(options=HAO(hoa="2001:db8::1"))/ICMPv6EchoRequest()
p.hashret() == "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00:\x00\x00\x00\x00"
@ -2213,6 +2239,11 @@ type(a) is tuple and len(a) == 2 and a[0] == 2 and a[1] == ''
= ICMPv6NIQueryName - IPv4 address
ICMPv6NIQueryName(data="").data == ''
= ICMPv6NIQueryName - build & dissection
s = str(IPv6()/ICMPv6NIQueryName(data=""))
p = IPv6(s)
ICMPv6NIQueryName in p and p[ICMPv6NIQueryName].data == ""
@ -2550,6 +2581,11 @@ type(a) is tuple and len(a) == 2 and a[0] == 3 and type(a[1]) is list and len(a[
= ICMPv6NIReplyIPv6 - two IPv6 addresses as a list (first with ttl, second without)
ICMPv6NIReplyIPv6(data=[(42, "2001:db8::1"), "2001:db8::2"]).data == [(42, "2001:db8::1"), (0, "2001:db8::2")]
= ICMPv6NIReplyIPv6 - build & dissection
s = str(IPv6()/ICMPv6NIReplyIPv6(data="2001:db8::1"))
p = IPv6(s)
ICMPv6NIReplyIPv6 in p and == [(0, '2001:db8::1')]
@ -2590,6 +2626,16 @@ type(a) is tuple and len(a) == 2 and a[0] == 4 and type(a[1]) is list and len(a[
= ICMPv6NIReplyIPv4 - two IPv4 addresses as a list (first with ttl, second without) (internal)
ICMPv6NIReplyIPv4(data=[(42, ""), ""]).data == [(42, ""), (0, "")]
= ICMPv6NIReplyIPv4 - build & dissection
s = str(IPv6()/ICMPv6NIReplyIPv4(data=""))
p = IPv6(s)
ICMPv6NIReplyIPv4 in p and == [(0, '')]
s = str(IPv6()/ICMPv6NIReplyIPv4(data=[(2807, "")]))
p = IPv6(s)
ICMPv6NIReplyIPv4 in p and == [(2807, "")]
@ -2614,6 +2660,14 @@ a=ICMPv6NIReplyRefuse('\x8c\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\
a.type == 140 and a.code == 2 and a.cksum == 0 and a.unused == 0 and a.flags == 0 and a.nonce == '\x00'*8 and == None
+ Test Node Information Query - utilities
= computeNIGroupAddr
computeNIGroupAddr("scapy") == "ff02::2:f886:2f66"
+ IPv6ExtHdrFragment Class Test
@ -2708,12 +2762,102 @@ print len(r6.routes) == len_r6 + 1
r6.delt(dst="2001:db8:cafe:f000::/64", gw="2001:db8:cafe::1")
len(r6.routes) == len_r6
= IPv6 - utils
def test_neighsol(mock_srp1, mock_get_if_hwaddr):
mock_srp1.return_value = Ether()/IPv6()/ICMPv6ND_NA()/ICMPv6NDOptDstLLAddr(lladdr="05:04:03:02:01:00")
mock_get_if_hwaddr.return_value = "00:01:02:03:04:05"
return neighsol("fe80::f6ce:46ff:fea9:e04b", "fe80::f6ce:46ff:fea9:e04b", "scapy0")
p = test_neighsol()
ICMPv6NDOptDstLLAddr in p and p[ICMPv6NDOptDstLLAddr].lladdr == "05:04:03:02:01:00"
def test_getmacbyip6(mock_route6, mock_neighsol):
mock_route6.return_value = ("scapy0", "fe80::baca:3aff:fe72:b08b", "::")
mock_neighsol.return_value = test_neighsol()
return getmacbyip6("fe80::704:3ff:fe2:100")
test_getmacbyip6() == "05:04:03:02:01:00"
= IPv6 - IPerror6 & UDPerror
query = IPv6(dst="2001:db8::1", src="2001:db8::2", hlim=1)/UDP()/DNS()
answer = IPv6(dst="2001:db8::2", src="2001:db8::1", hlim=1)/ICMPv6TimeExceeded()/IPerror6(dst="2001:db8::1", src="2001:db8::2", hlim=0)/UDPerror()/DNS()
answer.answers(query) == True
= ICMPv6MLQuery - build & dissection
s = str(IPv6()/ICMPv6MLQuery())
s == "`\x00\x00\x00\x00\x18:\x01\xfe\x80\x00\x00\x00\x00\x00\x00\xba\xca:\xff\xfer\xb0\x8b\xff\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x82\x00\xb4O'\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
p = IPv6(s)
ICMPv6MLQuery in p and p[IPv6].dst == "ff02::1"
+ TracerouteResult6
= get_trace()
ip6_hlim = [("2001:db8::%d" % i, i) for i in xrange(1, 10)]
tr6_packets = [ (IPv6(dst="2001:db8::1", src="2001:db8::254", hlim=hlim)/UDP()/"scapy",
IPv6(dst="2001:db8::254", src=ip)/ICMPv6TimeExceeded()/IPerror6(dst="2001:db8::1", src="2001:db8::254", hlim=0)/UDPerror()/"scapy")
for (ip, hlim) in ip6_hlim ]
tr6 = TracerouteResult6(tr6_packets)
tr6.get_trace() == {'2001:db8::1': {1: ('2001:db8::1', False), 2: ('2001:db8::2', False), 3: ('2001:db8::3', False), 4: ('2001:db8::4', False), 5: ('2001:db8::5', False), 6: ('2001:db8::6', False), 7: ('2001:db8::7', False), 8: ('2001:db8::8', False), 9: ('2001:db8::9', False)}}
= show()
result = ""
def test_show():
def write(s):
global result
result += s
mock_stdout = mock.Mock()
mock_stdout.write = write
sys.stdout = mock_stdout
tr6 = TracerouteResult6(tr6_packets)
sys.stdout = sys.__stdout__
expected = " 2001:db8::1 :udpdomain \n"
expected += "1 2001:db8::1 3 \n"
expected += "2 2001:db8::2 3 \n"
expected += "3 2001:db8::3 3 \n"
expected += "4 2001:db8::4 3 \n"
expected += "5 2001:db8::5 3 \n"
expected += "6 2001:db8::6 3 \n"
expected += "7 2001:db8::7 3 \n"
expected += "8 2001:db8::8 3 \n"
expected += "9 2001:db8::9 3 \n"
index_result = result.index("1")
index_expected = expected.index("1")
assert(result[index_result:] == expected[index_expected:])
= graph()
saved_AS_resolver = conf.AS_resolver
conf.AS_resolver = None
len(tr6.graphdef) == 492
tr6.graphdef.startswith("digraph trace {") == True
'"2001:db8::1 53/udp";' in tr6.graphdef
conf.AS_resolver = conf.AS_resolver
# Below is our Homework : here is the mountain ...
########### ICMPv6MLQuery Class #####################################
########### ICMPv6MLReport Class ####################################
########### ICMPv6MLDone Class ######################################
########### ICMPv6ND_Redirect Class #################################
@ -2721,8 +2865,6 @@ len(r6.routes) == len_r6
########### ICMPv6NDOptTgtAddrList Class ############################
########### ICMPv6ND_INDSol Class ###################################
########### ICMPv6ND_INDAdv Class ###################################
########### ICMPerror6 Class ########################################
########### TracerouteResult6 Class #################################
@ -3920,6 +4062,11 @@ p[ICMPv6MPAdv].cksum == 0x2807 and p[ICMPv6MPAdv].flags == 1 and p[ICMPv6MPAdv].
p = IPv6(str(IPv6(dst='2001:db8::1', src='2001:db8::2')/IPv6ExtHdrRouting(type=2, addresses=['2001:db8::3'])/ICMPv6EchoRequest()))
p.type == 2 and len(p.addresses) == 1 and p.cksum == 0x2446
= IPv6ExtHdrRouting - type 2 - hashret
p = IPv6()/IPv6ExtHdrRouting(addresses=["2001:db8::1", "2001:db8::2"])/ICMPv6EchoRequest()
p.hashret() == " \x01\r\xb8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00:\x00\x00\x00\x00"
@ -4098,11 +4245,13 @@ p = MIP6OptReplayProtection('\n\x08\x00\x00\x00\x00\x00\x00\x00\x00')
p.otype == 10 and p.olen == 8 and p.timestamp == 0
= MIP6OptReplayProtection - build with specific values
str(MIP6OptReplayProtection(olen=42, timestamp=(52*31536000)<<32)) == '\n*a\xbev\x00\x00\x00\x00\x00'
s = str(MIP6OptReplayProtection(olen=42, timestamp=(72*31536000)<<32))
s == '\n*\x87V|\x00\x00\x00\x00\x00'
= MIP6OptReplayProtection - dissection with specific values
p = MIP6OptReplayProtection('\n*a\xbev\x00\x00\x00\x00\x00')
p.otype == 10 and p.olen == 42 and p.timestamp == 7043196609626112000L
p = MIP6OptReplayProtection(s)
p.otype == 10 and p.olen == 42 and p.timestamp == 9752118382559232000L
p.fields_desc[-1].i2repr("", p.timestamp) == 'Mon, 13 Dec 1971 23:50:39 +0000 (9752118382559232000)'
@ -4404,6 +4553,8 @@ b2=IPv6(str(IPv6(src=coa, dst=cn)/IPv6ExtHdrDestOpt(options=HAO(hoa=hoa))/MIP6MH
c=IPv6(str(IPv6(src=cn, dst=coa)/IPv6ExtHdrRouting(type=2, addresses=[hoa])/MIP6MH_BA()))
b.answers(a) and not a.answers(b) and c.answers(b) and not b.answers(c) and not c.answers(b2)
len(b[IPv6ExtHdrDestOpt].options) == 2
@ -6995,6 +7146,10 @@ len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd")
teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("", 40, "", 2807)
ip6 = IP6Field("test", None)
ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: cli:]"
ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW:]"
in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True
in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True