[Console/Python 3] Remove pyreadline/gnureadline (#703)

* Fix warning texts not being printed

* Add legacy style + histfile

* Fix Doc, readme, appveyor

* Add packet fields auto-completion (thanks to @p-l-)

* Pickle, session, interact tests

* Remove trailing line
This commit is contained in:
gpotter2 2017-09-20 11:29:50 +02:00 committed by Guillaume Valadon
parent b0741b232d
commit 887a967643
12 changed files with 270 additions and 345 deletions

View File

@ -16,7 +16,7 @@ install:
- ps: .\.appveyor\InstallWindump.ps1
- choco install -y wireshark
# Install Python modules
- "%PYTHON%\\python -m pip install cryptography coverage mock pyreadline"
- "%PYTHON%\\python -m pip install cryptography coverage mock"
- set PATH="%PYTHON%\\Scripts\\;%PATH%"
test_script:

View File

@ -379,7 +379,6 @@ You need the following software packages in order to install Scapy on Windows:
* `Python <http://www.python.org>`_: `python-2.7.13.amd64.msi <https://www.python.org/ftp/python/2.7.13/python-2.7.13.amd64.msi>`_ (64bits) or `python-2.7.13.msi <https://www.python.org/ftp/python/2.7.13/python-2.7.13.msi>`_ (32bits). After installation, add the Python installation directory and its \Scripts subdirectory to your PATH. Depending on your Python version, the defaults would be ``C:\Python27`` and ``C:\Python27\Scripts`` respectively.
* `Npcap <https://nmap.org/npcap/>`_: `the latest version <https://nmap.org/npcap/#download>`_. Default values are recommanded. Scapy will also work with Winpcap.
* `pyreadline <https://pypi.python.org/pypi/pyreadline>`_: Depending on the installed version of Python `pyreadline-2.1.win-amd64.exe <https://pypi.python.org/packages/8b/13/bed49b87af0b4f345b4e54897b5ab6a4b848e4dd300ec4195a0016b8650c/pyreadline-2.1.win-amd64.exe>`_ (64bits) or `pyreadline-2.1.win32.exe <https://pypi.python.org/packages/bc/ca/316035ec616c08979bbed47fb25b843415cf2d118a2f95f55173334300a6/pyreadline-2.1.win32.exe>`_ (32bits)
* `Scapy <http://www.secdev.org/projects/scapy/>`_: `latest development version <https://github.com/secdev/scapy/archive/master.zip>`_ from the `Git repository <https://github.com/secdev/scapy>`_. Unzip the archive, open a command prompt in that directory and run "python setup.py install".
Just download the files and run the setup program. Choosing the default installation options should be safe.

View File

@ -761,36 +761,6 @@ def read_routes6():
warning("Error building scapy IPv6 routing table : %s" % str(e), True)
return routes6
if conf.interactive_shell != 'ipython' and conf.interactive:
try:
__IPYTHON__
except NameError:
def readLineScapy(prompt):
result = ""
end = False
while not end :
if not end and result != "":
line = readline.rl.readline("... ")
else:
line = readline.rl.readline(prompt)
if line.strip().endswith(":"):
end = False
elif result == "":
end = True
if line.strip() == "":
end = True
result = result + "\n" + line
return six.text_type(result)
try:
import readline
console = readline.GetOutputFile()
except (ImportError, AttributeError):
log_loading.info("Could not get readline console. Will not interpret ANSI color codes.")
else:
conf.readfunc = readLineScapy
orig_stdout = sys.stdout
sys.stdout = console
def get_working_if():
try:
# return the interface associated with the route with smallest

View File

@ -58,7 +58,7 @@ def autorun_commands(cmds, my_globals=None, ignore_globals=None, verb=0):
if cmd:
sys.stderr.write(sys.__dict__.get("ps2","... "))
else:
sys.stderr.write(str(sys.__dict__.get("ps1",ColorPrompt())))
sys.stderr.write(str(sys.__dict__.get("ps1", sys.ps1)))
l = cmds.pop()
print(l)

View File

@ -14,7 +14,7 @@ import os,time,socket,sys
from scapy import VERSION
from scapy.data import *
from scapy import base_classes
from scapy import themes
from scapy.themes import NoTheme, apply_ipython_color
from scapy.error import log_scapy
import scapy.modules.six as six
@ -326,26 +326,21 @@ def isCryptographyAdvanced():
else:
return True
def _prompt_changer(attr,val):
prompt = conf.prompt
def _prompt_changer(attr, val):
"""Change the current prompt theme"""
try:
ct = val
if isinstance(ct, themes.AnsiColorTheme) and ct.prompt(""):
## ^A and ^B delimit invisible characters for readline to count right.
## And we need ct.prompt() to do change something or else ^A and ^B will be
## displayed
prompt = "\001%s\002" % ct.prompt("\002"+prompt+"\001")
else:
prompt = ct.prompt(prompt)
sys.ps1 = val.prompt(conf.prompt)
except:
pass
sys.ps1 = prompt
try:
apply_ipython_color(get_ipython())
except NameError:
pass
class Conf(ConfClass):
"""This object contains the configuration of Scapy.
session : filename where the session will be saved
interactive_shell : If set to "ipython", use IPython as shell. Default: Python
interactive_shell : can be "ipython", "python" or "auto". Default: Auto
stealth : if 1, prevents any unwanted packet to go out (ARP, DNS, ...)
checkIPID: if 0, doesn't check that IPID matches between IP sent and ICMP IP citation received
if 1, checks that they either are equal or byte swapped equals (bug in some IP stacks)
@ -381,7 +376,6 @@ debug_tls:When 1, print some TLS session secrets when they are computed.
stealth = "not implemented"
iface = None
iface6 = None
readfunc = None
layers = LayersList()
commands = CommandsList()
logLevel = LogLevel()
@ -415,7 +409,7 @@ debug_tls:When 1, print some TLS session secrets when they are computed.
route6 = None # Filed by route6.py
auto_fragment = 1
debug_dissector = 0
color_theme = Interceptor("color_theme", themes.NoTheme(), _prompt_changer)
color_theme = Interceptor("color_theme", NoTheme(), _prompt_changer)
warning_threshold = 5
warning_next_only_once = False
prog = ProgPath()

View File

@ -50,7 +50,7 @@ class IPTools(object):
os.system("whois %s" % self.src)
def _ttl(self):
"""Returns ttl or hlim, depending on the IP version"""
return self.hlim if isinstance(self, IPv6) else self.ttl
return self.hlim if isinstance(self, scapy.layers.inet6.IPv6) else self.ttl
def ottl(self):
t = sorted([32,64,128,255]+[self._ttl()])
return t[t.index(self._ttl())+1]

View File

@ -10,27 +10,23 @@ Main module for interactive startup.
from __future__ import absolute_import
from __future__ import print_function
import atexit
import code
import getopt
import glob
import gzip
import sys, os, getopt, re, code
import gzip, glob
import importlib
import logging
import os
from random import choice
import re
import sys
import types
try:
import IPython # Allow testing
except:
pass
from scapy.config import conf
# Never add any global import, in main.py, that would trigger a warning messsage
# before the console handlers gets added in interact()
from scapy.error import log_interactive, log_loading, log_scapy, warning
import scapy.modules.six as six
from scapy.themes import DefaultTheme
from scapy import utils
from scapy.themes import DefaultTheme, apply_ipython_color
IGNORED = list(six.moves.builtins.__dict__)
@ -58,13 +54,37 @@ def _probe_config_file(cf):
else:
return cf_path
def _read_config_file(cf):
def _read_config_file(cf, _globals=globals(), _locals=locals(), interactive=True):
"""Read a config file: execute a python file while loading scapy, that may contain
some pre-configured values.
If _globals or _locals are specified, they will be updated with the loaded vars.
This allows an external program to use the function. Otherwise, vars are only available
from inside the scapy console.
params:
- _globals: the globals() vars
- _locals: the locals() vars
- interactive: specified whether or not errors should be printed using the scapy console or
raised.
ex, content of a config.py file:
'conf.verb = 42\n'
Manual loading:
>>> _read_config_file("./config.py"))
>>> conf.verb
42
"""
log_loading.debug("Loading config file [%s]" % cf)
try:
exec(compile(open(cf).read(), cf, 'exec'))
exec(compile(open(cf).read(), cf, 'exec'), _globals, _locals)
except IOError as e:
if interactive:
raise
log_loading.warning("Cannot read config file [%s] [%s]" % (cf,e))
except Exception as e:
if interactive:
raise
log_loading.exception("Error during evaluation of config file [%s]" % cf)
def _validate_local(x):
@ -182,25 +202,35 @@ def list_contrib(name=None):
def save_session(fname=None, session=None, pickleProto=-1):
"""Save current Scapy session to the file specified in the fname arg.
params:
- fname: file to save the scapy session in
- session: scapy session to use. If None, the console one will be used
- pickleProto: pickle proto version (default: -1 = latest)"""
from scapy import utils
if fname is None:
fname = conf.session
if not fname:
conf.session = fname = utils.get_temp_file(keep=True)
log_interactive.info("Use [%s] as session file" % fname)
log_interactive.info("Use [%s] as session file" % fname)
if session is None:
session = six.moves.builtins.__dict__["scapy_session"]
to_be_saved = session.copy()
if "__builtins__" in to_be_saved:
del(to_be_saved["__builtins__"])
for k in to_be_saved.keys():
if type(to_be_saved[k]) in [type, type, types.ModuleType]:
log_interactive.error("[%s] (%s) can't be saved." % (k, type(to_be_saved[k])))
del(to_be_saved[k])
i = to_be_saved[k]
if hasattr(i, "__module__") and (k[0] == "_" or i.__module__.startswith("IPython")):
del(to_be_saved[k])
elif isinstance(i, (type, type, types.ModuleType)):
if k[0] != "_":
log_interactive.error("[%s] (%s) can't be saved." % (k, type(to_be_saved[k])))
del(to_be_saved[k])
try:
os.rename(fname, fname+".bak")
except OSError:
@ -212,6 +242,11 @@ def save_session(fname=None, session=None, pickleProto=-1):
del f
def load_session(fname=None):
"""Load current Scapy session from the file specified in the fname arg.
This will erase any existing session.
params:
- fname: file to load the scapy session from"""
if fname is None:
fname = conf.session
try:
@ -226,9 +261,14 @@ def load_session(fname=None):
scapy_session = six.moves.builtins.__dict__["scapy_session"]
scapy_session.clear()
scapy_session.update(s)
log_loading.info("Loaded session [%s]" % conf.session)
log_loading.info("Loaded session [%s]" % fname)
def update_session(fname=None):
"""Update current Scapy session from the file specified in the fname arg.
params:
- fname: file to load the scapy session from"""
if fname is None:
fname = conf.session
try:
@ -247,9 +287,6 @@ def init_session(session_name, mydict=None):
GLOBKEYS.extend(scapy_builtins)
GLOBKEYS.append("scapy_session")
scapy_builtins=None # XXX replace with "with" statement
if mydict is not None:
six.moves.builtins.__dict__.update(mydict)
GLOBKEYS.extend(mydict)
if session_name:
try:
@ -280,6 +317,10 @@ def init_session(session_name, mydict=None):
six.moves.builtins.__dict__["scapy_session"] = SESSION
if mydict is not None:
six.moves.builtins.__dict__["scapy_session"].update(mydict)
GLOBKEYS.extend(mydict)
################
##### Main #####
################
@ -290,6 +331,7 @@ def scapy_delete_temp_files():
os.unlink(f)
except:
pass
del(conf.temp_files[:])
def _prepare_quote(quote, author, max_len=78):
"""This function processes a quote and returns a string that is ready
@ -314,32 +356,19 @@ to be used in the fancy prompt.
lines.append(' | %s-- %s' % (" " * (max_len - len(author) - 5), author))
return lines
def scapy_write_history_file(readline):
if conf.histfile:
try:
readline.write_history_file(conf.histfile)
except IOError as e:
try:
warning("Could not write history to [%s]\n\t (%s)" % (conf.histfile,e))
tmp = utils.get_temp_file(keep=True)
readline.write_history_file(tmp)
warning("Wrote history to [%s]" % tmp)
except:
warning("Could not write history to [%s]. Discarded" % tmp)
def interact(mydict=None,argv=None,mybanner=None,loglevel=20):
global SESSION
global GLOBKEYS
conf.interactive = True
if loglevel is not None:
conf.logLevel=loglevel
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
log_scapy.addHandler(console_handler)
from scapy.config import conf
conf.color_theme = DefaultTheme()
conf.interactive = True
if loglevel is not None:
conf.logLevel = loglevel
STARTUP_FILE = DEFAULT_STARTUP_FILE
PRESTART_FILE = DEFAULT_PRESTART_FILE
@ -376,9 +405,9 @@ def interact(mydict=None,argv=None,mybanner=None,loglevel=20):
sys.exit(1)
if STARTUP_FILE:
_read_config_file(STARTUP_FILE)
_read_config_file(STARTUP_FILE, interactive=True)
if PRESTART_FILE:
_read_config_file(PRESTART_FILE)
_read_config_file(PRESTART_FILE, interactive=True)
init_session(session_name, mydict)
@ -434,92 +463,58 @@ def interact(mydict=None,argv=None,mybanner=None,loglevel=20):
if mybanner is not None:
the_banner += "\n"
the_banner += mybanner
try:
import readline, rlcompleter
except ImportError:
log_loading.info("Can't load Python libreadline or completer")
READLINE=0
else:
READLINE=1
class ScapyCompleter(rlcompleter.Completer):
def global_matches(self, text):
matches = []
n = len(text)
for lst in [dir(six.moves.builtins), SESSION]:
for word in lst:
if word[:n] == text and word != "__builtins__":
matches.append(word)
return matches
def attr_matches(self, text):
m = re.match(r"(\w+(\.\w+)*)\.(\w*)", text)
if not m:
return []
expr, attr = m.group(1, 3)
try:
object = eval(expr)
except:
try:
object = eval(expr, SESSION)
except (NameError, AttributeError):
return []
from scapy.packet import Packet, Packet_metaclass
if isinstance(object, Packet) or isinstance(object, Packet_metaclass):
words = [x for x in dir(object) if x[0] != "_"]
words += [x.name for x in object.fields_desc]
else:
words = dir(object)
if hasattr( object,"__class__" ):
words = words + rlcompleter.get_class_members(object.__class__)
matches = []
n = len(attr)
for word in words:
if word[:n] == attr and word != "__builtins__":
matches.append("%s.%s" % (expr, word))
return matches
readline.set_completer(ScapyCompleter().complete)
readline.parse_and_bind("C-o: operate-and-get-next")
readline.parse_and_bind("tab: complete")
if READLINE:
if conf.histfile:
try:
readline.read_history_file(conf.histfile)
except IOError:
pass
atexit.register(scapy_write_history_file,readline)
atexit.register(scapy_delete_temp_files)
IPYTHON=False
if conf.interactive_shell.lower() == "ipython":
if not conf.interactive_shell or conf.interactive_shell.lower() in ["ipython", "auto"]:
try:
import IPython
IPython
IPYTHON=True
except ImportError:
log_loading.warning("IPython not available. Using standard Python "
"shell instead.")
except NameError as e:
log_loading.warning("IPython not available. Using standard Python shell instead. "
"AutoCompletion, History are disabled.")
IPYTHON=False
init_session(session_name, mydict)
if IPYTHON:
banner = the_banner + " using IPython %s" % IPython.__version__
banner = the_banner + " using IPython %s\n" % IPython.__version__
from IPython.terminal.embed import InteractiveShellEmbed
from IPython.terminal.prompts import Prompts, Token
from IPython.utils.generics import complete_object
from traitlets.config.loader import Config
from scapy.packet import Packet
cfg = Config()
@complete_object.when_type(Packet)
def complete_packet(obj, prev_completions):
return prev_completions + [fld.name for fld in obj.fields_desc]
# Old way to embed IPython kept for backward compatibility
try:
args = [''] # IPython command line args (will be seen as sys.argv)
ipshell = IPython.Shell.IPShellEmbed(args, banner = banner)
ipshell(local_ns=SESSION)
except AttributeError:
pass
get_ipython
except NameError:
# Set "classic" prompt style when launched from run_scapy(.bat) files
class ClassicPrompt(Prompts):
def in_prompt_tokens(self, cli=None):
return [(Token.Prompt, '>>> '),]
def out_prompt_tokens(self):
return [(Token.OutPrompt, ''),]
cfg.TerminalInteractiveShell.prompts_class=ClassicPrompt # Set classic prompt style
apply_ipython_color(shell=cfg.TerminalInteractiveShell) # Register and apply scapy color style
cfg.TerminalInteractiveShell.confirm_exit = False # Remove confirm exit
cfg.TerminalInteractiveShell.separate_in = u'' # Remove spacing line
# In the IPython cookbook, see 'Updating-code-for-use-with-IPython-0.11-and-later'
IPython.embed(user_ns=SESSION, banner2=banner)
cfg.TerminalInteractiveShell.hist_file = conf.histfile
# configuration can thus be specified here.
ipshell = InteractiveShellEmbed(config=cfg,
banner1=banner,
hist_file=conf.histfile if conf.histfile else None,
user_ns=SESSION)
ipshell(local_ns=SESSION)
else:
code.interact(banner=the_banner, local=SESSION, readfunc=conf.readfunc)
code.interact(banner = the_banner, local=SESSION)
if conf.session:
save_session(conf.session, SESSION)

