Merge pull request #847 from p-l-/fixes-py3

Python 3: multiple fixes
This commit is contained in:
Pierre Lalet 2017-09-29 14:52:50 +02:00 committed by GitHub
commit 542b1e7f5a
5 changed files with 74 additions and 56 deletions

View File

@ -295,7 +295,8 @@ def in6_getifaddr():
tmp = i.split()
addr = struct.unpack('4s4s4s4s4s4s4s4s', tmp[0])
addr = scapy.utils6.in6_ptop(b':'.join(addr).decode())
ret.append((addr, int(tmp[3], 16), tmp[5])) # (addr, scope, iface)
# (addr, scope, iface)
ret.append((addr, int(tmp[3], 16), tmp[5].decode()))
return ret
def read_routes6():

View File

@ -843,7 +843,8 @@ def main(argv):
if FORMAT == Format.HTML:
glob_output = pack_html_campaigns(runned_campaigns, glob_output, LOCAL, glob_title)
OUTPUTFILE.write(glob_output.encode("utf8", "ignore"))
OUTPUTFILE.write(glob_output.encode("utf8", "ignore")
if 'b' in OUTPUTFILE.mode else glob_output)
OUTPUTFILE.close()
# Return state

View File

@ -656,7 +656,7 @@ def corrupt_bits(s, p=0.01, n=None):
if n is None:
n = max(1,int(l*p))
for i in random.sample(range(l), n):
s[i/8] ^= 1 << (i%8)
s[i // 8] ^= 1 << (i % 8)
return s.tostring()

View File

@ -113,8 +113,12 @@ class RandNum(RandField):
return int(self)
def __add__(self, other):
return self._fix() + other
def __radd__(self, other):
return other + self._fix()
def __sub__(self, other):
return self._fix() - other
def __rsub__(self, other):
return other - self._fix()
def __mul__(self, other):
return self._fix() * other
def __floordiv__(self, other):
@ -356,7 +360,7 @@ class RandOID(RandString):
return "<%s [%s]>" % (self.__class__.__name__, self.ori_fmt)
def _fix(self):
if self.fmt is None:
return ".".join(map(str, [self.idnum for i in range(1+self.depth)]))
return ".".join(str(self.idnum) for _ in range(1 + self.depth))
else:
oid = []
for i in self.fmt:

View File

@ -75,7 +75,7 @@ if WINDOWS:
# - one route if there is only the loopback interface
# - three routes if there is a network interface
if len(routes6):
if routes6:
iflist = get_if_list()
if WINDOWS:
route_add_loopback(ipv6=True, iflist=iflist)
@ -260,7 +260,8 @@ test_hexdiff()
= Test mysummary functions - Ether
Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000)
assert _.mysummary() == 'ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (0x9000)'
assert _.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop
for loop in ['0x9000', 'LOOP']]
= Test fletcher16_* functions
assert(fletcher16_checksum(b"\x28\x07") == 22319)
@ -268,7 +269,7 @@ assert(fletcher16_checkbytes(b"ABCDEF", 2) == "\x89\x67")
= Test zerofree_randstring function
random.seed(0x2807)
zerofree_randstring(4) == "\xd2\x12\xe4\x5b"
zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12']
= Test export_object and import_object functions
import mock
@ -295,11 +296,11 @@ assert([next(f) for i in range(2)] == ["tag00000", "tag00001"])
= Test corrupt_* functions
import random
random.seed(0x2807)
assert(corrupt_bytes("ABCDE") == "ABCDW")
assert(sane(corrupt_bytes("ABCDE", n=3)) == "A.8D4")
assert(corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"])
assert(sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"])
assert(corrupt_bits("ABCDE") == "EBCDE")
assert(sane(corrupt_bits("ABCDE", n=3)) == "AF.EE")
assert(corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"])
assert(sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"])
= Test save_object and load_object functions
import tempfile
@ -403,7 +404,10 @@ bind_layers(IP, ICMP, frag=0, proto=1)
= fuzz
~ not_pypy
random.seed(0x2807)
raw(fuzz(IP()/ICMP())) == b'u\x14\x00\x1c\xc2\xf6\x80\x00\xde\x01k\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01y\xc9>\xa6\x84\xd8\xc2\xb7'
raw(fuzz(IP()/ICMP())) in [
b'u\x14\x00\x1c\xc2\xf6\x80\x00\xde\x01k\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01y\xc9>\xa6\x84\xd8\xc2\xb7',
b'E\xa7\x00\x1c\xb0c\xc0\x00\xf6\x01U\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01\xfex\xb3\x92B<\x0b\xb8',
]
= Building some packets
~ basic IP TCP UDP NTP LLC SNAP Dot11
@ -1179,7 +1183,7 @@ import os
class ATMT8(Automaton):
@ATMT.state(initial=1)
def BEGIN(self):
self.res = "U"
self.res = b"U"
@ATMT.ioevent(BEGIN, name="extfd")
def tr1(self, fd):
self.res += fd.read(2)
@ -1193,7 +1197,7 @@ class ATMT8(Automaton):
raise self.END()
@ATMT.state(final=1)
def END(self):
self.res += "s"
self.res += b"s"
return self.res
if WINDOWS:
@ -1209,18 +1213,18 @@ def writeOn(w, msg):
a=ATMT8(external_fd={"extfd":r}, ll=lambda: None, recvsock=lambda: None)
a.run(wait=False)
writeOn(w,"ra")
writeOn(w,"nu")
writeOn(w, b"ra")
writeOn(w, b"nu")
a.run()
assert( _ == "Uranus" )
assert( _ == b"Uranus" )
a.restart()
a.run(wait=False)
writeOn(w,"ra")
writeOn(w,"nu")
writeOn(w, b"ra")
writeOn(w, b"nu")
a.run()
assert( _ == "Uranus" )
assert( _ == b"Uranus" )
= Automaton test interception_points, and restart
~ automaton
@ -5640,14 +5644,14 @@ assert isinstance(pkt, DNS) and isinstance(pkt.payload, NoPayload)
~ mock_read_routes6_bsd
import mock
from io import BytesIO
from io import StringIO
@mock.patch("scapy.arch.unix.get_if_addr")
@mock.patch("scapy.arch.unix.os")
def test_osx_netstat_truncated(mock_os, mock_get_if_addr):
"""Test read_routes() on OS X 10.? with a long interface name"""
# netstat & ifconfig outputs from https://github.com/secdev/scapy/pull/119
netstat_output = """
netstat_output = u"""
Routing tables
Internet:
@ -5657,14 +5661,14 @@ default link#11 UCSI 1 0 bridge1
127 127.0.0.1 UCS 1 0 lo0
127.0.0.1 127.0.0.1 UH 10 2012351 lo0
"""
ifconfig_output = "lo0 en1 bridge10\n"
ifconfig_output = u"lo0 en1 bridge10\n"
# Mocked file descriptors
def se_popen(command):
"""Perform specific side effects"""
if command.startswith("netstat -rn"):
return BytesIO(netstat_output)
return StringIO(netstat_output)
elif command == "ifconfig -l":
ret = BytesIO(ifconfig_output)
ret = StringIO(ifconfig_output)
def unit():
return ret
ret.__call__ = unit
@ -5700,14 +5704,14 @@ test_osx_netstat_truncated()
~ mock_read_routes6_bsd
import mock
from io import BytesIO
from io import StringIO
def valid_output_read_routes6(routes):
""""Return True if 'routes' contains correctly formatted entries, False otherwise"""
for destination, plen, next_hop, dev, cset in routes:
if not in6_isvalid(destination) or not type(plen) == int:
return False
if not in6_isvalid(next_hop) or not type(dev) == str:
if not in6_isvalid(next_hop) or not isinstance(dev, six.string_types):
return False
for address in cset:
if not in6_isvalid(address):
@ -5734,7 +5738,7 @@ def check_mandatory_ipv6_routes(routes6):
def test_osx_10_9_5(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on OS X 10.9.5"""
# 'netstat -rn -f inet6' output
netstat_output = """
netstat_output = u"""
Routing tables
Internet6:
@ -5751,7 +5755,7 @@ ff02::%lo0/32 ::1 UmCI
ff02::%en0/32 link#4 UmCI en0
"""
# Mocked file descriptor
strio = BytesIO(netstat_output)
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
@ -5774,7 +5778,7 @@ test_osx_10_9_5()
def test_osx_10_9_5_global(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on OS X 10.9.5 with an IPv6 connectivity"""
# 'netstat -rn -f inet6' output
netstat_output = """
netstat_output = u"""
Routing tables
Internet6:
@ -5798,7 +5802,7 @@ ff01::%en0/32 link#4 UmCI
ff02::%lo0/32 ::1 UmCI lo
"""
# Mocked file descriptor
strio = BytesIO(netstat_output)
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
@ -5806,6 +5810,7 @@ ff02::%lo0/32 ::1 UmCI
# Test the function
from scapy.arch.unix import read_routes6
routes = read_routes6()
print(routes)
assert(valid_output_read_routes6(routes))
for r in routes:
print(r)
@ -5823,7 +5828,7 @@ test_osx_10_9_5_global()
def test_osx_10_10_4(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on OS X 10.10.4"""
# 'netstat -rn -f inet6' output
netstat_output = """
netstat_output = u"""
Routing tables
Internet6:
@ -5839,7 +5844,7 @@ ff02::%lo0/32 ::1 UmCI
ff02::%en0/32 link#4 UmCI en0
"""
# Mocked file descriptor
strio = BytesIO(netstat_output)
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
@ -5863,7 +5868,7 @@ test_osx_10_10_4()
def test_freebsd_10_2(mock_os, mock_in6_getifaddr):
"""Test read_routes6() on FreeBSD 10.2"""
# 'netstat -rn -f inet6' output
netstat_output = """
netstat_output = u"""
Routing tables
Internet6:
@ -5879,7 +5884,7 @@ ff02::/16 ::1 UGRS lo0
ff02::%lo0/32 ::1 U lo0
"""
# Mocked file descriptor
strio = BytesIO(netstat_output)
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0")]
@ -5903,7 +5908,7 @@ test_freebsd_10_2()
def test_openbsd_5_5(mock_os, mock_in6_getifaddr, mock_openbsd):
"""Test read_routes6() on OpenBSD 5.5"""
# 'netstat -rn -f inet6' output
netstat_output = """
netstat_output = u"""
Routing tables
Internet6:
@ -5933,7 +5938,7 @@ ff02::%em0/32 link#1 UC 0
ff02::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0
"""
# Mocked file descriptor
strio = BytesIO(netstat_output)
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
@ -5961,7 +5966,7 @@ test_openbsd_5_5()
def test_netbsd_7_0(mock_os, mock_in6_getifaddr, mock_netbsd):
"""Test read_routes6() on NetBSD 7.0"""
# 'netstat -rn -f inet6' output
netstat_output = """
netstat_output = u"""
Routing tables
Internet6:
@ -5989,7 +5994,7 @@ ff02::%wm0/32 link#1 UC -
ff02::%lo0/32 ::1 UC - - - lo0
"""
# Mocked file descriptor
strio = BytesIO(netstat_output)
strio = StringIO(netstat_output)
mock_os.popen = mock.MagicMock(return_value=strio)
# Mocked in6_getifaddr() output
mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"),
@ -8110,29 +8115,30 @@ len(r4.routes) == len_r4
= RandomEnumeration
re = RandomEnumeration(0, 7, seed=0x2807, forever=False)
[x for x in re] == [3, 4, 2, 5, 1, 6, 0, 7]
[x for x in re] == ([3, 4, 2, 5, 1, 6, 0, 7] if six.PY2 else [5, 0, 2, 7, 6, 3, 1, 4])
= RandIP6
random.seed(0x2807)
r6 = RandIP6()
assert(r6 == "d279:1205:e445:5a9f:db28:efc9:afd7:f594")
assert(r6 == ("d279:1205:e445:5a9f:db28:efc9:afd7:f594" if six.PY2 else
"240b:238f:b53f:b727:d0f9:bfc4:2007:e265"))
random.seed(0x2807)
r6 = RandIP6("2001:db8::-")
assert(r6 == "2001:0db8::e445")
assert(r6 == ("2001:0db8::e445" if six.PY2 else "2001:0db8::b53f"))
r6 = RandIP6("2001:db8::*")
assert(r6 == "2001:0db8::efc9")
assert(r6 == ("2001:0db8::efc9" if six.PY2 else "2001:0db8::bfc4"))
= RandMAC
random.seed(0x2807)
rm = RandMAC()
assert(rm == "d2:12:e4:5a:db:ef")
assert(rm == ("d2:12:e4:5a:db:ef" if six.PY2 else "24:23:b5:b7:d0:bf"))
rm = RandMAC("00:01:02:03:04:0-7")
assert(rm == "00:01:02:03:04:05")
assert(rm == ("00:01:02:03:04:05" if six.PY2 else "00:01:02:03:04:01"))
= RandOID
@ -8145,44 +8151,44 @@ ro = RandOID("1.2.3.*")
assert(ro == "1.2.3.41")
ro = RandOID("1.2.3.0-28")
assert(ro == "1.2.3.11")
assert(ro == ("1.2.3.11" if six.PY2 else "1.2.3.12"))
= RandRegExp
random.seed(0x2807)
re = RandRegExp("[g-v]* @? [0-9]{3} . (g|v)")
re == 'vmuvr @ 906 \x9e g'
re == ('vmuvr @ 906 \x9e g' if six.PY2 else 'irrtv @ 517 ¸ v')
= Corrupted(Bytes|Bits)
random.seed(0x2807)
cb = CorruptedBytes("ABCDE", p=0.5)
assert(sane(raw(cb)) == ".BCD)")
assert(sane(raw(cb)) in [".BCD)", "&BCDW"])
cb = CorruptedBits("ABCDE", p=0.2)
assert(sane(raw(cb)) == "ECk@Y")
assert(sane(raw(cb)) in ["ECk@Y", "QB.P."])
= RandEnumKeys
~ not_pypy
random.seed(0x2807)
rek = RandEnumKeys({'a': 1, 'b': 2, 'c': 3}, seed=0x2807)
assert(rek == 'b')
assert(rek == ('b' if six.PY2 else 'a'))
= RandSingNum
~ not_pypy
random.seed(0x2807)
rs = RandSingNum(-28, 7)
assert(rs == 3)
assert(rs == -27)
assert(rs == (3 if six.PY2 else 2))
assert(rs == (-27 if six.PY2 else -17))
= Rand*
random.seed(0x2807)
rss = RandSingString()
assert(rss == "CON:")
assert(rss == ("CON:" if six.PY2 else "foo.exe:"))
random.seed(0x2807)
rts = RandTermString(4, "scapy")
assert(sane(raw(rts)) == "...[scapy")
assert(sane(raw(rts)) in ["...[scapy", "......scapy"])
############
@ -8824,7 +8830,12 @@ def test_summary():
tr.summary()
result_summary = cmco.get_output()
assert(len(result_summary.split('\n')) == 10)
assert("IP / TCP 192.168.0.254:ftp_data > 192.168.0.1:http S / Raw ==> IP / ICMP 192.168.0.9 > 192.168.0.254 time-exceeded ttl-zero-during-transit / IPerror / TCPerror / Raw" in result_summary)
assert(any(
"IP / TCP 192.168.0.254:ftp_data > 192.168.0.1:%s S / Raw ==> "
"IP / ICMP 192.168.0.9 > 192.168.0.254 time-exceeded "
"ttl-zero-during-transit / IPerror / TCPerror / "
"Raw" % http in result_summary for http in ['http', 'www_http', 'www']
))
test_summary()
@ -8882,7 +8893,8 @@ def test_IPID_count():
result_IPID_count = cmco.get_output()
lines = result_IPID_count.split("\n")
assert(len(lines) == 5)
assert(lines[0].endswith("Probably 3 classes: [4613, 53881, 58437]"))
assert(lines[0] in ["Probably 3 classes: [4613, 53881, 58437]",
"Probably 3 classes: [9103, 9227, 46399]"])
test_IPID_count()