mirror of https://github.com/secdev/scapy.git
254 lines
6.4 KiB
Plaintext
254 lines
6.4 KiB
Plaintext
% tuntap tests for Scapy
|
|
|
|
# Packet capture-based tests are in sendsniff.uts
|
|
|
|
#######
|
|
+ Test Linux-specific protocol headers for TunTap
|
|
~ linux tun not_libpcap
|
|
|
|
= Linux-specific protocol headers
|
|
|
|
# For each payload family, check that a freshly built LinuxTunPacketInfo
# reports the expected protocol number in its "type" field, and that
# re-parsing the raw bytes yields the same type and payload class.
for _layer, _ethertype in ((IP, 2048), (IPv6, 0x86dd)):
    p = LinuxTunPacketInfo()/_layer()
    assert p.type == _ethertype
    p = LinuxTunPacketInfo(raw(p))
    assert p.type == _ethertype
    assert isinstance(p.payload, _layer)
|
|
|
|
#######
|
|
+ Test tun device
|
|
|
|
~ tun needs_root not_libpcap
|
|
|
|
= Create a tun interface
|
|
|
|
import subprocess
from threading import Thread

tun0 = TunTapInterface("tun0", strip_packet_info=False)

# Select the platform's commands that bring tun0 up and configure the
# address pair 192.0.2.1 (local) / 192.0.2.2 (peer) on it.
if LINUX:
    _setup_cmds = [
        ["ip", "link", "set", "tun0", "up"],
        ["ip", "addr", "change",
         "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"],
    ]
elif BSD:
    _setup_cmds = [
        ["ifconfig", "tun0", "up"],
        ["ifconfig", "tun0", "192.0.2.1", "192.0.2.2"],
    ]
else:
    raise NotImplementedError()

# NOTE(review): keep the "assert ... == 0" wrapper. The test runner appears to
# judge a test by the last displayed value, so a bare check_call() returning 0
# could mark the test as failed - confirm before simplifying.
for _cmd in _setup_cmds:
    assert subprocess.check_call(_cmd) == 0

# Refresh the interface list and both routing tables now that tun0 is configured.
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
|
|
|
|
= Setup ICMPEcho_am on the interface
|
|
|
|
# Build an ICMPEcho_am answering machine through tun0.am(), passing count=3.
am = tun0.am(ICMPEcho_am, count=3)
|
|
# NOTE(review): presumably the default sniff timeout, in seconds - confirm.
am.defoptsniff['timeout'] = 5
|
|
# Run the answering machine in its own thread; the pings are sent by a later test.
t_am = Thread(target=am)
|
|
t_am.start()
|
|
|
|
= Send ping packets from OS into Scapy
|
|
|
|
# Send ICMP packets with seq=(1,3) to 192.0.2.2, the peer address configured on tun0.
send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
|
|
|
|
= Cleanup
|
|
|
|
# The answering machine was given a sniff timeout (am.defoptsniff['timeout'],
# set to 5 in the setup test). Joining for less than that could return while
# the thread is still sniffing on tun0, which is closed on the next line, so
# wait for the configured timeout plus a one second margin.
t_am.join(timeout=am.defoptsniff['timeout'] + 1)

# Close the tun interface only after the answering machine thread had time to stop.
tun0.close()
|
|
|
|
#######
|
|
+ Test strip_packet_info=False on Linux
|
|
|
|
~ tun linux needs_root not_libpcap
|
|
|
|
= Create a tun interface
|
|
|
|
# This test set only applies to Linux.
if not LINUX:
    raise NotImplementedError()

import subprocess

tun0 = TunTapInterface("tun0", strip_packet_info=False)

# Bring tun0 up and configure 192.0.2.1 (local) / 192.0.2.2 (peer) on it.
# NOTE(review): keep the "assert ... == 0" wrapper. The test runner appears to
# judge a test by the last displayed value, so a bare check_call() returning 0
# could mark the test as failed - confirm before simplifying.
for _cmd in (
        ["ip", "link", "set", "tun0", "up"],
        ["ip", "addr", "change",
         "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]):
    assert subprocess.check_call(_cmd) == 0

# Refresh the interface list and both routing tables now that tun0 is configured.
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
|
|
|
|
= Send ping packets from Linux into Scapy
|
|
|
|
def cb():
    # Passed as started_callback below: emits the three pings.
    send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))