View File

@ -11,22 +11,37 @@ Color themes for the interactive console.
## Color themes ##
##################
class Color:
normal = "\033[0m"
black = "\033[30m"
red = "\033[31m"
green = "\033[32m"
yellow = "\033[33m"
blue = "\033[34m"
purple = "\033[35m"
cyan = "\033[36m"
grey = "\033[37m"
class ColorTable:
colors = { # Format: (ansi, pygments)
"normal": ("\033[0m", "noinherit"),
"black": ("\033[30m", "#ansiblack"),
"red": ("\033[31m", "#ansired"),
"green": ("\033[32m", "#ansigreen"),
"yellow": ("\033[33m", "#ansiyellow"),
"blue": ("\033[34m", "#ansiblue"),
"purple": ("\033[35m", "#ansipurple"),
"cyan": ("\033[36m", "#ansicyan"),
"grey": ("\033[37m", "#ansigrey"),
bold = "\033[1m"
uline = "\033[4m"
blink = "\033[5m"
invert = "\033[7m"
"bold": ("\033[1m", "bold"),
"uline": ("\033[4m", "underline"),
"blink": ("\033[5m", ""),
"invert": ("\033[7m", ""),
}
def __repr__(self):
return "<ColorTable>"
def __getattr__(self, attr):
return self.colors.get(attr, [""])[0]
def ansi_to_pygments(self, x): # Transform ansi encoded text to Pygments text
inv_map = {v[0]: v[1] for k, v in self.colors.items()}
for k, v in inv_map.items():
x = x.replace(k, " "+v)
return x.strip()
Color = ColorTable()
def create_styler(fmt=None, before="", after="", fmt2="%s"):
def do_style(val, fmt=fmt, before=before, after=after, fmt2=fmt2):
@ -41,7 +56,12 @@ def create_styler(fmt=None, before="", after="", fmt2="%s"):
class ColorTheme:
def __repr__(self):
return "<%s>" % self.__class__.__name__
def __reduce__(self):
return (self.__class__, (), ())
def __getattr__(self, attr):
if attr in ["__getstate__", "__setstate__", "__getinitargs__",
"__reduce_ex__"]:
raise AttributeError()
return create_styler()
@ -268,16 +288,14 @@ class HTMLTheme2(HTMLTheme):
style_right = "#[#span class=right#]#%s#[#/span#]#"
class ColorPrompt:
__prompt = ">>> "
def __str__(self):
try:
from scapy import config
ct = config.conf.color_theme
if isinstance(ct, AnsiColorTheme):
## ^A and ^B delimit invisible characters for readline to count right
return "\001%s\002" % ct.prompt("\002"+config.conf.prompt+"\001")
else:
return ct.prompt(config.conf.prompt)
except:
return self.__prompt
def apply_ipython_color(shell):
"""Updates the specified IPython console shell with
the conf.color_theme scapy theme."""
from IPython.terminal.prompts import Prompts, Token
shell.highlighting_style_overrides = {
Token.Prompt: Color.ansi_to_pygments(conf.color_theme.style_prompt),
}
try:
get_ipython().refresh_style()
except NameError:
pass

