Travis cleanup / TunTap fixes / Mypy 3.9 (#3182)

* Use 3.9 for Mypy

* Improve tuntap tests - Fix
This commit is contained in:
gpotter2 2021-04-24 14:23:39 +02:00 committed by GitHub
parent cadc0831f6
commit 11832deee0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 69 additions and 66 deletions

View File

@ -49,7 +49,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: 3.8
python-version: 3.9
- name: Install tox
run: pip install tox
- name: Run mypy

View File

@ -14,14 +14,6 @@ jobs:
env:
- TOXENV=py38-isotp_kernel_module,codecov
# warnings/deprecations
- os: linux
python: 3.8
env:
- SCAPY_PY_OPTS="-Werror -X tracemalloc" TOXENV=py38-linux_root
allow_failures:
- env: SCAPY_PY_OPTS="-Werror -X tracemalloc" TOXENV=py38-linux_root
install:
- bash .config/ci/install.sh
- python -c "from scapy.all import conf; print(repr(conf))"

View File

@ -13,7 +13,13 @@ import socket
from scapy.compat import orb
from scapy.config import conf, _set_conf_sockets
from scapy.consts import LINUX, SOLARIS, WINDOWS, BSD
from scapy.data import ARPHDR_ETHER, ARPHDR_LOOPBACK, IPV6_ADDR_GLOBAL
from scapy.data import (
ARPHDR_ETHER,
ARPHDR_LOOPBACK,
ARPHDR_PPP,
ARPHDR_TUN,
IPV6_ADDR_GLOBAL
)
from scapy.error import Scapy_Exception
from scapy.interfaces import NetworkInterface
from scapy.pton_ntop import inet_pton, inet_ntop
@ -72,7 +78,7 @@ def get_if_hwaddr(iff):
Returns the MAC (hardware) address of an interface
"""
addrfamily, mac = get_if_raw_hwaddr(iff) # type: ignore # noqa: F405
if addrfamily in [ARPHDR_ETHER, ARPHDR_LOOPBACK]:
if addrfamily in [ARPHDR_ETHER, ARPHDR_LOOPBACK, ARPHDR_PPP, ARPHDR_TUN]:
return str2mac(mac)
else:
raise Scapy_Exception("Unsupported address family (%i) for interface [%s]" % (addrfamily, iff)) # noqa: E501

View File

@ -83,7 +83,7 @@ def get_if_raw_hwaddr(iff, # type: Union[NetworkInterface, str]
from scapy.arch import SIOCGIFHWADDR # type: ignore
siocgifhwaddr = SIOCGIFHWADDR
return struct.unpack( # type: ignore
"16xh6s8x",
"16xH6s8x",
get_if(iff, siocgifhwaddr)
)

View File

@ -210,7 +210,7 @@ def get_alias_address(iface_name, # type: str
# Extract interfaces names
out = struct.unpack("iL", ifreq)[0]
names_b = names_ar.tobytes() if six.PY3 else names_ar.tostring()
names_b = names_ar.tobytes() if six.PY3 else names_ar.tostring() # type: ignore # noqa: E501
names = [names_b[i:i + offset].split(b'\0', 1)[0] for i in range(0, out, name_len)] # noqa: E501
# Look for the IP address

View File

@ -186,7 +186,7 @@ class TunTapInterface(SimpleSocket):
flags = LINUX_IFF_TAP | LINUX_IFF_NO_PI
tsetiff = raw(LinuxTunIfReq(
ifrn_name=bytes_encode(self.iface),
ifrn_name=self.iface,
ifru_flags=flags))
ioctl(sock, LINUX_TUNSETIFF, tsetiff)
@ -226,6 +226,7 @@ class TunTapInterface(SimpleSocket):
return r
def send(self, x):
# type: (Packet) -> int
if hasattr(x, "sent_time"):
x.sent_time = time.time()
@ -240,8 +241,9 @@ class TunTapInterface(SimpleSocket):
sx = raw(x)
try:
self.outs.write(sx)
r = self.outs.write(sx)
self.outs.flush()
return r
except socket.error:
log_runtime.error("%s send",
self.__class__.__name__, exc_info=True)

View File

@ -1079,7 +1079,7 @@ def main():
pass
if conf.use_pcap:
KW_KO.append("not_pcapdnet")
KW_KO.append("not_libpcap")
if VERB > 2:
print(" " + arrow + " libpcap mode")

View File

@ -1056,7 +1056,7 @@ def corrupt_bytes(data, p=0.01, n=None):
n = max(1, int(s_len * p))
for i in random.sample(range(s_len), n):
s[i] = (s[i] + random.randint(1, 255)) % 256
return s.tostring() if six.PY2 else s.tobytes()
return s.tostring() if six.PY2 else s.tobytes() # type: ignore
@conf.commands.register
@ -1072,7 +1072,7 @@ def corrupt_bits(data, p=0.01, n=None):
n = max(1, int(s_len * p))
for i in random.sample(range(s_len), n):
s[i // 8] ^= 1 << (i % 8)
return s.tostring() if six.PY2 else s.tobytes()
return s.tostring() if six.PY2 else s.tobytes() # type: ignore
#############################

View File

@ -114,7 +114,7 @@ else:
############
+ Test bridge_and_sniff() using tun sockets
~ tun not_pcapdnet
~ tun not_libpcap
= Create two tun interfaces

View File

@ -4,7 +4,7 @@
#######
+ Test Linux-specific protocol headers for TunTap
~ linux tun
~ linux tun not_libpcap
= Linux-specific protocol headers
@ -26,7 +26,7 @@ assert isinstance(p.payload, IPv6)
#######
+ Test tun device
~ tun netaccess
~ tun netaccess not_libpcap
= Create a tun interface
@ -47,32 +47,31 @@ elif BSD:
else:
raise NotImplementedError()
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Setup ICMPEcho_am on the interface
am = tun0.am(ICMPEcho_am, count=3)
am.defoptsniff['timeout'] = 5
t_am = Thread(target=am)
t_am.start()
time.sleep(1)
= Send ping packets from OS into scapy
# ping returns non-zero exit code on 100% packet loss
assert subprocess.check_call(["ping", "-c3", "192.0.2.2"]) == 0
send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
= Cleanup
t_am.join(timeout=3)
tun0.close()
if not conf.use_pypy:
# See https://pypy.readthedocs.io/en/latest/cpython_differences.html
del tun0
#######
+ Test strip_packet_info=False on Linux
~ tun linux netaccess
~ tun linux netaccess not_libpcap
= Create a tun interface
@ -88,17 +87,18 @@ assert subprocess.check_call([
"ip", "addr", "change",
"192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Send ping packets from Linux into Scapy
t = AsyncSniffer(opened_socket=tun0)
def cb():
send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: IP in x, started_callback=cb, count=3)
t.start()
# We expect this to return exit code 1, because there's nothing in Scapy that
# responds to these packets.
assert subprocess.call(["ping", "-c3", "192.0.2.2"]) == 1
time.sleep(1)
t.stop()
t.join(timeout=3)
assert len(t.results) >= 3
icmp4_sequences = set()
@ -118,13 +118,10 @@ assert len(icmp4_sequences) == 3
= Delete the tun interface
tun0.close()
if not conf.use_pypy:
# See https://pypy.readthedocs.io/en/latest/cpython_differences.html
del tun0
+ Test strip_packet_info=True and IPv6
~ tun netaccess ipv6
~ tun netaccess ipv6 not_libpcap
= Create a tun interface with IPv4 + IPv6
@ -149,19 +146,19 @@ elif BSD:
else:
raise NotImplementedError()
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Send ping packets from OS into Scapy
t = AsyncSniffer(opened_socket=tun0)
def cb():
send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
send(IPv6(dst="2001:db8::2")/ICMPv6EchoRequest(seq=(1,3)))
t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x or ICMPv6EchoRequest in x, started_callback=cb, count=6)
t.start()
# There's nothing in Scapy that responds, but we expect the packets to be sent
# successfully. Linux and BSD (incl. macOS) have different exit codes.
EXPECTED_EXIT = 1 if LINUX else 2
assert subprocess.call(["ping", "-c3", "192.0.2.2"]) == EXPECTED_EXIT
assert subprocess.call(["ping6", "-c3", "2001:db8::2"]) == EXPECTED_EXIT
time.sleep(1)
t.stop()
t.join(timeout=3)
assert len(t.results) >= 6
icmp4_sequences = set()
@ -187,13 +184,10 @@ assert len(icmp6_sequences) == 3, (
= Delete the tun interface
tun0.close()
if not conf.use_pypy:
# See https://pypy.readthedocs.io/en/latest/cpython_differences.html
del tun0
+ Test tap interfaces
~ tap netaccess
~ tap netaccess not_libpcap
= Create a tap interface with IPv4
@ -215,18 +209,21 @@ else:
assert subprocess.check_call([
"arp", "-s", "192.0.2.2", "20:00:00:20:00:00", "temp"]) == 0
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Send ping packets from OS into Scapy
t = AsyncSniffer(opened_socket=tap0)
conf.ifaces
conf.route
def cb():
sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/IP(dst="192.0.2.2")/ICMP(seq=(1,3)), iface="tap0")
t = AsyncSniffer(opened_socket=tap0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3)
t.start()
# There's nothing in Scapy that responds, but we expect the packets to be sent
# successfully. Linux and BSD (incl. macOS) have different exit codes.
EXPECTED_EXIT = 1 if LINUX else 2
assert subprocess.call(["ping", "-c3", "192.0.2.2"]) == EXPECTED_EXIT
time.sleep(1)
t.stop()
t.join(timeout=3)
assert len(t.results) >= 3
icmp4_sequences = set()
@ -245,6 +242,12 @@ assert len(icmp4_sequences) == 3, (
= Delete the tap interface
tap0.close()
if not conf.use_pypy:
# See https://pypy.readthedocs.io/en/latest/cpython_differences.html
del tap0
+ Refresh interfaces
~ linux tun tap not_libpcap
= Cleanup
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()