2017-01-13 07:25:04 +00:00
|
|
|
% Regression tests on Windows only for Scapy
|
|
|
|
|
|
|
|
# More information at http://www.secdev.org/projects/UTscapy/
|
|
|
|
|
|
|
|
############
|
|
|
|
############
|
|
|
|
+ Mocked read_routes6() calls
|
|
|
|
|
|
|
|
= Windows with fake IPv6 adapter
|
|
|
|
|
|
|
|
import mock
|
|
|
|
|
|
|
|
def check_mandatory_ipv6_routes_windows(routes6):
    """Ensure that the mandatory IPv6 routes are present.

    No loopback route is required: there is not always a loopback
    interface on Windows.

    :param routes6: list of route entries; only the first two fields of
        each entry are inspected, as (destination, prefix length).
    :returns: True when there is at least one ``fe80::`` route with a
        prefix length of 64 or 32 AND at least one /128 route whose
        destination is accepted by in6_islladdr(); False otherwise.
    """
    # any() replaces len(filter(...)): on Python 3 filter() returns an
    # iterator, which has no len(). any() works on Python 2 and 3 alike.
    has_lla_network = any(
        r[0] == "fe80::" and r[1] in (64, 32) for r in routes6
    )
    if not has_lla_network:
        return False
    return any(in6_islladdr(r[0]) and r[1] == 128 for r in routes6)
|
|
|
|
|
|
|
|
def dev_from_index_custom(if_index):
    """Look up a fake network adapter by its Windows interface index.

    Builds NetworkInterface objects from two hard-coded adapter
    descriptions and returns the one matching the requested index.

    :param if_index: interface index; compared as str(if_index).
    :returns: the NetworkInterface whose win_index equals str(if_index).
    :raises ValueError: when none of the fake adapters has that index.
    """
    fake_adapters = [
        {
            'mac': 'D0-50-99-56-DD-F9',
            'win_index': '13',
            'guid': '{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}',
            'name': 'Killer E2200 Gigabit Ethernet Controller',
            'description': 'Ethernet',
        },
        {
            'mac': '00-FF-0E-C7-25-37',
            'win_index': '3',
            'guid': '{0EC72537-B662-4F5D-B34E-48BFAE799BBE}',
            'name': 'TAP-Windows Adapter V9',
            'description': 'Ethernet 2',
        },
    ]

    # First build every adapter (indexed by GUID), then search.
    by_guid = {}
    for adapter in fake_adapters:
        try:
            nic = NetworkInterface(adapter)
            by_guid[nic.guid] = nic
        except (KeyError, PcapNameNotFoundError):
            # An adapter that cannot be built is simply left out.
            continue

    wanted = str(if_index)
    for nic in by_guid.values():
        if nic.win_index == wanted:
            return nic
    raise ValueError("Unknown network interface index %r" % if_index)
|
|
|
|
|
|
|
|
@mock.patch("scapy.arch.windows.winpcapy_get_if_list")
@mock.patch("scapy.arch.windows.dev_from_index")
@mock.patch("scapy.arch.windows.sp.Popen")
def test_read_routes6_windows(mock_comm, mock_dev_from_index, mock_winpcapylist):
    """Test read_routes6() on Windows against canned PowerShell output.

    The three arguments are the mocks injected by the mock.patch
    decorators (innermost decorator first):

    :param mock_comm: replaces scapy.arch.windows.sp.Popen.
    :param mock_dev_from_index: replaces scapy.arch.windows.dev_from_index.
    :param mock_winpcapylist: replaces
        scapy.arch.windows.winpcapy_get_if_list.
    """
    # 'Get-NetRoute -AddressFamily IPV6 | select ifIndex, DestinationPrefix, NextHop'
    get_net_route_output = """
ifIndex DestinationPrefix NextHop
------- ----------------- -------
3 ff00::/8 ::
16 ff00::/8 ::
13 ff00::/8 ::
1 ff00::/8 ::
13 fe80::dc1d:24e8:af00:125e/128 ::
3 fe80::9402:5804:cb16:fb3b/128 ::
16 fe80::100:7f:fffe/128 ::
3 fe80::/64 ::
16 fe80::/64 ::
13 fe80::/64 ::
13 2a01:e35:2f17:fe60:dc1d:24e8:af00:125e/128 ::
13 2a01:e35:2f17:fe60::/64 ::
1 ::1/128 ::
13 ::/0 fe80::224:d4ff:fea0:a6d7
"""
    # Popen(...).communicate() hands back the canned output as stdout.
    mock_comm.return_value.communicate.return_value = (get_net_route_output, "")
    mock_winpcapylist.return_value = [u'\\Device\\NPF_{0EC72537-B662-4F5D-B34E-48BFAE799BBE}', u'\\Device\\NPF_{C56DFFB3-992C-4964-B000-3E7C0F76E8BA}']
    # Resolve interface indexes against the fake adapter list.
    mock_dev_from_index.side_effect = dev_from_index_custom
    # Test the function
    routes = read_routes6()
    for r in routes:
        # print(r) prints the same thing as the Python 2 statement
        # "print r" and is also valid on Python 3.
        print(r)
    # NOTE(review): 9 is the number of rows above whose ifIndex is 13 or 3,
    # the only indexes dev_from_index_custom() knows; presumably rows on
    # other interfaces are dropped by read_routes6() - confirm.
    assert(len(routes) == 9)
    assert(check_mandatory_ipv6_routes_windows(routes))