View File

@ -36,7 +36,7 @@ def get_temp_file(keep=False, autoext=""):
f = os.tempnam("","scapy")
if not keep:
conf.temp_files.append(f+autoext)
return f
return f + autoext
def sane_color(x):
r=""

View File

@ -91,66 +91,6 @@ ifIndex DestinationPrefix NextHop
test_read_routes6_windows()
############
############
+ Main.py emulator
= Prepare readline patching functions
from scapy.main import *
import scapy.config as conf
import sys
import mock
import readline
index = 0
@mock.patch("pyreadline.console.console.Console.size")
@mock.patch("scapy.config.conf.readfunc")
def emulate_main_input(data, mock_readfunc, mock_pyr_size):
# This fix when the windows doesn't have a size (run with a windows server)
mock_pyr_size.return_value = (300, 300)
global index
index = 0 # reset var
def readlineScapy(*args, **kargs):
global index
if len(data) == index:
r_data = "exit(1 if hasattr(sys, 'last_value') and sys.last_value is not None else 0)"
else:
r_data = data[index]
if r_data.startswith("#AUTOCOMPLETE"):
send_text = re.match(r'#AUTOCOMPLETE{(.*)}', r_data).group(1)
cmpl = readline.rl.get_completer()
r_data = cmpl(send_text, 0)
index +=1
print r_data
return r_data
mock_readfunc.side_effect = readlineScapy
sys.argv = ['']
sys.last_value = None
def console_exit(code):
raise SystemExit(code)
exit_code = -1
try:
interact(mydict={"exit": console_exit})
except SystemExit as e:
exit_code = str(e)
pass
assert exit_code == "0"
= Test basic running
data = ["IP()", "assert _.name == 'IP'"]
emulate_main_input(data)
= Test function parsing
data = ["def test():", " return True", "", "assert test() == True"]
emulate_main_input(data)
= Test auto-completion
from ctypes import wintypes
data = ["#AUTOCOMPLETE{scapy.config.conf.vers}", "assert _ == scapy.config.conf.version"]
emulate_main_input(data)
############
############
+ Windows arch unit tests