# Sniff directly on the tun socket, keeping only packets that contain ICMP,
# and stop after 3 matching packets or when the join below times out.
t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3)
t.start()
t.join(timeout=3)

assert len(t.results) >= 3
icmp4_sequences = set()

for pkt in t.results:
    # Bare expression: echoes the packet into the test output.
    pkt
    # tun0 was opened with strip_packet_info=False, so every packet is
    # expected to still carry the LinuxTunPacketInfo header.
    assert isinstance(pkt, LinuxTunPacketInfo)
    if not isinstance(pkt.payload, IP) or ICMP not in pkt:
        # We might get IPv6 router solicitation or other traffic...
        continue
    if pkt[IP].src != '192.0.2.1' or pkt[IP].dst != '192.0.2.2' or pkt[ICMP].type != 8:
        continue
    # Read seq from the ICMP layer explicitly, as the other sniffing tests in
    # this file do, instead of relying on attribute lookup across layers.
    icmp4_sequences.add(pkt[ICMP].seq)

# Expect to get 3 different ICMP sequence numbers
assert len(icmp4_sequences) == 3, (
    "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences))
|
|
|
|
= Delete the tun interface
|
|
# Close the TunTapInterface created at the start of this test set.
tun0.close()
|
|
|
|
+ Test strip_packet_info=True and IPv6
|
|
|
|
~ tun needs_root ipv6 not_libpcap
|
|
|
|
= Create a tun interface with IPv4 + IPv6
|
|
|
|
import subprocess

tun0 = TunTapInterface("tun0", strip_packet_info=True)

# Select the platform's commands that bring tun0 up and configure both an
# IPv4 pair (192.0.2.1 / 192.0.2.2) and an IPv6 pair (2001:db8::1 / 2001:db8::2).
if LINUX:
    _setup_cmds = [
        ["ip", "link", "set", "tun0", "up"],
        ["ip", "addr", "change",
         "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"],
        ["ip", "-6", "addr", "add",
         "2001:db8::1", "peer", "2001:db8::2", "dev", "tun0"],
    ]
elif BSD:
    _setup_cmds = [
        ["ifconfig", "tun0", "up"],
        ["ifconfig", "tun0", "192.0.2.1", "192.0.2.2"],
        ["ifconfig", "tun0", "inet6", "2001:db8::1/128", "2001:db8::2"],
    ]
else:
    raise NotImplementedError()

# NOTE(review): keep the "assert ... == 0" wrapper. The test runner appears to
# judge a test by the last displayed value, so a bare check_call() returning 0
# could mark the test as failed - confirm before simplifying.
for _cmd in _setup_cmds:
    assert subprocess.check_call(_cmd) == 0

# Refresh the interface list and both routing tables now that tun0 is configured.
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
|
|
|
|
= Send ping packets from OS into Scapy
|
|
|
|
def cb():
    # Passed as started_callback below: emits three IPv4 and three IPv6 pings.
    send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
    send(IPv6(dst="2001:db8::2")/ICMPv6EchoRequest(seq=(1,3)))

# Sniff directly on the tun socket, keeping only ICMP / ICMPv6 echo request
# packets, and stop after 6 matches or when the join below times out.
t = AsyncSniffer(
    opened_socket=tun0,
    lfilter=lambda x: ICMP in x or ICMPv6EchoRequest in x,
    started_callback=cb,
    count=6)
t.start()
t.join(timeout=3)

assert len(t.results) >= 6
icmp4_sequences = set()
icmp6_sequences = set()

for pkt in t.results:
    # Bare expression: echoes the packet into the test output.
    pkt
    # tun0 was opened with strip_packet_info=True, so packets are expected to
    # start directly at the IP / IPv6 layer.
    assert isinstance(pkt, (IP, IPv6))
    if isinstance(pkt, IP):
        if (pkt[IP].src, pkt[IP].dst) != ("192.0.2.1", "192.0.2.2"):
            continue
        if ICMP in pkt and pkt[ICMP].type == 8:
            icmp4_sequences.add(pkt[ICMP].seq)
    elif isinstance(pkt, IPv6):
        if (pkt[IPv6].src, pkt[IPv6].dst) != ("2001:db8::1", "2001:db8::2"):
            continue
        if ICMPv6EchoRequest in pkt:
            icmp6_sequences.add(pkt[ICMPv6EchoRequest].seq)

# Expect 3 different sequence numbers for each of the two protocols
assert len(icmp4_sequences) == 3, "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)
assert len(icmp6_sequences) == 3, "Expected 3 IPv6 ICMP ping packets, got: " + repr(icmp6_sequences)
|
|
|
|
= Delete the tun interface
|
|
# Close the TunTapInterface created at the start of this test set.
tun0.close()
|
|
|
|
+ Test tap interfaces
|
|
|
|
~ tap needs_root not_libpcap
|
|
|
|
= Create a tap interface with IPv4
|
|
|
|
import subprocess

tap0 = TunTapInterface("tap0")

# Select the platform's commands that bring tap0 up, put 192.0.2.1 on a /30
# and install a neighbour/ARP entry mapping 192.0.2.2 to 20:00:00:20:00:00.
# Every platform other than Linux takes the ifconfig/arp branch.
if LINUX:
    _setup_cmds = [
        ["ip", "link", "set", "tap0", "up"],
        ["ip", "addr", "change", "192.0.2.1/30", "dev", "tap0"],
        ["ip", "neigh", "replace",
         "192.0.2.2", "lladdr", "20:00:00:20:00:00", "dev", "tap0"],
    ]
else:
    _setup_cmds = [
        ["ifconfig", "tap0", "up"],
        ["ifconfig", "tap0", "192.0.2.1", "netmask", "255.255.255.252"],
        ["arp", "-s", "192.0.2.2", "20:00:00:20:00:00", "temp"],
    ]

# NOTE(review): keep the "assert ... == 0" wrapper. The test runner appears to
# judge a test by the last displayed value, so a bare check_call() returning 0
# could mark the test as failed - confirm before simplifying.
for _cmd in _setup_cmds:
    assert subprocess.check_call(_cmd) == 0

# Refresh the interface list and both routing tables now that tap0 is configured.
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
|
|
|
|
= Send ping packets from OS into Scapy
|
|
|
|
# Bare expressions: echo the interface list and the IPv4 routes into the test output.
conf.ifaces
conf.route

def cb():
    # Passed as started_callback below: emits three broadcast-addressed pings on tap0.
    sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/IP(dst="192.0.2.2")/ICMP(seq=(1,3)), iface="tap0")