|
|
|
|
|
|
|
|
|
|
|
|
# Run the mocked read_routes6() test defined above.
test_read_routes6_windows()
|
2017-01-15 17:32:00 +00:00
|
|
|
|
|
|
|
############
|
|
|
|
############
|
|
|
|
+ Main.py emulator
|
|
|
|
|
|
|
|
= Prepare readline patching functions
|
|
|
|
|
|
|
|
from scapy.main import *
|
|
|
|
import scapy.config as conf
|
|
|
|
import sys
|
|
|
|
|
|
|
|
import mock
|
|
|
|
import readline
|
|
|
|
from threading import Thread, Event
|
|
|
|
|
|
|
|
class sendTextAndTab(Thread):
    """Thread that types a text, then Tab and Enter, as keyboard input.

    The typing is repeated until the stop event given at construction
    time is set.
    """

    def __init__(self, event, text):
        """Store the stop event and the text to type."""
        super(sendTextAndTab, self).__init__()
        self.send_text = text
        self.stopped = event

    def run(self):
        """Type the text + Tab + Enter every 0.5 s until stopped."""
        import keyboard
        # NOTE(review): `time` is not imported in the visible part of this
        # file; presumably it comes from a star-import - confirm.
        time.sleep(1)
        while True:
            # wait() doubles as the pause between two typing rounds.
            if self.stopped.wait(0.5):
                break
            keyboard.write(self.send_text)
            for key in ("tab", "enter"):
                keyboard.send(key)
|
|
|
|
|
|
|
|
# Position of the next entry of `data` to feed to the console.
index = 0


@mock.patch("pyreadline.console.console.Console.size")
@mock.patch("scapy.config.conf.readfunc")
def emulate_main_input(data, mock_readfunc, mock_pyr_size):
    """Run the Scapy console with scripted input and require a clean exit.

    Each entry of ``data`` is returned as one line of console input.
    An entry of the form ``#AUTOCOMPLETE{text}`` is not returned as is:
    a sendTextAndTab thread is started with ``text`` and the line is then
    read through ``readline.rl.readline()``.  Once ``data`` is exhausted,
    an ``exit(...)`` command is fed that exits with 1 when
    ``sys.last_value`` is set and 0 otherwise.

    :param data: list of input lines (strings).
    :param mock_readfunc: mock injected for scapy.config.conf.readfunc.
    :param mock_pyr_size: mock injected for
        pyreadline.console.console.Console.size.
    :raises AssertionError: when the console does not exit with code 0.
    """
    # Works around a console window that has no size (e.g. when run on a
    # Windows server) by reporting a fixed one.
    mock_pyr_size.return_value = (300, 300)
    global index
    index = 0  # reset var

    def readlineScapy(*args, **kargs):
        """Return the next scripted line in place of the real readfunc."""
        global index
        if len(data) == index:
            r_data = "exit(1 if hasattr(sys, 'last_value') and sys.last_value is not None else 0)"
        else:
            r_data = data[index]
        if r_data.startswith("#AUTOCOMPLETE"):
            send_text = re.match(r'#AUTOCOMPLETE{(.*)}', r_data).group(1)
            stopFlag = Event()
            thread = sendTextAndTab(stopFlag, send_text)
            thread.start()
            try:
                # This will block the program until the thread has pushed
                # the stuff
                r_data = readline.rl.readline()
            finally:
                # Always stop the typing thread, even when readline fails;
                # otherwise it would keep sending keystrokes forever.
                stopFlag.set()
        index += 1
        print(r_data)
        return r_data

    mock_readfunc.side_effect = readlineScapy
    sys.argv = ['']

    def console_exit(code):
        """Replacement for exit() inside the emulated console."""
        raise SystemExit(code)

    exit_code = -1
    try:
        interact(mydict={"exit": console_exit})
    except SystemExit as e:
        exit_code = str(e)
    assert exit_code == "0"
|
|
|
|
|
|
|
|
= Test basic running
|
|
|
|
# Console input lines; emulate_main_input() feeds them one per read.
data = ["IP()", "assert _.name == 'IP'"]
|
|
|
|
emulate_main_input(data)
|
|
|
|
|
|
|
|
= Test function parsing
|
|
|
|
# Console input lines: a multi-line function definition, the empty line that ends it, then a check.
data = ["def test():", " return True", "", "assert test() == True"]
|
|
|
|
emulate_main_input(data)
|
|
|
|
|
|
|
|
= Test auto-completion
|
|
|
|
from ctypes import wintypes
|
|
|
|
# The #AUTOCOMPLETE{...} entry makes emulate_main_input() type the text between the braces, then Tab and Enter.
data = ["#AUTOCOMPLETE{scapy.config.conf.vers}", "assert _ == scapy.config.conf.version"]
|
|
|
|
emulate_main_input(data)
|