scapy/test/tuntap.uts

254 lines
6.4 KiB
Plaintext

% tuntap tests for Scapy
# Packet capture-based tests are in sendsniff.uts
#######
+ Test Linux-specific protocol headers for TunTap
~ linux tun not_libpcap
= Linux-specific protocol headers
p = LinuxTunPacketInfo()/IP()
assert p.type == 2048
p = LinuxTunPacketInfo(raw(p))
assert p.type == 2048
assert isinstance(p.payload, IP)
p = LinuxTunPacketInfo()/IPv6()
assert p.type == 0x86dd
p = LinuxTunPacketInfo(raw(p))
assert p.type == 0x86dd
assert isinstance(p.payload, IPv6)
#######
+ Test tun device
~ tun needs_root not_libpcap
= Create a tun interface
import subprocess
from threading import Thread
tun0 = TunTapInterface("tun0", strip_packet_info=False)
if LINUX:
assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0
assert subprocess.check_call([
"ip", "addr", "change",
"192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0
elif BSD:
assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0
assert subprocess.check_call([
"ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0
else:
raise NotImplementedError()
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Setup ICMPEcho_am on the interface
am = tun0.am(ICMPEcho_am, count=3)
am.defoptsniff['timeout'] = 5
t_am = Thread(target=am)
t_am.start()
= Send ping packets from OS into scapy
send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
= Cleanup
t_am.join(timeout=3)
tun0.close()
#######
+ Test strip_packet_info=False on Linux
~ tun linux needs_root not_libpcap
= Create a tun interface
if not LINUX:
raise NotImplementedError()
import subprocess
tun0 = TunTapInterface("tun0", strip_packet_info=False)
assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0
assert subprocess.check_call([
"ip", "addr", "change",
"192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Send ping packets from Linux into Scapy
def cb():
send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3)
t.start()
t.join(timeout=3)
assert len(t.results) >= 3
icmp4_sequences = set()
for pkt in t.results:
pkt
assert isinstance(pkt, LinuxTunPacketInfo)
if not isinstance(pkt.payload, IP) or ICMP not in pkt:
# We might get IPv6 router solicitation or other traffic...
continue
if pkt[IP].src != '192.0.2.1' or pkt[IP].dst != '192.0.2.2' or pkt[ICMP].type != 8:
continue
icmp4_sequences.add(pkt.seq)
# Expect to get 3 different ICMP sequence numbers
assert len(icmp4_sequences) == 3
= Delete the tun interface
tun0.close()
+ Test strip_packet_info=True and IPv6
~ tun needs_root ipv6 not_libpcap
= Create a tun interface with IPv4 + IPv6
import subprocess
tun0 = TunTapInterface("tun0", strip_packet_info=True)
if LINUX:
assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0
assert subprocess.check_call([
"ip", "addr", "change",
"192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0
assert subprocess.check_call([
"ip", "-6", "addr", "add",
"2001:db8::1", "peer", "2001:db8::2", "dev", "tun0"]) == 0
elif BSD:
assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0
assert subprocess.check_call([
"ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0
assert subprocess.check_call([
"ifconfig", "tun0", "inet6", "2001:db8::1/128", "2001:db8::2"]) == 0
else:
raise NotImplementedError()
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Send ping packets from OS into Scapy
def cb():
send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
send(IPv6(dst="2001:db8::2")/ICMPv6EchoRequest(seq=(1,3)))
t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x or ICMPv6EchoRequest in x, started_callback=cb, count=6)
t.start()
t.join(timeout=3)
assert len(t.results) >= 6
icmp4_sequences = set()
icmp6_sequences = set()
for pkt in t.results:
pkt
assert isinstance(pkt, (IP, IPv6))
if (isinstance(pkt, IP) and
pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and
ICMP in pkt and pkt[ICMP].type == 8):
icmp4_sequences.add(pkt[ICMP].seq)
if (isinstance(pkt, IPv6) and
pkt[IPv6].src == "2001:db8::1" and pkt[IPv6].dst == "2001:db8::2" and
ICMPv6EchoRequest in pkt):
icmp6_sequences.add(pkt[ICMPv6EchoRequest].seq)
# Expect to get 3 different ICMP sequence numbers
assert len(icmp4_sequences) == 3, (
"Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences))
assert len(icmp6_sequences) == 3, (
"Expected 3 IPv6 ICMP ping packets, got: " + repr(icmp6_sequences))
= Delete the tun interface
tun0.close()
+ Test tap interfaces
~ tap needs_root not_libpcap
= Create a tap interface with IPv4
import subprocess
tap0 = TunTapInterface("tap0")
if LINUX:
assert subprocess.check_call(["ip", "link", "set", "tap0", "up"]) == 0
assert subprocess.check_call([
"ip", "addr", "change", "192.0.2.1/30", "dev", "tap0"]) == 0
assert subprocess.check_call([
"ip", "neigh", "replace",
"192.0.2.2", "lladdr", "20:00:00:20:00:00", "dev", "tap0"]) == 0
else:
assert subprocess.check_call(["ifconfig", "tap0", "up"]) == 0
assert subprocess.check_call([
"ifconfig", "tap0", "192.0.2.1", "netmask", "255.255.255.252"]) == 0
assert subprocess.check_call([
"arp", "-s", "192.0.2.2", "20:00:00:20:00:00", "temp"]) == 0
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
= Send ping packets from OS into Scapy
conf.ifaces
conf.route
def cb():
sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/IP(dst="192.0.2.2")/ICMP(seq=(1,3)), iface="tap0")
t = AsyncSniffer(opened_socket=tap0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3)
t.start()
t.join(timeout=3)
assert len(t.results) >= 3
icmp4_sequences = set()
for pkt in t.results:
pkt
assert isinstance(pkt, Ether)
if (IP in pkt and
pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and
ICMP in pkt and pkt[ICMP].type == 8):
icmp4_sequences.add(pkt[ICMP].seq)
# Expect to get 3 different ICMP sequence numbers
assert len(icmp4_sequences) == 3, (
"Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences))
= Delete the tap interface
tap0.close()
+ Refresh interfaces
~ linux tun tap not_libpcap
= Cleanup
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()