Convert examples and example tests for new-style scripts

Remove the test that just loads all the example scripts for now - it's a very
low-value test, and we need to think of something better.
This commit is contained in:
Aldo Cortesi 2016-07-14 16:20:27 +12:00
parent a6821aad8e
commit b94f5fd361
14 changed files with 105 additions and 246 deletions

View File

@ -62,7 +62,7 @@ class ViewPigLatin(contentviews.View):
pig_view = ViewPigLatin()
def start():
def configure(options):
contentviews.add(pig_view)

View File

@ -6,7 +6,7 @@ from mitmproxy import filt
state = {}
def start():
def configure(options):
if len(sys.argv) != 2:
raise ValueError("Usage: -s 'filt.py FILTER'")
state["filter"] = filt.parse(sys.argv[1])

View File

@ -6,7 +6,7 @@ from mitmproxy.flow import FlowWriter
state = {}
def start():
def configure(options):
if len(sys.argv) != 2:
raise ValueError('Usage: -s "flowriter.py filename"')

View File

@ -61,7 +61,7 @@ class Context(object):
context = Context()
def start():
def configure(options):
"""
On start we create a HARLog instance. You will have to adapt this to
suit your actual needs of HAR generation. As it will probably be

View File

@ -7,7 +7,7 @@ from mitmproxy.models import decoded
iframe_url = None
def start():
def configure(options):
if len(sys.argv) != 2:
raise ValueError('Usage: -s "iframe_injector.py url"')
global iframe_url

View File

@ -8,7 +8,7 @@ from mitmproxy.models import decoded
state = {}
def start():
def configure(options):
if len(sys.argv) != 3:
raise ValueError('Usage: -s "modify_response_body.py old new"')
# You may want to use Python's argparse for more sophisticated argument

View File

@ -16,7 +16,7 @@ def hello_world():
# Register the app using the magic domain "proxapp" on port 80. Requests to
# this domain and port combination will now be routed to the WSGI app instance.
def start():
def configure(options):
mitmproxy.ctx.master.apps.add(app, "proxapp", 80)
# SSL works too, but the magic domain needs to be resolvable from the mitmproxy machine due to mitmproxy's design.

View File

@ -4,11 +4,11 @@ import mitmproxy
"""
def start():
def configure(options):
"""
Called once on script startup, before any other events.
Called once on script startup before any other events, and whenever options changes.
"""
mitmproxy.ctx.log("start")
mitmproxy.ctx.log("configure")
def clientconnect(root_layer):

View File

@ -113,7 +113,7 @@ class TlsFeedback(TlsLayer):
tls_strategy = None
def start():
def configure(options):
global tls_strategy
if len(sys.argv) == 2:
tls_strategy = ProbabilisticStrategy(float(sys.argv[1]))

View File

@ -32,6 +32,7 @@ Events = frozenset([
"error",
"log",
"done",
"script_change",
])

View File

