From e0993c9138b4a0e2f756486a9647521d1a5ed3a1 Mon Sep 17 00:00:00 2001 From: Pierre LALET Date: Thu, 28 Sep 2017 12:32:53 +0200 Subject: [PATCH 1/5] UTscapy: do not write bytes to stdout --- scapy/tools/UTscapy.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index 592cf98c6..660eb961b 100755 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -843,7 +843,8 @@ def main(argv): if FORMAT == Format.HTML: glob_output = pack_html_campaigns(runned_campaigns, glob_output, LOCAL, glob_title) - OUTPUTFILE.write(glob_output.encode("utf8", "ignore")) + OUTPUTFILE.write(glob_output.encode("utf8", "ignore") + if 'b' in OUTPUTFILE.mode else glob_output) OUTPUTFILE.close() # Return state From 4fd39a5a48916c428670adff1b79ef56240c091b Mon Sep 17 00:00:00 2001 From: Pierre LALET Date: Thu, 28 Sep 2017 13:16:16 +0200 Subject: [PATCH 2/5] Python 3: use floor division --- scapy/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scapy/utils.py b/scapy/utils.py index a6b1b8df6..edbca70c0 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -656,7 +656,7 @@ def corrupt_bits(s, p=0.01, n=None): if n is None: n = max(1,int(l*p)) for i in random.sample(range(l), n): - s[i/8] ^= 1 << (i%8) + s[i // 8] ^= 1 << (i % 8) return s.tostring() From e18771144877d24638235519336ef2aed849471e Mon Sep 17 00:00:00 2001 From: Pierre LALET Date: Thu, 28 Sep 2017 14:21:45 +0200 Subject: [PATCH 3/5] Python 3: use str for interface names --- scapy/arch/linux.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scapy/arch/linux.py b/scapy/arch/linux.py index b106a8410..931d922c7 100644 --- a/scapy/arch/linux.py +++ b/scapy/arch/linux.py @@ -295,7 +295,8 @@ def in6_getifaddr(): tmp = i.split() addr = struct.unpack('4s4s4s4s4s4s4s4s', tmp[0]) addr = scapy.utils6.in6_ptop(b':'.join(addr).decode()) - ret.append((addr, int(tmp[3], 16), tmp[5])) # (addr, scope, iface) + # (addr, scope, iface) + ret.append((addr, int(tmp[3], 16), tmp[5].decode())) return ret def read_routes6(): From 9f0dda1685ae75c88edbeea2323408e3e8fc9c7b Mon Sep 17 00:00:00 2001 From: Pierre LALET Date: Thu, 28 Sep 2017 14:22:40 +0200 Subject: [PATCH 4/5] Python 3 / VolatileValue: add __radd__ and __rsub__ methods --- scapy/volatile.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/scapy/volatile.py b/scapy/volatile.py index 7e108888b..16d9043eb 100644 --- a/scapy/volatile.py +++ b/scapy/volatile.py @@ -113,8 +113,12 @@ class RandNum(RandField): return int(self) def __add__(self, other): return self._fix() + other + def __radd__(self, other): + return other + self._fix() def __sub__(self, other): return self._fix() - other + def __rsub__(self, other): + return other - self._fix() def __mul__(self, other): return self._fix() * other def __floordiv__(self, other): @@ -356,7 +360,7 @@ class RandOID(RandString): return "<%s [%s]>" % (self.__class__.__name__, self.ori_fmt) def _fix(self): if self.fmt is None: - return ".".join(map(str, [self.idnum for i in range(1+self.depth)])) + return ".".join(str(self.idnum) for _ in range(1 + self.depth)) else: oid = [] for i in self.fmt: From a73622c320cfa56f5311bd6c66b106c325558cc9 Mon Sep 17 00:00:00 2001 From: Pierre LALET Date: Thu, 28 Sep 2017 14:22:58 +0200 Subject: [PATCH 5/5] Python 3: fix regression tests --- test/regression.uts | 116 ++++++++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 52 deletions(-) diff --git a/test/regression.uts b/test/regression.uts index 09a3dd94f..70b5b0e20 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -75,7 +75,7 @@ if WINDOWS: # - one route if there is only the loopback interface # - three routes if there is a network interface -if len(routes6): +if routes6: iflist = get_if_list() if WINDOWS: route_add_loopback(ipv6=True, iflist=iflist) @@ -260,7 +260,8 @@ test_hexdiff() = Test mysummary functions - Ether Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000) -assert _.mysummary() == 'ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (0x9000)' +assert _.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop + for loop in ['0x9000', 'LOOP']] = Test fletcher16_* functions assert(fletcher16_checksum(b"\x28\x07") == 22319) @@ -268,7 +269,7 @@ assert(fletcher16_checkbytes(b"ABCDEF", 2) == "\x89\x67") = Test zerofree_randstring function random.seed(0x2807) -zerofree_randstring(4) == "\xd2\x12\xe4\x5b" +zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12'] = Test export_object and import_object functions import mock @@ -295,11 +296,11 @@ assert([next(f) for i in range(2)] == ["tag00000", "tag00001"]) = Test corrupt_* functions import random random.seed(0x2807) -assert(corrupt_bytes("ABCDE") == "ABCDW") -assert(sane(corrupt_bytes("ABCDE", n=3)) == "A.8D4") +assert(corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"]) +assert(sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"]) -assert(corrupt_bits("ABCDE") == "EBCDE") -assert(sane(corrupt_bits("ABCDE", n=3)) == "AF.EE") +assert(corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"]) +assert(sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"]) = Test save_object and load_object functions import tempfile @@ -403,7 +404,10 @@ bind_layers(IP, ICMP, frag=0, proto=1) = fuzz ~ not_pypy random.seed(0x2807) -raw(fuzz(IP()/ICMP())) == b'u\x14\x00\x1c\xc2\xf6\x80\x00\xde\x01k\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01y\xc9>\xa6\x84\xd8\xc2\xb7' +raw(fuzz(IP()/ICMP())) in [ + b'u\x14\x00\x1c\xc2\xf6\x80\x00\xde\x01k\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01y\xc9>\xa6\x84\xd8\xc2\xb7', + b'E\xa7\x00\x1c\xb0c\xc0\x00\xf6\x01U\xd3\x7f\x00\x00\x01\x7f\x00\x00\x01\xfex\xb3\x92B<\x0b\xb8', +] = Building some packets ~ basic IP TCP UDP NTP LLC SNAP Dot11 @@ -1179,7 +1183,7 @@ import os class ATMT8(Automaton): @ATMT.state(initial=1) def BEGIN(self): - self.res = "U" + self.res = b"U" @ATMT.ioevent(BEGIN, name="extfd") def tr1(self, fd): self.res += fd.read(2) @@ -1193,7 +1197,7 @@ class ATMT8(Automaton): raise self.END() @ATMT.state(final=1) def END(self): - self.res += "s" + self.res += b"s" return self.res if WINDOWS: @@ -1209,18 +1213,18 @@ def writeOn(w, msg): a=ATMT8(external_fd={"extfd":r}, ll=lambda: None, recvsock=lambda: None) a.run(wait=False) -writeOn(w,"ra") -writeOn(w,"nu") +writeOn(w, b"ra") +writeOn(w, b"nu") a.run() -assert( _ == "Uranus" ) +assert( _ == b"Uranus" ) a.restart() a.run(wait=False) -writeOn(w,"ra") -writeOn(w,"nu") +writeOn(w, b"ra") +writeOn(w, b"nu") a.run() -assert( _ == "Uranus" ) +assert( _ == b"Uranus" ) = Automaton test interception_points, and restart ~ automaton @@ -5640,14 +5644,14 @@ assert isinstance(pkt, DNS) and isinstance(pkt.payload, NoPayload) ~ mock_read_routes6_bsd import mock -from io import BytesIO +from io import StringIO @mock.patch("scapy.arch.unix.get_if_addr") @mock.patch("scapy.arch.unix.os") def test_osx_netstat_truncated(mock_os, mock_get_if_addr): """Test read_routes() on OS X 10.? with a long interface name""" # netstat & ifconfig outputs from https://github.com/secdev/scapy/pull/119 - netstat_output = """ + netstat_output = u""" Routing tables Internet: @@ -5657,14 +5661,14 @@ default link#11 UCSI 1 0 bridge1 127 127.0.0.1 UCS 1 0 lo0 127.0.0.1 127.0.0.1 UH 10 2012351 lo0 """ - ifconfig_output = "lo0 en1 bridge10\n" + ifconfig_output = u"lo0 en1 bridge10\n" # Mocked file descriptors def se_popen(command): """Perform specific side effects""" if command.startswith("netstat -rn"): - return BytesIO(netstat_output) + return StringIO(netstat_output) elif command == "ifconfig -l": - ret = BytesIO(ifconfig_output) + ret = StringIO(ifconfig_output) def unit(): return ret ret.__call__ = unit @@ -5700,14 +5704,14 @@ test_osx_netstat_truncated() ~ mock_read_routes6_bsd import mock -from io import BytesIO +from io import StringIO def valid_output_read_routes6(routes): """"Return True if 'routes' contains correctly formatted entries, False otherwise""" for destination, plen, next_hop, dev, cset in routes: if not in6_isvalid(destination) or not type(plen) == int: return False - if not in6_isvalid(next_hop) or not type(dev) == str: + if not in6_isvalid(next_hop) or not isinstance(dev, six.string_types): return False for address in cset: if not in6_isvalid(address): @@ -5734,7 +5738,7 @@ def check_mandatory_ipv6_routes(routes6): def test_osx_10_9_5(mock_os, mock_in6_getifaddr): """Test read_routes6() on OS X 10.9.5""" # 'netstat -rn -f inet6' output - netstat_output = """ + netstat_output = u""" Routing tables Internet6: @@ -5751,7 +5755,7 @@ ff02::%lo0/32 ::1 UmCI ff02::%en0/32 link#4 UmCI en0 """ # Mocked file descriptor - strio = BytesIO(netstat_output) + strio = StringIO(netstat_output) mock_os.popen = mock.MagicMock(return_value=strio) # Mocked in6_getifaddr() output mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), @@ -5774,7 +5778,7 @@ test_osx_10_9_5() def test_osx_10_9_5_global(mock_os, mock_in6_getifaddr): """Test read_routes6() on OS X 10.9.5 with an IPv6 connectivity""" # 'netstat -rn -f inet6' output - netstat_output = """ + netstat_output = u""" Routing tables Internet6: @@ -5798,7 +5802,7 @@ ff01::%en0/32 link#4 UmCI ff02::%lo0/32 ::1 UmCI lo """ # Mocked file descriptor - strio = BytesIO(netstat_output) + strio = StringIO(netstat_output) mock_os.popen = mock.MagicMock(return_value=strio) # Mocked in6_getifaddr() output mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), @@ -5806,6 +5810,7 @@ ff02::%lo0/32 ::1 UmCI # Test the function from scapy.arch.unix import read_routes6 routes = read_routes6() + print(routes) assert(valid_output_read_routes6(routes)) for r in routes: print(r) @@ -5823,7 +5828,7 @@ test_osx_10_9_5_global() def test_osx_10_10_4(mock_os, mock_in6_getifaddr): """Test read_routes6() on OS X 10.10.4""" # 'netstat -rn -f inet6' output - netstat_output = """ + netstat_output = u""" Routing tables Internet6: @@ -5839,7 +5844,7 @@ ff02::%lo0/32 ::1 UmCI ff02::%en0/32 link#4 UmCI en0 """ # Mocked file descriptor - strio = BytesIO(netstat_output) + strio = StringIO(netstat_output) mock_os.popen = mock.MagicMock(return_value=strio) # Mocked in6_getifaddr() output mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), @@ -5863,7 +5868,7 @@ test_osx_10_10_4() def test_freebsd_10_2(mock_os, mock_in6_getifaddr): """Test read_routes6() on FreeBSD 10.2""" # 'netstat -rn -f inet6' output - netstat_output = """ + netstat_output = u""" Routing tables Internet6: @@ -5879,7 +5884,7 @@ ff02::/16 ::1 UGRS lo0 ff02::%lo0/32 ::1 U lo0 """ # Mocked file descriptor - strio = BytesIO(netstat_output) + strio = StringIO(netstat_output) mock_os.popen = mock.MagicMock(return_value=strio) # Mocked in6_getifaddr() output mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0")] @@ -5903,7 +5908,7 @@ test_freebsd_10_2() def test_openbsd_5_5(mock_os, mock_in6_getifaddr, mock_openbsd): """Test read_routes6() on OpenBSD 5.5""" # 'netstat -rn -f inet6' output - netstat_output = """ + netstat_output = u""" Routing tables Internet6: @@ -5933,7 +5938,7 @@ ff02::%em0/32 link#1 UC 0 ff02::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0 """ # Mocked file descriptor - strio = BytesIO(netstat_output) + strio = StringIO(netstat_output) mock_os.popen = mock.MagicMock(return_value=strio) # Mocked in6_getifaddr() output @@ -5961,7 +5966,7 @@ test_openbsd_5_5() def test_netbsd_7_0(mock_os, mock_in6_getifaddr, mock_netbsd): """Test read_routes6() on NetBSD 7.0""" # 'netstat -rn -f inet6' output - netstat_output = """ + netstat_output = u""" Routing tables Internet6: @@ -5989,7 +5994,7 @@ ff02::%wm0/32 link#1 UC - ff02::%lo0/32 ::1 UC - - - lo0 """ # Mocked file descriptor - strio = BytesIO(netstat_output) + strio = StringIO(netstat_output) mock_os.popen = mock.MagicMock(return_value=strio) # Mocked in6_getifaddr() output mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), @@ -8110,29 +8115,30 @@ len(r4.routes) == len_r4 = RandomEnumeration re = RandomEnumeration(0, 7, seed=0x2807, forever=False) -[x for x in re] == [3, 4, 2, 5, 1, 6, 0, 7] +[x for x in re] == ([3, 4, 2, 5, 1, 6, 0, 7] if six.PY2 else [5, 0, 2, 7, 6, 3, 1, 4]) = RandIP6 random.seed(0x2807) r6 = RandIP6() -assert(r6 == "d279:1205:e445:5a9f:db28:efc9:afd7:f594") +assert(r6 == ("d279:1205:e445:5a9f:db28:efc9:afd7:f594" if six.PY2 else + "240b:238f:b53f:b727:d0f9:bfc4:2007:e265")) random.seed(0x2807) r6 = RandIP6("2001:db8::-") -assert(r6 == "2001:0db8::e445") +assert(r6 == ("2001:0db8::e445" if six.PY2 else "2001:0db8::b53f")) r6 = RandIP6("2001:db8::*") -assert(r6 == "2001:0db8::efc9") +assert(r6 == ("2001:0db8::efc9" if six.PY2 else "2001:0db8::bfc4")) = RandMAC random.seed(0x2807) rm = RandMAC() -assert(rm == "d2:12:e4:5a:db:ef") +assert(rm == ("d2:12:e4:5a:db:ef" if six.PY2 else "24:23:b5:b7:d0:bf")) rm = RandMAC("00:01:02:03:04:0-7") -assert(rm == "00:01:02:03:04:05") +assert(rm == ("00:01:02:03:04:05" if six.PY2 else "00:01:02:03:04:01")) = RandOID @@ -8145,44 +8151,44 @@ ro = RandOID("1.2.3.*") assert(ro == "1.2.3.41") ro = RandOID("1.2.3.0-28") -assert(ro == "1.2.3.11") +assert(ro == ("1.2.3.11" if six.PY2 else "1.2.3.12")) = RandRegExp random.seed(0x2807) re = RandRegExp("[g-v]* @? [0-9]{3} . (g|v)") -re == 'vmuvr @ 906 \x9e g' +re == ('vmuvr @ 906 \x9e g' if six.PY2 else 'irrtv @ 517 ΒΈ v') = Corrupted(Bytes|Bits) random.seed(0x2807) cb = CorruptedBytes("ABCDE", p=0.5) -assert(sane(raw(cb)) == ".BCD)") +assert(sane(raw(cb)) in [".BCD)", "&BCDW"]) cb = CorruptedBits("ABCDE", p=0.2) -assert(sane(raw(cb)) == "ECk@Y") +assert(sane(raw(cb)) in ["ECk@Y", "QB.P."]) = RandEnumKeys ~ not_pypy random.seed(0x2807) rek = RandEnumKeys({'a': 1, 'b': 2, 'c': 3}, seed=0x2807) -assert(rek == 'b') +assert(rek == ('b' if six.PY2 else 'a')) = RandSingNum ~ not_pypy random.seed(0x2807) rs = RandSingNum(-28, 7) -assert(rs == 3) -assert(rs == -27) +assert(rs == (3 if six.PY2 else 2)) +assert(rs == (-27 if six.PY2 else -17)) = Rand* random.seed(0x2807) rss = RandSingString() -assert(rss == "CON:") +assert(rss == ("CON:" if six.PY2 else "foo.exe:")) random.seed(0x2807) rts = RandTermString(4, "scapy") -assert(sane(raw(rts)) == "...[scapy") +assert(sane(raw(rts)) in ["...[scapy", "......scapy"]) ############ @@ -8824,7 +8830,12 @@ def test_summary(): tr.summary() result_summary = cmco.get_output() assert(len(result_summary.split('\n')) == 10) - assert("IP / TCP 192.168.0.254:ftp_data > 192.168.0.1:http S / Raw ==> IP / ICMP 192.168.0.9 > 192.168.0.254 time-exceeded ttl-zero-during-transit / IPerror / TCPerror / Raw" in result_summary) + assert(any( + "IP / TCP 192.168.0.254:ftp_data > 192.168.0.1:%s S / Raw ==> " + "IP / ICMP 192.168.0.9 > 192.168.0.254 time-exceeded " + "ttl-zero-during-transit / IPerror / TCPerror / " + "Raw" % http in result_summary for http in ['http', 'www_http', 'www'] + )) test_summary() @@ -8882,7 +8893,8 @@ def test_IPID_count(): result_IPID_count = cmco.get_output() lines = result_IPID_count.split("\n") assert(len(lines) == 5) - assert(lines[0].endswith("Probably 3 classes: [4613, 53881, 58437]")) + assert(lines[0] in ["Probably 3 classes: [4613, 53881, 58437]", + "Probably 3 classes: [9103, 9227, 46399]"]) test_IPID_count()