View File

@ -264,6 +264,8 @@ os.unlink("t2.pcap")
= Test InjectSink and Inject3Sink
~ needs_root
import mock
a = IP(dst="192.168.0.1")/ICMP()
msgs = []

View File

@ -104,100 +104,77 @@ else:
conf.route6.ifchange(LOOPBACK_NAME, "::1/128")
True
= UTscapy route check
* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise
if WINDOWS:
route_add_loopback()
IP().src
assert _ == "127.0.0.1"
############
############
+ Main.py tests
= Prepare emulator
import mock
from mock import Mock
= Pickle and unpickle a packet
if WINDOWS:
# This fix when the windows doesn't have a size (run with a windows server)
mock.patch("pyreadline.console.console.Console.size").return_value = (300, 300)
import scapy.modules.six as six
@mock.patch("scapy.config.conf.readfunc")
def emulate_main_input(data, mock_readfunc):
global index
index = 0 # reset var
def readlineScapy(*args, **kargs):
global index
if len(data) == index:
r_data = "exit(1 if hasattr(sys, 'last_value') and sys.last_value is not None else 0)"
else:
r_data = data[index]
index +=1
print r_data
return r_data
mock_readfunc.side_effect = readlineScapy
def reduce_mock(self):
return (Mock, ())
mock_readfunc.__reduce__ = reduce_mock
sys.argv = ['']
def console_exit(code):
raise SystemExit(code)
exit_code = -1
try:
interact(mydict={"exit": console_exit})
except SystemExit as e:
exit_code = str(e)
pass
assert exit_code == "0"
a = IP(dst="192.168.0.1")/UDP()
= Test basic save_session, load_session and update_session
b = six.moves.cPickle.dumps(a)
c = six.moves.cPickle.loads(b)
data = ["init_session(\"scapySession1\")",\
"test_value = \"8.8.8.8\"",\
"save_session()",\
"del test_value",\
"load_session()",\
"update_session()",\
"assert test_value == \"8.8.8.8\""]
assert c[IP].dst == "192.168.0.1"
assert str(c) == str(a)
emulate_main_input(data)
= Test save_session, load_session and update_session with fname
data = ["init_session(\"scapySession2\")",\
"test_value = 7",\
"save_session(fname=\"scapySaveSession.dat\")",\
"del test_value",\
"load_session(fname=\"scapySaveSession.dat\")",\
"update_session(fname=\"scapySaveSession.dat\")",\
"assert test_value == 7"]
emulate_main_input(data)
= Test pickling with packets
data = ["init_session(\"scapySession1\")",\
"test_value = IP(src=\"127.0.0.1\", dst=\"8.8.8.8\")",\
"test_value2 = ICMPv6EchoReply(data=\"testData@%!\")",\
"save_session()",\
"del test_value",\
"load_session()",\
"assert test_value.src == \"127.0.0.1\"",\
"assert test_value.dst == \"8.8.8.8\"",\
"assert test_value2.data == \"testData@%!\""]
emulate_main_input(data)
= Clean up session files
= Usage test
from scapy.main import _usage
try:
os.remove("scapySaveSession.dat")
except OSError:
_usage()
assert False
except SystemExit:
assert True
= Session test
# This is automatic when using the console
def get_var(var):
return six.moves.builtins.__dict__["scapy_session"][var]
def set_var(var, value):
six.moves.builtins.__dict__["scapy_session"][var] = value
def del_var(var):
del(six.moves.builtins.__dict__["scapy_session"][var])
init_session(None, {"init_value": 123})
set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8"
save_session()
del_var("test_value")
load_session()
update_session()
assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8"
assert get_var("init_value") == 123
= Session test with fname
init_session("scapySession2")
set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1")
save_session(fname="scapySession1.dat")
del_var("test_value")
set_var("z", True) #z = True
load_session(fname="scapySession1.dat")
try:
get_var("z")
assert False
except:
pass
set_var("z", False) #z = False
update_session(fname="scapySession1.dat")
assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1"
assert not get_var("z")
= Clear session files
os.remove("scapySession1.dat")
= Test temporary file creation
~ appveyor_only
@ -210,9 +187,32 @@ else:
assert("scapy" in tmpfile and (IS_PYPY == True or "/tmp/" in tmpfile))
assert(conf.temp_files[0].endswith(".ut"))
assert(conf.temp_files.pop())
scapy_delete_temp_files()
assert(len(conf.temp_files) == 0)
= Emulate interact()
import mock, sys
try:
import IPython
except:
code_interact_import = "scapy.main.code.interact"
else:
code_interact_import = "scapy.main.IPython.terminal.embed.InteractiveShellEmbed"
@mock.patch(code_interact_import)
def interact_emulator(code_int):
try:
code_int = lambda *args, **kwargs: lambda: None
interact(argv=["-s scapy1"], mybanner="What a test")
return True
except:
raise
return False
assert interact_emulator()
sys.ps1 = ">>> "
= Test sane function
sane("A\x00\xFFB") == "A..B"
@ -354,7 +354,7 @@ fd, fname = tempfile.mkstemp()
os.write(fd, "conf.verb = 42\n")
os.close(fd)
from scapy.main import _read_config_file
_read_config_file(fname)
_read_config_file(fname, globals(), locals())
assert(conf.verb == 42)
conf.verb = saved_conf_verb
@ -1413,10 +1413,11 @@ a=Ether(str(Ether()/IPv6()/TCP()))
a.type == 0x86dd
= IPv6 Class binding with GRE - build
str(IP()/GRE()/Ether()/IP()/GRE()/IPv6()) == b'E\x00\x00f\x00\x01\x00\x00@/|f\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00eX\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00@\x00\x01\x00\x00@/|\x8c\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x86\xdd`\x00\x00\x00\x00\x00;@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
s = str(IP(src="127.0.0.1")/GRE()/Ether(dst="ff:ff:ff:ff:ff:ff", src="00:00:00:00:00:00")/IP()/GRE()/IPv6(src="::1"))
s == b'E\x00\x00f\x00\x01\x00\x00@/|f\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00eX\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00@\x00\x01\x00\x00@/|\x8c\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x86\xdd`\x00\x00\x00\x00\x00;@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
= IPv6 Class binding with GRE - dissection
p = IP(str(IP()/GRE()/Ether()/IP()/GRE()/IPv6()))
p = IP(s)
GRE in p and p[GRE:1].proto == 0x6558 and p[GRE:2].proto == 0x86DD and IPv6 in p
@ -3156,16 +3157,22 @@ s == b"`\x00\x00\x00\x00\x18:\x01\xfe\x80\x00\x00\x00\x00\x00\x00\xba\xca:\xff\x
p = IPv6(s)
ICMPv6MLQuery in p and p[IPv6].dst == "ff02::1"
###########
###########
= UTscapy route check
* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise
# No more routing/networking tests after this line
if WINDOWS:
route_add_loopback()
IP().src
assert _ == "127.0.0.1"
############
############
+ Ether tests with IPv6
= Logic
# No more routing test after this line
if WINDOWS:
route_add_loopback()
= Ether IPv6 checking for dst
~ netaccess ipv6