diff --git a/.appveyor.yml b/.appveyor.yml index ffa3c7a8a..12d52b31e 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -16,7 +16,7 @@ install: - ps: .\.appveyor\InstallWindump.ps1 - choco install -y wireshark # Install Python modules - - "%PYTHON%\\python -m pip install cryptography coverage mock pyreadline" + - "%PYTHON%\\python -m pip install cryptography coverage mock" - set PATH="%PYTHON%\\Scripts\\;%PATH%" test_script: diff --git a/doc/scapy/installation.rst b/doc/scapy/installation.rst index 404288e3c..a34732599 100644 --- a/doc/scapy/installation.rst +++ b/doc/scapy/installation.rst @@ -379,7 +379,6 @@ You need the following software packages in order to install Scapy on Windows: * `Python `_: `python-2.7.13.amd64.msi `_ (64bits) or `python-2.7.13.msi `_ (32bits). After installation, add the Python installation directory and its \Scripts subdirectory to your PATH. Depending on your Python version, the defaults would be ``C:\Python27`` and ``C:\Python27\Scripts`` respectively. * `Npcap `_: `the latest version `_. Default values are recommanded. Scapy will also work with Winpcap. - * `pyreadline `_: Depending on the installed version of Python `pyreadline-2.1.win-amd64.exe `_ (64bits) or `pyreadline-2.1.win32.exe `_ (32bits) * `Scapy `_: `latest development version `_ from the `Git repository `_. Unzip the archive, open a command prompt in that directory and run "python setup.py install". Just download the files and run the setup program. Choosing the default installation options should be safe. diff --git a/scapy/arch/windows/__init__.py b/scapy/arch/windows/__init__.py index 4f4177171..12eef1145 100755 --- a/scapy/arch/windows/__init__.py +++ b/scapy/arch/windows/__init__.py @@ -761,36 +761,6 @@ def read_routes6(): warning("Error building scapy IPv6 routing table : %s" % str(e), True) return routes6 -if conf.interactive_shell != 'ipython' and conf.interactive: - try: - __IPYTHON__ - except NameError: - def readLineScapy(prompt): - result = "" - end = False - while not end : - if not end and result != "": - line = readline.rl.readline("... ") - else: - line = readline.rl.readline(prompt) - if line.strip().endswith(":"): - end = False - elif result == "": - end = True - if line.strip() == "": - end = True - result = result + "\n" + line - return six.text_type(result) - try: - import readline - console = readline.GetOutputFile() - except (ImportError, AttributeError): - log_loading.info("Could not get readline console. Will not interpret ANSI color codes.") - else: - conf.readfunc = readLineScapy - orig_stdout = sys.stdout - sys.stdout = console - def get_working_if(): try: # return the interface associated with the route with smallest diff --git a/scapy/autorun.py b/scapy/autorun.py index ec4a2356e..b20cde667 100644 --- a/scapy/autorun.py +++ b/scapy/autorun.py @@ -58,7 +58,7 @@ def autorun_commands(cmds, my_globals=None, ignore_globals=None, verb=0): if cmd: sys.stderr.write(sys.__dict__.get("ps2","... ")) else: - sys.stderr.write(str(sys.__dict__.get("ps1",ColorPrompt()))) + sys.stderr.write(str(sys.__dict__.get("ps1", sys.ps1))) l = cmds.pop() print(l) diff --git a/scapy/config.py b/scapy/config.py index 956d8b301..cc97c6dcf 100755 --- a/scapy/config.py +++ b/scapy/config.py @@ -14,7 +14,7 @@ import os,time,socket,sys from scapy import VERSION from scapy.data import * from scapy import base_classes -from scapy import themes +from scapy.themes import NoTheme, apply_ipython_color from scapy.error import log_scapy import scapy.modules.six as six @@ -326,26 +326,21 @@ def isCryptographyAdvanced(): else: return True - -def _prompt_changer(attr,val): - prompt = conf.prompt +def _prompt_changer(attr, val): + """Change the current prompt theme""" try: - ct = val - if isinstance(ct, themes.AnsiColorTheme) and ct.prompt(""): - ## ^A and ^B delimit invisible characters for readline to count right. - ## And we need ct.prompt() to do change something or else ^A and ^B will be - ## displayed - prompt = "\001%s\002" % ct.prompt("\002"+prompt+"\001") - else: - prompt = ct.prompt(prompt) + sys.ps1 = val.prompt(conf.prompt) except: pass - sys.ps1 = prompt + try: + apply_ipython_color(get_ipython()) + except NameError: + pass class Conf(ConfClass): """This object contains the configuration of Scapy. session : filename where the session will be saved -interactive_shell : If set to "ipython", use IPython as shell. Default: Python +interactive_shell : can be "ipython", "python" or "auto". Default: Auto stealth : if 1, prevents any unwanted packet to go out (ARP, DNS, ...) checkIPID: if 0, doesn't check that IPID matches between IP sent and ICMP IP citation received if 1, checks that they either are equal or byte swapped equals (bug in some IP stacks) @@ -381,7 +376,6 @@ debug_tls:When 1, print some TLS session secrets when they are computed. stealth = "not implemented" iface = None iface6 = None - readfunc = None layers = LayersList() commands = CommandsList() logLevel = LogLevel() @@ -415,7 +409,7 @@ debug_tls:When 1, print some TLS session secrets when they are computed. route6 = None # Filed by route6.py auto_fragment = 1 debug_dissector = 0 - color_theme = Interceptor("color_theme", themes.NoTheme(), _prompt_changer) + color_theme = Interceptor("color_theme", NoTheme(), _prompt_changer) warning_threshold = 5 warning_next_only_once = False prog = ProgPath() diff --git a/scapy/layers/inet.py b/scapy/layers/inet.py index 72eb5b136..83fa3bf38 100644 --- a/scapy/layers/inet.py +++ b/scapy/layers/inet.py @@ -50,7 +50,7 @@ class IPTools(object): os.system("whois %s" % self.src) def _ttl(self): """Returns ttl or hlim, depending on the IP version""" - return self.hlim if isinstance(self, IPv6) else self.ttl + return self.hlim if isinstance(self, scapy.layers.inet6.IPv6) else self.ttl def ottl(self): t = sorted([32,64,128,255]+[self._ttl()]) return t[t.index(self._ttl())+1] diff --git a/scapy/main.py b/scapy/main.py index 40cd4a60b..100fdcdc6 100644 --- a/scapy/main.py +++ b/scapy/main.py @@ -10,27 +10,23 @@ Main module for interactive startup. from __future__ import absolute_import from __future__ import print_function - -import atexit -import code -import getopt -import glob -import gzip +import sys, os, getopt, re, code +import gzip, glob import importlib import logging -import os from random import choice -import re -import sys import types +try: + import IPython # Allow testing +except: + pass -from scapy.config import conf +# Never add any global import, in main.py, that would trigger a warning messsage +# before the console handlers gets added in interact() from scapy.error import log_interactive, log_loading, log_scapy, warning import scapy.modules.six as six -from scapy.themes import DefaultTheme -from scapy import utils - +from scapy.themes import DefaultTheme, apply_ipython_color IGNORED = list(six.moves.builtins.__dict__) @@ -58,13 +54,37 @@ def _probe_config_file(cf): else: return cf_path -def _read_config_file(cf): +def _read_config_file(cf, _globals=globals(), _locals=locals(), interactive=True): + """Read a config file: execute a python file while loading scapy, that may contain + some pre-configured values. + + If _globals or _locals are specified, they will be updated with the loaded vars. + This allows an external program to use the function. Otherwise, vars are only available + from inside the scapy console. + + params: + - _globals: the globals() vars + - _locals: the locals() vars + - interactive: specified whether or not errors should be printed using the scapy console or + raised. + + ex, content of a config.py file: + 'conf.verb = 42\n' + Manual loading: + >>> _read_config_file("./config.py")) + >>> conf.verb + 42 + """ log_loading.debug("Loading config file [%s]" % cf) try: - exec(compile(open(cf).read(), cf, 'exec')) + exec(compile(open(cf).read(), cf, 'exec'), _globals, _locals) except IOError as e: + if interactive: + raise log_loading.warning("Cannot read config file [%s] [%s]" % (cf,e)) except Exception as e: + if interactive: + raise log_loading.exception("Error during evaluation of config file [%s]" % cf) def _validate_local(x): @@ -182,25 +202,35 @@ def list_contrib(name=None): def save_session(fname=None, session=None, pickleProto=-1): + """Save current Scapy session to the file specified in the fname arg. + + params: + - fname: file to save the scapy session in + - session: scapy session to use. If None, the console one will be used + - pickleProto: pickle proto version (default: -1 = latest)""" + from scapy import utils if fname is None: fname = conf.session if not fname: conf.session = fname = utils.get_temp_file(keep=True) - log_interactive.info("Use [%s] as session file" % fname) + log_interactive.info("Use [%s] as session file" % fname) + if session is None: session = six.moves.builtins.__dict__["scapy_session"] to_be_saved = session.copy() - if "__builtins__" in to_be_saved: del(to_be_saved["__builtins__"]) for k in to_be_saved.keys(): - if type(to_be_saved[k]) in [type, type, types.ModuleType]: - log_interactive.error("[%s] (%s) can't be saved." % (k, type(to_be_saved[k]))) - del(to_be_saved[k]) + i = to_be_saved[k] + if hasattr(i, "__module__") and (k[0] == "_" or i.__module__.startswith("IPython")): + del(to_be_saved[k]) + elif isinstance(i, (type, type, types.ModuleType)): + if k[0] != "_": + log_interactive.error("[%s] (%s) can't be saved." % (k, type(to_be_saved[k]))) + del(to_be_saved[k]) - try: os.rename(fname, fname+".bak") except OSError: @@ -212,6 +242,11 @@ def save_session(fname=None, session=None, pickleProto=-1): del f def load_session(fname=None): + """Load current Scapy session from the file specified in the fname arg. + This will erase any existing session. + + params: + - fname: file to load the scapy session from""" if fname is None: fname = conf.session try: @@ -226,9 +261,14 @@ def load_session(fname=None): scapy_session = six.moves.builtins.__dict__["scapy_session"] scapy_session.clear() scapy_session.update(s) - log_loading.info("Loaded session [%s]" % conf.session) + + log_loading.info("Loaded session [%s]" % fname) def update_session(fname=None): + """Update current Scapy session from the file specified in the fname arg. + + params: + - fname: file to load the scapy session from""" if fname is None: fname = conf.session try: @@ -247,9 +287,6 @@ def init_session(session_name, mydict=None): GLOBKEYS.extend(scapy_builtins) GLOBKEYS.append("scapy_session") scapy_builtins=None # XXX replace with "with" statement - if mydict is not None: - six.moves.builtins.__dict__.update(mydict) - GLOBKEYS.extend(mydict) if session_name: try: @@ -280,6 +317,10 @@ def init_session(session_name, mydict=None): six.moves.builtins.__dict__["scapy_session"] = SESSION + if mydict is not None: + six.moves.builtins.__dict__["scapy_session"].update(mydict) + GLOBKEYS.extend(mydict) + ################ ##### Main ##### ################ @@ -290,6 +331,7 @@ def scapy_delete_temp_files(): os.unlink(f) except: pass + del(conf.temp_files[:]) def _prepare_quote(quote, author, max_len=78): """This function processes a quote and returns a string that is ready @@ -314,32 +356,19 @@ to be used in the fancy prompt. lines.append(' | %s-- %s' % (" " * (max_len - len(author) - 5), author)) return lines -def scapy_write_history_file(readline): - if conf.histfile: - try: - readline.write_history_file(conf.histfile) - except IOError as e: - try: - warning("Could not write history to [%s]\n\t (%s)" % (conf.histfile,e)) - tmp = utils.get_temp_file(keep=True) - readline.write_history_file(tmp) - warning("Wrote history to [%s]" % tmp) - except: - warning("Could not write history to [%s]. Discarded" % tmp) - - def interact(mydict=None,argv=None,mybanner=None,loglevel=20): global SESSION global GLOBKEYS - conf.interactive = True - if loglevel is not None: - conf.logLevel=loglevel console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s")) log_scapy.addHandler(console_handler) + from scapy.config import conf conf.color_theme = DefaultTheme() + conf.interactive = True + if loglevel is not None: + conf.logLevel = loglevel STARTUP_FILE = DEFAULT_STARTUP_FILE PRESTART_FILE = DEFAULT_PRESTART_FILE @@ -376,9 +405,9 @@ def interact(mydict=None,argv=None,mybanner=None,loglevel=20): sys.exit(1) if STARTUP_FILE: - _read_config_file(STARTUP_FILE) + _read_config_file(STARTUP_FILE, interactive=True) if PRESTART_FILE: - _read_config_file(PRESTART_FILE) + _read_config_file(PRESTART_FILE, interactive=True) init_session(session_name, mydict) @@ -434,92 +463,58 @@ def interact(mydict=None,argv=None,mybanner=None,loglevel=20): if mybanner is not None: the_banner += "\n" the_banner += mybanner - - try: - import readline, rlcompleter - except ImportError: - log_loading.info("Can't load Python libreadline or completer") - READLINE=0 - else: - READLINE=1 - class ScapyCompleter(rlcompleter.Completer): - def global_matches(self, text): - matches = [] - n = len(text) - for lst in [dir(six.moves.builtins), SESSION]: - for word in lst: - if word[:n] == text and word != "__builtins__": - matches.append(word) - return matches - - - def attr_matches(self, text): - m = re.match(r"(\w+(\.\w+)*)\.(\w*)", text) - if not m: - return [] - expr, attr = m.group(1, 3) - try: - object = eval(expr) - except: - try: - object = eval(expr, SESSION) - except (NameError, AttributeError): - return [] - from scapy.packet import Packet, Packet_metaclass - if isinstance(object, Packet) or isinstance(object, Packet_metaclass): - words = [x for x in dir(object) if x[0] != "_"] - words += [x.name for x in object.fields_desc] - else: - words = dir(object) - if hasattr( object,"__class__" ): - words = words + rlcompleter.get_class_members(object.__class__) - matches = [] - n = len(attr) - for word in words: - if word[:n] == attr and word != "__builtins__": - matches.append("%s.%s" % (expr, word)) - return matches - - readline.set_completer(ScapyCompleter().complete) - readline.parse_and_bind("C-o: operate-and-get-next") - readline.parse_and_bind("tab: complete") - - if READLINE: - if conf.histfile: - try: - readline.read_history_file(conf.histfile) - except IOError: - pass - atexit.register(scapy_write_history_file,readline) - - atexit.register(scapy_delete_temp_files) IPYTHON=False - if conf.interactive_shell.lower() == "ipython": + if not conf.interactive_shell or conf.interactive_shell.lower() in ["ipython", "auto"]: try: - import IPython + IPython IPYTHON=True - except ImportError: - log_loading.warning("IPython not available. Using standard Python " - "shell instead.") + except NameError as e: + log_loading.warning("IPython not available. Using standard Python shell instead. " + "AutoCompletion, History are disabled.") IPYTHON=False + init_session(session_name, mydict) + if IPYTHON: - banner = the_banner + " using IPython %s" % IPython.__version__ + banner = the_banner + " using IPython %s\n" % IPython.__version__ + from IPython.terminal.embed import InteractiveShellEmbed + from IPython.terminal.prompts import Prompts, Token + from IPython.utils.generics import complete_object + from traitlets.config.loader import Config + from scapy.packet import Packet + + cfg = Config() + + @complete_object.when_type(Packet) + def complete_packet(obj, prev_completions): + return prev_completions + [fld.name for fld in obj.fields_desc] - # Old way to embed IPython kept for backward compatibility try: - args = [''] # IPython command line args (will be seen as sys.argv) - ipshell = IPython.Shell.IPShellEmbed(args, banner = banner) - ipshell(local_ns=SESSION) - except AttributeError: - pass + get_ipython + except NameError: + # Set "classic" prompt style when launched from run_scapy(.bat) files + class ClassicPrompt(Prompts): + def in_prompt_tokens(self, cli=None): + return [(Token.Prompt, '>>> '),] + def out_prompt_tokens(self): + return [(Token.OutPrompt, ''),] + cfg.TerminalInteractiveShell.prompts_class=ClassicPrompt # Set classic prompt style + apply_ipython_color(shell=cfg.TerminalInteractiveShell) # Register and apply scapy color style + cfg.TerminalInteractiveShell.confirm_exit = False # Remove confirm exit + cfg.TerminalInteractiveShell.separate_in = u'' # Remove spacing line - # In the IPython cookbook, see 'Updating-code-for-use-with-IPython-0.11-and-later' - IPython.embed(user_ns=SESSION, banner2=banner) + cfg.TerminalInteractiveShell.hist_file = conf.histfile + + # configuration can thus be specified here. + ipshell = InteractiveShellEmbed(config=cfg, + banner1=banner, + hist_file=conf.histfile if conf.histfile else None, + user_ns=SESSION) + ipshell(local_ns=SESSION) else: - code.interact(banner=the_banner, local=SESSION, readfunc=conf.readfunc) + code.interact(banner = the_banner, local=SESSION) if conf.session: save_session(conf.session, SESSION) diff --git a/scapy/themes.py b/scapy/themes.py index 033d0ca14..f613c6953 100644 --- a/scapy/themes.py +++ b/scapy/themes.py @@ -11,22 +11,37 @@ Color themes for the interactive console. ## Color themes ## ################## -class Color: - normal = "\033[0m" - black = "\033[30m" - red = "\033[31m" - green = "\033[32m" - yellow = "\033[33m" - blue = "\033[34m" - purple = "\033[35m" - cyan = "\033[36m" - grey = "\033[37m" +class ColorTable: + colors = { # Format: (ansi, pygments) + "normal": ("\033[0m", "noinherit"), + "black": ("\033[30m", "#ansiblack"), + "red": ("\033[31m", "#ansired"), + "green": ("\033[32m", "#ansigreen"), + "yellow": ("\033[33m", "#ansiyellow"), + "blue": ("\033[34m", "#ansiblue"), + "purple": ("\033[35m", "#ansipurple"), + "cyan": ("\033[36m", "#ansicyan"), + "grey": ("\033[37m", "#ansigrey"), - bold = "\033[1m" - uline = "\033[4m" - blink = "\033[5m" - invert = "\033[7m" + "bold": ("\033[1m", "bold"), + "uline": ("\033[4m", "underline"), + "blink": ("\033[5m", ""), + "invert": ("\033[7m", ""), + } + + def __repr__(self): + return "" + + def __getattr__(self, attr): + return self.colors.get(attr, [""])[0] + + def ansi_to_pygments(self, x): # Transform ansi encoded text to Pygments text + inv_map = {v[0]: v[1] for k, v in self.colors.items()} + for k, v in inv_map.items(): + x = x.replace(k, " "+v) + return x.strip() +Color = ColorTable() def create_styler(fmt=None, before="", after="", fmt2="%s"): def do_style(val, fmt=fmt, before=before, after=after, fmt2=fmt2): @@ -41,7 +56,12 @@ def create_styler(fmt=None, before="", after="", fmt2="%s"): class ColorTheme: def __repr__(self): return "<%s>" % self.__class__.__name__ + def __reduce__(self): + return (self.__class__, (), ()) def __getattr__(self, attr): + if attr in ["__getstate__", "__setstate__", "__getinitargs__", + "__reduce_ex__"]: + raise AttributeError() return create_styler() @@ -268,16 +288,14 @@ class HTMLTheme2(HTMLTheme): style_right = "#[#span class=right#]#%s#[#/span#]#" -class ColorPrompt: - __prompt = ">>> " - def __str__(self): - try: - from scapy import config - ct = config.conf.color_theme - if isinstance(ct, AnsiColorTheme): - ## ^A and ^B delimit invisible characters for readline to count right - return "\001%s\002" % ct.prompt("\002"+config.conf.prompt+"\001") - else: - return ct.prompt(config.conf.prompt) - except: - return self.__prompt +def apply_ipython_color(shell): + """Updates the specified IPython console shell with + the conf.color_theme scapy theme.""" + from IPython.terminal.prompts import Prompts, Token + shell.highlighting_style_overrides = { + Token.Prompt: Color.ansi_to_pygments(conf.color_theme.style_prompt), + } + try: + get_ipython().refresh_style() + except NameError: + pass diff --git a/scapy/utils.py b/scapy/utils.py index 2dbc038d8..5c5ab1774 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -36,7 +36,7 @@ def get_temp_file(keep=False, autoext=""): f = os.tempnam("","scapy") if not keep: conf.temp_files.append(f+autoext) - return f + return f + autoext def sane_color(x): r="" diff --git a/test/mock_windows.uts b/test/mock_windows.uts index 16eb480bb..69e494260 100644 --- a/test/mock_windows.uts +++ b/test/mock_windows.uts @@ -91,66 +91,6 @@ ifIndex DestinationPrefix NextHop test_read_routes6_windows() -############ -############ -+ Main.py emulator - -= Prepare readline patching functions - -from scapy.main import * -import scapy.config as conf -import sys - -import mock -import readline - -index = 0 -@mock.patch("pyreadline.console.console.Console.size") -@mock.patch("scapy.config.conf.readfunc") -def emulate_main_input(data, mock_readfunc, mock_pyr_size): - # This fix when the windows doesn't have a size (run with a windows server) - mock_pyr_size.return_value = (300, 300) - global index - index = 0 # reset var - def readlineScapy(*args, **kargs): - global index - if len(data) == index: - r_data = "exit(1 if hasattr(sys, 'last_value') and sys.last_value is not None else 0)" - else: - r_data = data[index] - if r_data.startswith("#AUTOCOMPLETE"): - send_text = re.match(r'#AUTOCOMPLETE{(.*)}', r_data).group(1) - cmpl = readline.rl.get_completer() - r_data = cmpl(send_text, 0) - index +=1 - print r_data - return r_data - mock_readfunc.side_effect = readlineScapy - sys.argv = [''] - sys.last_value = None - def console_exit(code): - raise SystemExit(code) - exit_code = -1 - try: - interact(mydict={"exit": console_exit}) - except SystemExit as e: - exit_code = str(e) - pass - assert exit_code == "0" - -= Test basic running -data = ["IP()", "assert _.name == 'IP'"] -emulate_main_input(data) - -= Test function parsing -data = ["def test():", " return True", "", "assert test() == True"] -emulate_main_input(data) - -= Test auto-completion -from ctypes import wintypes -data = ["#AUTOCOMPLETE{scapy.config.conf.vers}", "assert _ == scapy.config.conf.version"] -emulate_main_input(data) - ############ ############ + Windows arch unit tests diff --git a/test/pipetool.uts b/test/pipetool.uts index e371491ef..df106625f 100644 --- a/test/pipetool.uts +++ b/test/pipetool.uts @@ -264,6 +264,8 @@ os.unlink("t2.pcap") = Test InjectSink and Inject3Sink ~ needs_root +import mock + a = IP(dst="192.168.0.1")/ICMP() msgs = [] diff --git a/test/regression.uts b/test/regression.uts index 05787621c..b801e8da8 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -104,100 +104,77 @@ else: conf.route6.ifchange(LOOPBACK_NAME, "::1/128") True -= UTscapy route check -* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise - -if WINDOWS: - route_add_loopback() - -IP().src -assert _ == "127.0.0.1" ############ ############ + Main.py tests -= Prepare emulator -import mock -from mock import Mock += Pickle and unpickle a packet -if WINDOWS: - # This fix when the windows doesn't have a size (run with a windows server) - mock.patch("pyreadline.console.console.Console.size").return_value = (300, 300) +import scapy.modules.six as six -@mock.patch("scapy.config.conf.readfunc") -def emulate_main_input(data, mock_readfunc): - global index - index = 0 # reset var - def readlineScapy(*args, **kargs): - global index - if len(data) == index: - r_data = "exit(1 if hasattr(sys, 'last_value') and sys.last_value is not None else 0)" - else: - r_data = data[index] - index +=1 - print r_data - return r_data - mock_readfunc.side_effect = readlineScapy - def reduce_mock(self): - return (Mock, ()) - mock_readfunc.__reduce__ = reduce_mock - sys.argv = [''] - def console_exit(code): - raise SystemExit(code) - exit_code = -1 - try: - interact(mydict={"exit": console_exit}) - except SystemExit as e: - exit_code = str(e) - pass - assert exit_code == "0" +a = IP(dst="192.168.0.1")/UDP() -= Test basic save_session, load_session and update_session +b = six.moves.cPickle.dumps(a) +c = six.moves.cPickle.loads(b) -data = ["init_session(\"scapySession1\")",\ -"test_value = \"8.8.8.8\"",\ -"save_session()",\ -"del test_value",\ -"load_session()",\ -"update_session()",\ -"assert test_value == \"8.8.8.8\""] +assert c[IP].dst == "192.168.0.1" +assert str(c) == str(a) -emulate_main_input(data) - -= Test save_session, load_session and update_session with fname - -data = ["init_session(\"scapySession2\")",\ -"test_value = 7",\ -"save_session(fname=\"scapySaveSession.dat\")",\ -"del test_value",\ -"load_session(fname=\"scapySaveSession.dat\")",\ -"update_session(fname=\"scapySaveSession.dat\")",\ -"assert test_value == 7"] - -emulate_main_input(data) - -= Test pickling with packets - -data = ["init_session(\"scapySession1\")",\ -"test_value = IP(src=\"127.0.0.1\", dst=\"8.8.8.8\")",\ -"test_value2 = ICMPv6EchoReply(data=\"testData@%!\")",\ -"save_session()",\ -"del test_value",\ -"load_session()",\ -"assert test_value.src == \"127.0.0.1\"",\ -"assert test_value.dst == \"8.8.8.8\"",\ -"assert test_value2.data == \"testData@%!\""] - -emulate_main_input(data) - -= Clean up session files += Usage test +from scapy.main import _usage try: - os.remove("scapySaveSession.dat") -except OSError: + _usage() + assert False +except SystemExit: + assert True + += Session test + +# This is automatic when using the console +def get_var(var): + return six.moves.builtins.__dict__["scapy_session"][var] + +def set_var(var, value): + six.moves.builtins.__dict__["scapy_session"][var] = value + +def del_var(var): + del(six.moves.builtins.__dict__["scapy_session"][var]) + +init_session(None, {"init_value": 123}) +set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8" +save_session() +del_var("test_value") +load_session() +update_session() +assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8" +assert get_var("init_value") == 123 + += Session test with fname + +init_session("scapySession2") +set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1") +save_session(fname="scapySession1.dat") +del_var("test_value") + +set_var("z", True) #z = True +load_session(fname="scapySession1.dat") +try: + get_var("z") + assert False +except: pass +set_var("z", False) #z = False +update_session(fname="scapySession1.dat") +assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1" +assert not get_var("z") + += Clear session files + +os.remove("scapySession1.dat") + = Test temporary file creation ~ appveyor_only @@ -210,9 +187,32 @@ else: assert("scapy" in tmpfile and (IS_PYPY == True or "/tmp/" in tmpfile)) assert(conf.temp_files[0].endswith(".ut")) -assert(conf.temp_files.pop()) +scapy_delete_temp_files() assert(len(conf.temp_files) == 0) += Emulate interact() + +import mock, sys +try: + import IPython +except: + code_interact_import = "scapy.main.code.interact" +else: + code_interact_import = "scapy.main.IPython.terminal.embed.InteractiveShellEmbed" + +@mock.patch(code_interact_import) +def interact_emulator(code_int): + try: + code_int = lambda *args, **kwargs: lambda: None + interact(argv=["-s scapy1"], mybanner="What a test") + return True + except: + raise + return False + +assert interact_emulator() +sys.ps1 = ">>> " + = Test sane function sane("A\x00\xFFB") == "A..B" @@ -354,7 +354,7 @@ fd, fname = tempfile.mkstemp() os.write(fd, "conf.verb = 42\n") os.close(fd) from scapy.main import _read_config_file -_read_config_file(fname) +_read_config_file(fname, globals(), locals()) assert(conf.verb == 42) conf.verb = saved_conf_verb @@ -1413,10 +1413,11 @@ a=Ether(str(Ether()/IPv6()/TCP())) a.type == 0x86dd = IPv6 Class binding with GRE - build -str(IP()/GRE()/Ether()/IP()/GRE()/IPv6()) == b'E\x00\x00f\x00\x01\x00\x00@/|f\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00eX\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00@\x00\x01\x00\x00@/|\x8c\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x86\xdd`\x00\x00\x00\x00\x00;@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01' +s = str(IP(src="127.0.0.1")/GRE()/Ether(dst="ff:ff:ff:ff:ff:ff", src="00:00:00:00:00:00")/IP()/GRE()/IPv6(src="::1")) +s == b'E\x00\x00f\x00\x01\x00\x00@/|f\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00eX\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00@\x00\x01\x00\x00@/|\x8c\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00\x86\xdd`\x00\x00\x00\x00\x00;@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01' = IPv6 Class binding with GRE - dissection -p = IP(str(IP()/GRE()/Ether()/IP()/GRE()/IPv6())) +p = IP(s) GRE in p and p[GRE:1].proto == 0x6558 and p[GRE:2].proto == 0x86DD and IPv6 in p @@ -3156,16 +3157,22 @@ s == b"`\x00\x00\x00\x00\x18:\x01\xfe\x80\x00\x00\x00\x00\x00\x00\xba\xca:\xff\x p = IPv6(s) ICMPv6MLQuery in p and p[IPv6].dst == "ff02::1" +########### +########### += UTscapy route check +* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise + +# No more routing/networking tests after this line +if WINDOWS: + route_add_loopback() + +IP().src +assert _ == "127.0.0.1" ############ ############ + Ether tests with IPv6 -= Logic -# No more routing test after this line -if WINDOWS: - route_add_loopback() - = Ether IPv6 checking for dst ~ netaccess ipv6