diff --git a/scapy/arch/unix.py b/scapy/arch/unix.py index 949ff1eac..1449df373 100644 --- a/scapy/arch/unix.py +++ b/scapy/arch/unix.py @@ -12,10 +12,13 @@ from fcntl import ioctl from scapy.error import warning import scapy.config import scapy.utils -import scapy.utils6 +from scapy.utils6 import in6_getscope, construct_source_candidate_set +from scapy.utils6 import in6_isvalid, in6_ismlladdr, in6_ismnladdr import scapy.arch from scapy.config import conf +import socket + ################## ## Routes stuff ## @@ -136,7 +139,7 @@ def _in6_getifaddr(ifname): continue # Get the scope and keep the address - scope = scapy.utils6.in6_getscope(addr) + scope = in6_getscope(addr) ret.append((addr, scope, ifname)) return ret @@ -181,65 +184,116 @@ def in6_getifaddr(): ret += _in6_getifaddr(i) return ret + def read_routes6(): - f = os.popen("netstat -rn -f inet6") - ok = False + """Return a list of IPv6 routes than can be used by Scapy.""" + + # Call netstat to retrieve IPv6 routes + fd_netstat = os.popen("netstat -rn -f inet6") + + # List interfaces IPv6 addresses + lifaddr = in6_getifaddr() + if not lifaddr: + return [] + + # Routes header information + got_header = False mtu_present = False prio_present = False + + # Parse the routes routes = [] - lifaddr = in6_getifaddr() - for l in f.readlines(): - if not l: - break - l = l.strip() - if not ok: - if l.find("Destination") >= 0: - ok = 1 - mtu_present = l.find("Mtu") >= 0 - prio_present = l.find("Prio") >= 0 - continue - # gv 12/12/06: under debugging - if scapy.arch.NETBSD or scapy.arch.OPENBSD: - lspl = l.split() - d,nh,fl = lspl[:3] - dev = lspl[5+mtu_present+prio_present] - else: # FREEBSD or DARWIN - d,nh,fl,dev = l.split()[:4] - if filter(lambda x: x[2] == dev, lifaddr) == []: - continue - if 'L' in fl: # drop MAC addresses + for line in fd_netstat.readlines(): + + # Parse the routes header and try to identify extra columns + if not got_header: + if "Destination" == line[:11]: + got_header = True + mtu_present = "Mtu" in line + prio_present = "Prio" in line continue - if 'link' in nh: - nh = '::' - - cset = [] # candidate set (possible source addresses) - dp = 128 - if d == 'default': - d = '::' - dp = 0 - if '/' in d: - d,dp = d.split("/") - dp = int(dp) - if '%' in d: - d,dev = d.split('%') - if '%' in nh: - nh,dev = nh.split('%') - if scapy.arch.LOOPBACK_NAME in dev: - cset = ['::1'] - nh = '::' + # Parse a route entry according to the operating system + splitted_line = line.split() + if scapy.arch.OPENBSD or scapy.arch.NETBSD: + index = 5 + mtu_present + prio_present + if len(splitted_line) < index: + warning("Not enough columns in route entry !") + continue + destination, next_hop, flags = splitted_line[:3] + dev = splitted_line[index] else: + # FREEBSD or DARWIN + if len(splitted_line) < 4: + warning("Not enough columns in route entry !") + continue + destination, next_hop, flags, dev = splitted_line[:4] + + # Check flags + if not "U" in flags: # usable route + continue + if "R" in flags: # Host or net unrechable + continue + if "m" in flags: # multicast address + # Note: multicast routing is handled in Route6.route() + continue + + # Replace link with the default route in next_hop + if "link" in next_hop: + next_hop = "::" + + # Default prefix length + destination_plen = 128 + + # Extract network interface from the zone id + if '%' in destination: + destination, dev = destination.split('%') + if '/' in dev: + # Example: fe80::%lo0/64 ; dev = "lo0/64" + dev, destination_plen = dev.split('/') + if '%' in next_hop: + next_hop, dev = next_hop.split('%') + + # Ensure that the next hop is a valid IPv6 address + if not in6_isvalid(next_hop): + # Note: the 'Gateway' column might contain a MAC address + next_hop = "::" + + # Modify parsed routing entries + # Note: these rules are OS specific and may evolve over time + if destination == "default": + destination, destination_plen = "::", 0 + elif '/' in destination: + # Example: fe80::/10 + destination, destination_plen = destination.split('/') + if '/' in dev: + # Example: ff02::%lo0/32 ; dev = "lo0/32" + dev, destination_plen = dev.split('/') + + # Check route entries parameters consistency + if not in6_isvalid(destination): + warning("Invalid destination IPv6 address in route entry !") + continue + try: + destination_plen = int(destination_plen) + except: + warning("Invalid IPv6 prefix length in route entry !") + continue + if in6_ismlladdr(destination) or in6_ismnladdr(destination): + # Note: multicast routing is handled in Route6.route() + continue + + if scapy.arch.LOOPBACK_NAME in dev: + # Handle ::1 separately + cset = ["::1"] + next_hop = "::" + else: + # Get possible IPv6 source addresses devaddrs = filter(lambda x: x[2] == dev, lifaddr) - cset = scapy.utils6.construct_source_candidate_set(d, dp, devaddrs, scapy.arch.LOOPBACK_NAME) + cset = construct_source_candidate_set(destination, destination_plen, devaddrs, scapy.arch.LOOPBACK_NAME) - if len(cset) != 0: - routes.append((d, dp, nh, dev, cset)) + if len(cset): + routes.append((destination, destination_plen, next_hop, dev, cset)) - f.close() + fd_netstat.close() return routes - - - - - - diff --git a/scapy/utils6.py b/scapy/utils6.py index 8e08872a1..6a2ae83da 100644 --- a/scapy/utils6.py +++ b/scapy/utils6.py @@ -800,3 +800,13 @@ def in6_get_common_plen(a, b): if mbits != 8: return 8*i + mbits return 128 + +def in6_isvalid(address): + """Return True if 'address' is a valid IPv6 address string, False + otherwise.""" + + try: + socket.inet_pton(socket.AF_INET6, address) + return True + except: + return False diff --git a/test/regression.uts b/test/regression.uts index 430bc9851..762df277e 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -4542,3 +4542,306 @@ pkt = pkt.payload assert isinstance(pkt, UDP) and pkt.dport == 5353 pkt = pkt.payload assert isinstance(pkt, DNS) and isinstance(pkt.payload, NoPayload) + ++ Mocked read_routes6() calls + += Preliminary definitions + +import mock +import StringIO + +def valid_output_read_routes6(routes): + """"Return True if 'routes' contains correctly formatted entries, False otherwise""" + for destination, plen, next_hop, dev, cset in routes: + if not in6_isvalid(destination) or not type(plen) == int: + return False + if not in6_isvalid(next_hop) or not type(dev) == str: + return False + for address in cset: + if not in6_isvalid(address): + return False + return True + +def check_mandatory_ipv6_routes(routes6): + """Ensure that mandatory IPv6 routes are present""" + if len(filter(lambda r: r[0] == "::1" and r[-1] == ["::1"], routes6)) < 1: + return False + if len(filter(lambda r: r[0] == "fe80::" and r[1] == 64, routes6)) < 1: + return False + if len(filter(lambda r: in6_islladdr(r[0]) and r[1] == 128 and \ + r[-1] == ["::1"], routes6)) < 1: + return False + return True + + += Mac OS X 10.9.5 + +@mock.patch("scapy.arch.unix.in6_getifaddr") +@mock.patch("scapy.arch.unix.os") +def test_osx_10_9_5(mock_os, mock_in6_getifaddr): + """Test read_routes6() on OS X 10.9.5""" + # 'netstat -rn -f inet6' output + netstat_output = """ +Routing tables + +Internet6: +Destination Gateway Flags Netif Expire +::1 ::1 UHL lo0 +fe80::%lo0/64 fe80::1%lo0 UcI lo0 +fe80::1%lo0 link#1 UHLI lo0 +fe80::%en0/64 link#4 UCI en0 +fe80::ba26:6cff:fe5f:4eee%en0 b8:26:6c:5f:4e:ee UHLWIi en0 +fe80::bae8:56ff:fe45:8ce6%en0 b8:e8:56:45:8c:e6 UHLI lo0 +ff01::%lo0/32 ::1 UmCI lo0 +ff01::%en0/32 link#4 UmCI en0 +ff02::%lo0/32 ::1 UmCI lo0 +ff02::%en0/32 link#4 UmCI en0 +""" + # Mocked file descriptor + strio = StringIO.StringIO(netstat_output) + mock_os.popen = mock.MagicMock(return_value=strio) + # Mocked in6_getifaddr() output + mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), + ("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0")] + # Test the function + from scapy.arch.unix import read_routes6 + routes = read_routes6() + for r in routes: + print r + assert(len(routes) == 6) + assert(check_mandatory_ipv6_routes(routes)) + +test_osx_10_9_5() + + += Mac OS X 10.9.5 with global IPv6 connectivity +@mock.patch("scapy.arch.unix.in6_getifaddr") +@mock.patch("scapy.arch.unix.os") +def test_osx_10_9_5_global(mock_os, mock_in6_getifaddr): + """Test read_routes6() on OS X 10.9.5 with an IPv6 connectivity""" + # 'netstat -rn -f inet6' output + netstat_output = """ +Routing tables + +Internet6: +Destination Gateway Flags Netif Expire +default fe80::ba26:8aff:fe5f:4eef%en0 UGc en0 +::1 ::1 UHL lo0 +2a01:ab09:7d:1f01::/64 link#4 UC en0 +2a01:ab09:7d:1f01:420:205c:9fab:5be7 b8:e9:55:44:7c:e5 UHL lo0 +2a01:ab09:7d:1f01:ba26:8aff:fe5f:4eef b8:26:8a:5f:4e:ef UHLWI en0 +2a01:ab09:7d:1f01:bae9:55ff:fe44:7ce5 b8:e9:55:44:7c:e5 UHL lo0 +fe80::%lo0/64 fe80::1%lo0 UcI lo0 +fe80::1%lo0 link#1 UHLI lo0 +fe80::%en0/64 link#4 UCI en0 +fe80::5664:d9ff:fe79:4e00%en0 54:64:d9:79:4e:0 UHLWI en0 +fe80::6ead:f8ff:fe74:945a%en0 6c:ad:f8:74:94:5a UHLWI en0 +fe80::a2f3:c1ff:fec4:5b50%en0 a0:f3:c1:c4:5b:50 UHLWI en0 +fe80::ba26:8aff:fe5f:4eef%en0 b8:26:8a:5f:4e:ef UHLWIir en0 +fe80::bae9:55ff:fe44:7ce5%en0 b8:e9:55:44:7c:e5 UHLI lo0 +ff01::%lo0/32 ::1 UmCI lo0 +ff01::%en0/32 link#4 UmCI en0 +ff02::%lo0/32 ::1 UmCI lo +""" + # Mocked file descriptor + strio = StringIO.StringIO(netstat_output) + mock_os.popen = mock.MagicMock(return_value=strio) + # Mocked in6_getifaddr() output + mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), + ("fe80::ba26:6cff:fe5f:4eee", IPV6_ADDR_LINKLOCAL, "en0")] + # Test the function + from scapy.arch.unix import read_routes6 + routes = read_routes6() + assert(valid_output_read_routes6(routes)) + for r in routes: + print r + assert(len(routes) == 11) + assert(check_mandatory_ipv6_routes(routes)) + +test_osx_10_9_5_global() + + += Mac OS X 10.10.4 + +@mock.patch("scapy.arch.unix.in6_getifaddr") +@mock.patch("scapy.arch.unix.os") +def test_osx_10_10_4(mock_os, mock_in6_getifaddr): + """Test read_routes6() on OS X 10.10.4""" + # 'netstat -rn -f inet6' output + netstat_output = """ +Routing tables + +Internet6: +Destination Gateway Flags Netif Expire +::1 ::1 UHL lo0 +fe80::%lo0/64 fe80::1%lo0 UcI lo0 +fe80::1%lo0 link#1 UHLI lo0 +fe80::%en0/64 link#4 UCI en0 +fe80::a00:27ff:fe9b:c965%en0 8:0:27:9b:c9:65 UHLI lo0 +ff01::%lo0/32 ::1 UmCI lo0 +ff01::%en0/32 link#4 UmCI en0 +ff02::%lo0/32 ::1 UmCI lo0 +ff02::%en0/32 link#4 UmCI en0 +""" + # Mocked file descriptor + strio = StringIO.StringIO(netstat_output) + mock_os.popen = mock.MagicMock(return_value=strio) + # Mocked in6_getifaddr() output + mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), + ("fe80::a00:27ff:fe9b:c965", IPV6_ADDR_LINKLOCAL, "en0")] + # Test the function + from scapy.arch.unix import read_routes6 + routes = read_routes6() + for r in routes: + print r + assert(len(routes) == 5) + assert(check_mandatory_ipv6_routes(routes)) + +test_osx_10_10_4() + + += FreeBSD 10.2 + +@mock.patch("scapy.arch.unix.in6_getifaddr") +@mock.patch("scapy.arch.unix.os") +def test_freebsd_10_2(mock_os, mock_in6_getifaddr): + """Test read_routes6() on FreeBSD 10.2""" + # 'netstat -rn -f inet6' output + netstat_output = """ +Routing tables + +Internet6: +Destination Gateway Flags Netif Expire +::/96 ::1 UGRS lo0 +::1 link#2 UH lo0 +::ffff:0.0.0.0/96 ::1 UGRS lo0 +fe80::/10 ::1 UGRS lo0 +fe80::%lo0/64 link#2 U lo0 +fe80::1%lo0 link#2 UHS lo0 +ff01::%lo0/32 ::1 U lo0 +ff02::/16 ::1 UGRS lo0 +ff02::%lo0/32 ::1 U lo0 +""" + # Mocked file descriptor + strio = StringIO.StringIO(netstat_output) + mock_os.popen = mock.MagicMock(return_value=strio) + # Mocked in6_getifaddr() output + mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0")] + # Test the function + from scapy.arch.unix import read_routes6 + routes = read_routes6() + for r in routes: + print r + assert(len(routes) == 3) + assert(check_mandatory_ipv6_routes(routes)) + +test_freebsd_10_2() + + += OpenBSD 5.5 + +@mock.patch("scapy.arch.OPENBSD") +@mock.patch("scapy.arch.unix.in6_getifaddr") +@mock.patch("scapy.arch.unix.os") +def test_openbsd_5_5(mock_os, mock_in6_getifaddr, mock_openbsd): + """Test read_routes6() on OpenBSD 5.5""" + # 'netstat -rn -f inet6' output + netstat_output = """ +Routing tables + +Internet6: +Destination Gateway Flags Refs Use Mtu Prio Iface +::/104 ::1 UGRS 0 0 - 8 lo0 +::/96 ::1 UGRS 0 0 - 8 lo0 +::1 ::1 UH 14 0 33144 4 lo0 +::127.0.0.0/104 ::1 UGRS 0 0 - 8 lo0 +::224.0.0.0/100 ::1 UGRS 0 0 - 8 lo0 +::255.0.0.0/104 ::1 UGRS 0 0 - 8 lo0 +::ffff:0.0.0.0/96 ::1 UGRS 0 0 - 8 lo0 +2002::/24 ::1 UGRS 0 0 - 8 lo0 +2002:7f00::/24 ::1 UGRS 0 0 - 8 lo0 +2002:e000::/20 ::1 UGRS 0 0 - 8 lo0 +2002:ff00::/24 ::1 UGRS 0 0 - 8 lo0 +fe80::/10 ::1 UGRS 0 0 - 8 lo0 +fe80::%em0/64 link#1 UC 0 0 - 4 em0 +fe80::a00:27ff:fe04:59bf%em0 08:00:27:04:59:bf UHL 0 0 - 4 lo0 +fe80::%lo0/64 fe80::1%lo0 U 0 0 - 4 lo0 +fe80::1%lo0 link#3 UHL 0 0 - 4 lo0 +fec0::/10 ::1 UGRS 0 0 - 8 lo0 +ff01::/16 ::1 UGRS 0 0 - 8 lo0 +ff01::%em0/32 link#1 UC 0 0 - 4 em0 +ff01::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0 +ff02::/16 ::1 UGRS 0 0 - 8 lo0 +ff02::%em0/32 link#1 UC 0 0 - 4 em0 +ff02::%lo0/32 fe80::1%lo0 UC 0 0 - 4 lo0 +""" + # Mocked file descriptor + strio = StringIO.StringIO(netstat_output) + mock_os.popen = mock.MagicMock(return_value=strio) + + # Mocked in6_getifaddr() output + mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), + ("fe80::a00:27ff:fe04:59bf", IPV6_ADDR_LINKLOCAL, "em0")] + # Mocked OpenBSD parsing behavior + mock_openbsd = True + # Test the function + from scapy.arch.unix import read_routes6 + routes = read_routes6() + for r in routes: + print r + assert(len(routes) == 5) + assert(check_mandatory_ipv6_routes(routes)) + +test_openbsd_5_5() + + += NetBSD 7.0 + +@mock.patch("scapy.arch.NETBSD") +@mock.patch("scapy.arch.unix.in6_getifaddr") +@mock.patch("scapy.arch.unix.os") +def test_netbsd_7_0(mock_os, mock_in6_getifaddr, mock_netbsd): + """Test read_routes6() on NetBSD 7.0""" + # 'netstat -rn -f inet6' output + netstat_output = """ +Routing tables + +Internet6: +Destination Gateway Flags Refs Use Mtu Interface +::/104 ::1 UGRS - - - lo0 +::/96 ::1 UGRS - - - lo0 +::1 ::1 UH - - 33648 lo0 +::127.0.0.0/104 ::1 UGRS - - - lo0 +::224.0.0.0/100 ::1 UGRS - - - lo0 +::255.0.0.0/104 ::1 UGRS - - - lo0 +::ffff:0.0.0.0/96 ::1 UGRS - - - lo0 +2001:db8::/32 ::1 UGRS - - - lo0 +2002::/24 ::1 UGRS - - - lo0 +2002:7f00::/24 ::1 UGRS - - - lo0 +2002:e000::/20 ::1 UGRS - - - lo0 +2002:ff00::/24 ::1 UGRS - - - lo0 +fe80::/10 ::1 UGRS - - - lo0 +fe80::%wm0/64 link#1 UC - - - wm0 +fe80::acd1:3989:180e:fde0 08:00:27:a1:64:d8 UHL - - - lo0 +fe80::%lo0/64 fe80::1 U - - - lo0 +fe80::1 link#2 UHL - - - lo0 +ff01:1::/32 link#1 UC - - - wm0 +ff01:2::/32 ::1 UC - - - lo0 +ff02::%wm0/32 link#1 UC - - - wm0 +ff02::%lo0/32 ::1 UC - - - lo0 +""" + # Mocked file descriptor + strio = StringIO.StringIO(netstat_output) + mock_os.popen = mock.MagicMock(return_value=strio) + # Mocked in6_getifaddr() output + mock_in6_getifaddr.return_value = [("::1", IPV6_ADDR_LOOPBACK, "lo0"), + ("fe80::acd1:3989:180e:fde0", IPV6_ADDR_LINKLOCAL, "wm0")] + # Test the function + from scapy.arch.unix import read_routes6 + routes = read_routes6() + for r in routes: + print r + assert(len(routes) == 5) + assert(check_mandatory_ipv6_routes(routes)) + +test_netbsd_7_0()