mirror of https://github.com/secdev/scapy.git
196 lines
5.8 KiB
Plaintext
196 lines
5.8 KiB
Plaintext
% Regression tests for Linux only
|
|
|
|
# More information at http://www.secdev.org/projects/UTscapy/
|
|
|
|
|
|
############
|
|
############
|
|
|
|
+ Linux only test
|
|
|
|
= TCP client automaton
|
|
~ automaton netaccess linux needs_root
|
|
* This test retries on failure because it often fails
|
|
|
|
from __future__ import print_function
|
|
import os
|
|
import time
|
|
import signal
|
|
|
|
from scapy.modules.six.moves import range
|
|
|
|
def handler(signum, stack_frame):
    """SIGALRM callback: abort the running attempt by raising an exception.
    Both arguments are the ones the signal module passes in; neither is used."""
    timeout_message = "Timer expired !"
    raise Exception(timeout_message)
|
|
|
|
# Install the timeout handler and remember the one it replaces so that it can
# be put back once this test is over.
previous_sigalrm_handler = signal.signal(signal.SIGALRM, handler)

SECDEV_IP4 = "203.178.141.194"
IPTABLE_RULE = "iptables -%c INPUT -s %s -p tcp --sport 80 -j DROP"

# Drop packets from SECDEV_IP4
assert(os.system(IPTABLE_RULE % ('A', SECDEV_IP4)) == 0)

success = False
for i in range(10):
    r = w = t = None
    running = False
    # Each attempt gets at most 5 seconds before handler() raises.
    tmp = signal.alarm(5)
    try:
        r, w = os.pipe()
        t = TCP_client(SECDEV_IP4, 80, external_fd={"tcp": (r, w)})
        tmp = os.write(w, b"HEAD / HTTP/1.0\r\n\r\n")
        t.runbg()
        running = True
        time.sleep(0.5)
        response = os.read(r, 4096)
        success = response.startswith(b"HTTP/1.1 200 OK")
    except Exception as e:
        # Disarm the timer first so that it cannot fire while reporting.
        tmp = signal.alarm(0)
        print(e)
    finally:
        # Whatever happened, leave nothing behind for the next attempt or the
        # next test: no pending alarm, no running automaton, no open pipe.
        tmp = signal.alarm(0)
        try:
            if running:
                t.stop()
        except Exception as e:
            print(e)
        for fd in (r, w):
            try:
                if fd is not None:
                    os.close(fd)
            except OSError as e:
                print(e)
    if success:
        break
    # Short pause before retrying.
    time.sleep(0.5)

# Put the previous SIGALRM handler back. signal.signal() returns None when the
# previous handler was not installed from Python; there is nothing to restore
# in that case.
if previous_sigalrm_handler is not None:
    tmp = signal.signal(signal.SIGALRM, previous_sigalrm_handler)

# Remove the iptables rule
assert(os.system(IPTABLE_RULE % ('D', SECDEV_IP4)) == 0)

assert(success)
|
|
|
|
= L3RawSocket
|
|
~ netaccess IP ICMP linux needs_root
|
|
|
|
# Send one ICMP echo request through L3RawSocket and check the reply.
old_l3socket = conf.L3socket
old_debug_dissector = conf.debug_dissector
try:
    conf.debug_dissector = False
    conf.L3socket = L3RawSocket
    x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
finally:
    # Always restore the global configuration, even when sr1() raises (for
    # instance on a name resolution failure), so later tests are unaffected.
    conf.debug_dissector = old_debug_dissector
    conf.L3socket = old_l3socket

x
# sr1() was given a timeout, so no answer is a possible outcome: fail with a
# clear assertion instead of a TypeError on x[IP] below.
assert x is not None
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
x is not None and ICMP in x and x[ICMP].type == 0
|
|
|
|
# TODO: fix this test (randomly stuck)
|
|
# ex: https://travis-ci.org/secdev/scapy/jobs/247473497
|
|
|
|
#= Supersocket _flush_fd
|
|
#~ needs_root linux
|
|
#
|
|
#import select
|
|
#
|
|
#from scapy.arch.linux import _flush_fd
|
|
#socket = conf.L2listen()
|
|
#select.select([socket],[],[],2)
|
|
#_flush_fd(socket.ins)
|
|
|
|
= Test legacy attach_filter function
|
|
~ linux needs_root
|
|
from scapy.arch.common import get_bpf_pointer
|
|
|
|
# Compile a BPF program from tcpdump-style lines with conf.use_pypy forced on.
# NOTE(review): presumably use_pypy selects the legacy code path named in the
# test title - confirm against scapy.arch.common.
old_pypy = conf.use_pypy
tcpdump_lines = ['12\n', '40 0 0 12\n', '21 0 5 34525\n', '48 0 0 20\n', '21 6 0 6\n', '21 0 6 44\n', '48 0 0 54\n', '21 3 4 6\n', '21 0 3 2048\n', '48 0 0 23\n', '21 0 1 6\n', '6 0 0 1600\n', '6 0 0 0\n']
try:
    conf.use_pypy = True
    pointer = get_bpf_pointer(tcpdump_lines)
    # The type and length checks only apply on Python 2.
    assert six.PY3 or isinstance(pointer, str)
    assert six.PY3 or len(pointer) > 1