# Sniff directly on the tap socket, keeping only packets that contain ICMP,
# and stop after 3 matching packets or when the join below times out.
t = AsyncSniffer(
    opened_socket=tap0,
    lfilter=lambda x: ICMP in x,
    started_callback=cb,
    count=3)
t.start()
t.join(timeout=3)

assert len(t.results) >= 3
icmp4_sequences = set()

for pkt in t.results:
    # Bare expression: echoes the packet into the test output.
    pkt
    # Packets read from the tap socket are expected to be Ethernet frames.
    assert isinstance(pkt, Ether)
    if IP not in pkt:
        continue
    if (pkt[IP].src, pkt[IP].dst) != ("192.0.2.1", "192.0.2.2"):
        continue
    if ICMP in pkt and pkt[ICMP].type == 8:
        icmp4_sequences.add(pkt[ICMP].seq)

# Expect to get 3 different ICMP sequence numbers
assert len(icmp4_sequences) == 3, "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)
|
|
|
|
= Delete the tap interface
|
|
# Close the TunTapInterface created at the start of this test set.
tap0.close()
|
|
|
|
+ Refresh interfaces
|
|
~ linux tun tap not_libpcap
|
|
|
|
= Cleanup
|
|
|
|
# Final step of the campaign: reload the interface list, then resync the IPv4
# and IPv6 routes, after the tun/tap interfaces of the earlier test sets were closed.
# NOTE(review): presumably this drops stale tun0/tap0 entries so later campaigns
# start from a clean state - confirm.
conf.ifaces.reload()
|
|
conf.route.resync()
|
|
conf.route6.resync()
|