@ -56,6 +56,13 @@ class Data(object):
dirname = os.path.dirname(inspect.getsourcefile(m))
self.dirname = os.path.abspath(dirname)
def push(self, subpath):
"""
Change the data object to a path relative to the module.
"""
self.dirname = os.path.join(self.dirname, subpath)
return self
def path(self, path):
"""
Returns a path to the package data housed at 'path' under this

View File

@ -1,34 +0,0 @@
import mock
from mitmproxy.script.reloader import watch, unwatch
from test.mitmproxy import tutils
from threading import Event
def test_simple():
with tutils.tmpdir():
with open("foo.py", "w"):
pass
script = mock.Mock()
script.path = "foo.py"
e = Event()
def _onchange():
e.set()
watch(script, _onchange)
with tutils.raises("already observed"):
watch(script, _onchange)
# Some reloaders don't register a change directly after watching, because they first need to initialize.
# To test if watching works at all, we do repeated writes every 100ms.
for _ in range(100):
with open("foo.py", "a") as f:
f.write(".")
if e.wait(0.1):
break
else:
raise AssertionError("No change detected.")
unwatch(script)

View File

@ -1,83 +0,0 @@
from mitmproxy.script import Script
from mitmproxy.exceptions import ScriptException
from test.mitmproxy import tutils
class TestParseCommand:
def test_empty_command(self):
with tutils.raises(ScriptException):
Script.parse_command("")
with tutils.raises(ScriptException):
Script.parse_command(" ")
def test_no_script_file(self):
with tutils.raises("not found"):
Script.parse_command("notfound")
with tutils.tmpdir() as dir:
with tutils.raises("not a file"):
Script.parse_command(dir)
def test_parse_args(self):
with tutils.chdir(tutils.test_data.dirname):
assert Script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", [])
assert Script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"])
assert Script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"])
@tutils.skip_not_windows
def test_parse_windows(self):
with tutils.chdir(tutils.test_data.dirname):
assert Script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", [])
assert Script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", ['foo \\ bar'])
def test_simple():
with tutils.chdir(tutils.test_data.path("data/scripts")):
s = Script("a.py --var 42")
assert s.path == "a.py"
assert s.ns is None
s.load()
assert s.ns["var"] == 42
s.run("here")
assert s.ns["var"] == 43
s.unload()
assert s.ns is None
with tutils.raises(ScriptException):
s.run("here")
with Script("a.py --var 42") as s:
s.run("here")
def test_script_exception():
with tutils.chdir(tutils.test_data.path("data/scripts")):
s = Script("syntaxerr.py")
with tutils.raises(ScriptException):
s.load()
s = Script("starterr.py")
with tutils.raises(ScriptException):
s.load()
s = Script("a.py")
s.load()
with tutils.raises(ScriptException):
s.load()
s = Script("a.py")
with tutils.raises(ScriptException):
s.run("here")
with tutils.raises(ScriptException):
with Script("reqerr.py") as s:
s.run("request", None)
s = Script("unloaderr.py")
s.load()
with tutils.raises(ScriptException):
s.unload()

View File

@ -1,151 +1,119 @@
import glob
import json
import mock
import os
import sys
from contextlib import contextmanager
from mitmproxy import script
import os.path
from mitmproxy.flow import master
from mitmproxy.flow import state
from mitmproxy import options
from mitmproxy import contentviews
from mitmproxy.builtins import script
import netlib.utils
from netlib import tutils as netutils
from netlib.http import Headers
from . import tutils
from . import tutils, mastertest
example_dir = netlib.utils.Data(__name__).path("../../examples")
example_dir = netlib.utils.Data(__name__).push("../../examples")
@contextmanager
def example(command):
command = os.path.join(example_dir, command)
with script.Script(command) as s:
yield s
class ScriptError(Exception):
pass
@mock.patch("mitmproxy.ctx.master")
@mock.patch("mitmproxy.ctx.log")
def test_load_scripts(log, master):
scripts = glob.glob("%s/*.py" % example_dir)
for f in scripts:
if "har_extractor" in f:
continue
if "flowwriter" in f:
f += " -"
if "iframe_injector" in f:
f += " foo" # one argument required
if "filt" in f:
f += " ~a"
if "modify_response_body" in f:
f += " foo bar" # two arguments required
s = script.Script(f)
try:
s.load()
except Exception as v:
if "ImportError" not in str(v):
raise
else:
s.unload()
class RaiseMaster(master.FlowMaster):
def add_event(self, e, level):
if level in ("warn", "error"):
raise ScriptError(e)
def test_add_header():
flow = tutils.tflow(resp=netutils.tresp())
with example("add_header.py") as ex:
ex.run("response", flow)
assert flow.response.headers["newheader"] == "foo"
def tscript(cmd, args=""):
cmd = example_dir.path(cmd) + " " + args
m = RaiseMaster(options.Options(), None, state.State())
sc = script.Script(cmd)
m.addons.add(sc)
return m, sc
@mock.patch("mitmproxy.contentviews.remove")
@mock.patch("mitmproxy.contentviews.add")
def test_custom_contentviews(add, remove):
with example("custom_contentviews.py"):
assert add.called
pig = add.call_args[0][0]
_, fmt = pig(b"<html>test!</html>")
assert any(b'esttay!' in val[0][1] for val in fmt)
assert not pig(b"gobbledygook")
assert remove.called
class TestScripts(mastertest.MasterTest):
def test_add_header(self):
m, _ = tscript("add_header.py")
f = tutils.tflow(resp=netutils.tresp())
self.invoke(m, "response", f)
assert f.response.headers["newheader"] == "foo"
def test_custom_contentviews(self):
m, sc = tscript("custom_contentviews.py")
pig = contentviews.get("pig_latin_HTML")
_, fmt = pig("<html>test!</html>")
assert any('esttay!' in val[0][1] for val in fmt)
assert not pig("gobbledygook")
def test_iframe_injector():
with tutils.raises(script.ScriptException):
with example("iframe_injector.py"):
pass
def test_iframe_injector(self):
with tutils.raises(ScriptError):
tscript("iframe_injector.py")
flow = tutils.tflow(resp=netutils.tresp(content=b"<html>mitmproxy</html>"))
with example("iframe_injector.py http://example.org/evil_iframe") as ex:
ex.run("response", flow)
m, sc = tscript("iframe_injector.py", "http://example.org/evil_iframe")
flow = tutils.tflow(resp=netutils.tresp(content="<html>mitmproxy</html>"))
self.invoke(m, "response", flow)
content = flow.response.content
assert b'iframe' in content and b'evil_iframe' in content
assert 'iframe' in content and 'evil_iframe' in content
def test_modify_form(self):
m, sc = tscript("modify_form.py")
def test_modify_form():
form_header = Headers(content_type="application/x-www-form-urlencoded")
flow = tutils.tflow(req=netutils.treq(headers=form_header))
with example("modify_form.py") as ex:
ex.run("request", flow)
assert flow.request.urlencoded_form[b"mitmproxy"] == b"rocks"
form_header = Headers(content_type="application/x-www-form-urlencoded")
f = tutils.tflow(req=netutils.treq(headers=form_header))
self.invoke(m, "request", f)
flow.request.headers["content-type"] = ""
ex.run("request", flow)
assert list(flow.request.urlencoded_form.items()) == [(b"foo", b"bar")]
assert f.request.urlencoded_form["mitmproxy"] == "rocks"
f.request.headers["content-type"] = ""
self.invoke(m, "request", f)
assert list(f.request.urlencoded_form.items()) == [("foo", "bar")]
def test_modify_querystring():
flow = tutils.tflow(req=netutils.treq(path=b"/search?q=term"))
with example("modify_querystring.py") as ex:
ex.run("request", flow)
assert flow.request.query["mitmproxy"] == "rocks"
def test_modify_querystring(self):
m, sc = tscript("modify_querystring.py")
f = tutils.tflow(req=netutils.treq(path="/search?q=term"))
flow.request.path = "/"
ex.run("request", flow)
assert flow.request.query["mitmproxy"] == "rocks"
self.invoke(m, "request", f)
assert f.request.query["mitmproxy"] == "rocks"
f.request.path = "/"
self.invoke(m, "request", f)
assert f.request.query["mitmproxy"] == "rocks"
def test_modify_response_body():
with tutils.raises(script.ScriptException):
with example("modify_response_body.py"):
assert True
def test_modify_response_body(self):
with tutils.raises(ScriptError):
tscript("modify_response_body.py")
flow = tutils.tflow(resp=netutils.tresp(content=b"I <3 mitmproxy"))
with example("modify_response_body.py mitmproxy rocks") as ex:
assert ex.ns["state"]["old"] == b"mitmproxy" and ex.ns["state"]["new"] == b"rocks"
ex.run("response", flow)
assert flow.response.content == b"I <3 rocks"
m, sc = tscript("modify_response_body.py", "mitmproxy rocks")
f = tutils.tflow(resp=netutils.tresp(content="I <3 mitmproxy"))
self.invoke(m, "response", f)
assert f.response.content == "I <3 rocks"
def test_redirect_requests(self):
m, sc = tscript("redirect_requests.py")
f = tutils.tflow(req=netutils.treq(host="example.org"))
self.invoke(m, "request", f)
assert f.request.host == "mitmproxy.org"
def test_redirect_requests():
flow = tutils.tflow(req=netutils.treq(host=b"example.org"))
with example("redirect_requests.py") as ex:
ex.run("request", flow)
assert flow.request.host == "mitmproxy.org"
def test_har_extractor(self):
with tutils.raises(ScriptError):
tscript("har_extractor.py")
with tutils.tmpdir() as tdir:
times = dict(
timestamp_start=746203272,
timestamp_end=746203272,
)
@mock.patch("mitmproxy.ctx.log")
def test_har_extractor(log):
if sys.version_info >= (3, 0):
with tutils.raises("does not work on Python 3"):
with example("har_extractor.py -"):
pass
return
path = os.path.join(tdir, "file")
m, sc = tscript("har_extractor.py", path)
f = tutils.tflow(
req=netutils.treq(**times),
resp=netutils.tresp(**times)
)
self.invoke(m, "response", f)
m.addons.remove(sc)
with tutils.raises(script.ScriptException):
with example("har_extractor.py"):
pass
times = dict(
timestamp_start=746203272,
timestamp_end=746203272,
)
flow = tutils.tflow(
req=netutils.treq(**times),
resp=netutils.tresp(**times)
)
with example("har_extractor.py -") as ex:
ex.run("response", flow)
with open(tutils.test_data.path("data/har_extractor.har")) as fp:
fp = open(path, "rb")
test_data = json.load(fp)
assert json.loads(ex.ns["context"].HARLog.json()) == test_data["test_response"]
assert len(test_data["log"]["pages"]) == 1