finally:
    # Restore the setting even when the call or an assertion fails, so the
    # forced value does not leak into the following tests.
    conf.use_pypy = old_pypy
|
|
|
|
= Interface aliases & sub-interfaces
|
|
~ linux needs_root
|
|
|
|
import os
|
|
# Build a dummy interface with an alias and a VLAN sub-interface, then check
# that the routing table read by Scapy reflects them.
try:
    exit_status = os.system("ip link add name scapy0 type dummy")
    exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
    exit_status = os.system("ip link set scapy0 up")
    # Interface alias scapy0:0
    exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up")
    exit_status = os.system("ip addr show scapy0")
    print(get_if_list())
    conf.route.resync()
    print(conf.route.routes)
    assert(conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0"))
    # 3325256704 is 198.51.100.0 and 4294967040 is 255.255.255.0
    route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0)
    assert(route_alias in conf.route.routes)
    # VLAN sub-interface scapy0.42 plus a host route through it
    exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42")
    exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42")
    exit_status = os.system("ip link set scapy0.42 up")
    exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41")
    print(get_if_list())
    conf.route.resync()
    print(conf.route.routes)
    assert(conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41"))
    # 3221226027 is 192.0.2.43 and 4294967295 is 255.255.255.255
    route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "203.0.113.42", 0)
    assert(route_specific in conf.route.routes)
finally:
    # Always remove the dummy device: a leftover scapy0 would make the next
    # run fail at "ip link add". The original command read
    # "ip link del name dev scapy0"; the stray "name" keyword is dropped.
    exit_status = os.system("ip link del dev scapy0")
|
|
|
|
= catch loopback device missing
|
|
~ linux needs_root
|
|
|
|
from mock import patch
|
|
|
|
# The real "lo" device cannot be taken away (nor can its address, at least not
# without causing trouble), so LOOPBACK_NAME is pointed at a pseudo dummy name
# instead.
loopback_name_patch = patch('scapy.consts.LOOPBACK_NAME', 'scapy_lo_x')
with loopback_name_patch:
    routes = read_routes()
|
|
|
|
= catch loopback device no address assigned
|
|
~ linux needs_root
|
|
|
|
import os, socket
|
|
from mock import patch
|
|
|
|
# struct.pack() is used below; import it here rather than relying on the
# surrounding session to provide it.
import struct

try:
    exit_status = os.system("ip link add name scapy_lo type dummy")
    assert(exit_status == 0)
    exit_status = os.system("ip link set dev scapy_lo up")
    assert(exit_status == 0)
    # First pass: scapy_lo is up but has no address yet; read_routes() only
    # has to complete without raising.
    with patch('scapy.consts.LOOPBACK_NAME', 'scapy_lo'):
        routes = read_routes()
    exit_status = os.system("ip addr add dev scapy_lo 10.10.0.1/24")
    assert(exit_status == 0)
    # Second pass: with an address assigned, a route for scapy_lo is expected.
    with patch('scapy.consts.LOOPBACK_NAME', 'scapy_lo'):
        routes = read_routes()
    got_lo_device = False
    for route in routes:
        dst_int, msk_int, gw_str, if_name, if_addr, metric = route
        if if_name == 'scapy_lo':
            got_lo_device = True
            assert(if_addr == '10.10.0.1')
            # Destination and mask are integers; render them dotted-quad.
            dst_addr = socket.inet_ntoa(struct.pack("!I", dst_int))
            assert(dst_addr == '10.10.0.0')
            msk = socket.inet_ntoa(struct.pack("!I", msk_int))
            assert (msk == '255.255.255.0')
            break
    assert(got_lo_device)
finally:
    # Always remove the dummy device, even when an assertion above failed;
    # a leftover scapy_lo would make the next run fail at "ip link add".
    exit_status = os.system("ip link del dev scapy_lo")

assert(exit_status == 0)
|
|
|
|
= IPv6 link-local address selection
|
|
|
|
from mock import patch
|
|
# Check which source addresses are picked for a packet sent to ff02::1 when
# the only IPv6 route is a link-local one on a fake "scapy0" interface.
bck_conf_route6_routes = conf.route6.routes
bck_conf_iface = conf.iface
ip6_ll_address = 'fe80::e039:91ff:fe79:1910'
mac_address = 'e2:39:91:79:19:10'
try:
    conf.route6.routes = [('fe80::', 64, '::', 'scapy0', [ip6_ll_address], 256)]
    conf.iface = "scapy0"
    # scapy0 is not created by this test, so the hardware address lookup is
    # mocked. Everything that reads the packet's source fields stays inside
    # the patch.
    with patch("scapy.layers.l2.get_if_hwaddr") as mgih:
        mgih.return_value = mac_address
        p = Ether()/IPv6(dst="ff02::1")/ICMPv6NIQueryName(data="ff02::1")
        print(p.sprintf("%Ether.src% > %Ether.dst%\n%IPv6.src% > %IPv6.dst%"))
        print(p[IPv6].src, ip6_ll_address)
        assert p[IPv6].src == ip6_ll_address
        print(p[Ether].src, mac_address)
        assert p[Ether].src == mac_address
finally:
    # Restore the global configuration even when an assertion fails, so that
    # conf.iface does not keep pointing at the fake interface.
    conf.route6.routes = bck_conf_route6_routes
    conf.iface = bck_conf_iface
|