Merge pull request #1550 from cortesi/script
Add "run_once" support to the script addon, use it in mitmproxy
This commit is contained in:
commit
734d177007
|
@ -10,6 +10,7 @@ import traceback
|
|||
from mitmproxy import exceptions
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy.flow import master as flowmaster
|
||||
|
||||
|
||||
import watchdog.events
|
||||
|
@ -67,7 +68,11 @@ def scriptenv(path, args):
|
|||
tb = tb.tb_next
|
||||
if not os.path.abspath(s[0]).startswith(scriptdir):
|
||||
break
|
||||
ctx.log.error("Script error: %s" % "".join(traceback.format_exception(etype, value, tb)))
|
||||
ctx.log.error(
|
||||
"Script error: %s" % "".join(
|
||||
traceback.format_exception(etype, value, tb)
|
||||
)
|
||||
)
|
||||
finally:
|
||||
sys.argv = oldargs
|
||||
sys.path.pop()
|
||||
|
@ -189,6 +194,15 @@ class ScriptLoader():
|
|||
"""
|
||||
An addon that manages loading scripts from options.
|
||||
"""
|
||||
def run_once(self, command, flows):
|
||||
sc = Script(command)
|
||||
sc.load_script()
|
||||
for f in flows:
|
||||
for evt, o in flowmaster.event_sequence(f):
|
||||
sc.run(evt, o)
|
||||
sc.done()
|
||||
return sc
|
||||
|
||||
def configure(self, options, updated):
|
||||
if "scripts" in updated:
|
||||
for s in options.scripts:
|
||||
|
|
|
@ -22,7 +22,6 @@ from mitmproxy import contentviews
|
|||
from mitmproxy import controller
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import script
|
||||
from mitmproxy import utils
|
||||
import mitmproxy.options
|
||||
from mitmproxy.console import flowlist
|
||||
|
@ -329,39 +328,13 @@ class ConsoleMaster(flow.FlowMaster):
|
|||
self.loop.widget = window
|
||||
self.loop.draw_screen()
|
||||
|
||||
def _run_script_method(self, method, s, f):
|
||||
status, val = s.run(method, f)
|
||||
if val:
|
||||
if status:
|
||||
signals.add_log("Method %s return: %s" % (method, val), "debug")
|
||||
else:
|
||||
signals.add_log(
|
||||
"Method %s error: %s" %
|
||||
(method, val[1]), "error")
|
||||
|
||||
def run_script_once(self, command, f):
|
||||
if not command:
|
||||
return
|
||||
signals.add_log("Running script on flow: %s" % command, "debug")
|
||||
|
||||
sc = self.addons.get("scriptloader")
|
||||
try:
|
||||
s = script.Script(command)
|
||||
s.load()
|
||||
except script.ScriptException as e:
|
||||
signals.status_message.send(
|
||||
message='Error loading "{}".'.format(command)
|
||||
)
|
||||
signals.add_log('Error loading "{}":\n{}'.format(command, e), "error")
|
||||
return
|
||||
|
||||
if f.request:
|
||||
self._run_script_method("request", s, f)
|
||||
if f.response:
|
||||
self._run_script_method("response", s, f)
|
||||
if f.error:
|
||||
self._run_script_method("error", s, f)
|
||||
s.unload()
|
||||
signals.flow_change.send(self, flow = f)
|
||||
with self.handlecontext():
|
||||
sc.run_once(command, [f])
|
||||
except mitmproxy.exceptions.AddonError as e:
|
||||
signals.add_log("Script error: %s" % e, "warn")
|
||||
|
||||
def toggle_eventlog(self):
|
||||
self.options.eventlog = not self.options.eventlog
|
||||
|
|
|
@ -46,12 +46,6 @@ class DumpMaster(flow.FlowMaster):
|
|||
self.addons.add(options, dumper.Dumper())
|
||||
# This line is just for type hinting
|
||||
self.options = self.options # type: Options
|
||||
self.server_replay_ignore_params = options.server_replay_ignore_params
|
||||
self.server_replay_ignore_content = options.server_replay_ignore_content
|
||||
self.server_replay_ignore_host = options.server_replay_ignore_host
|
||||
self.refresh_server_playback = options.refresh_server_playback
|
||||
self.server_replay_ignore_payload_params = options.server_replay_ignore_payload_params
|
||||
|
||||
self.set_stream_large_bodies(options.stream_large_bodies)
|
||||
|
||||
if self.server and self.options.http2 and not tcp.HAS_ALPN: # pragma: no cover
|
||||
|
|
|
@ -15,6 +15,30 @@ from mitmproxy.onboarding import app
|
|||
from mitmproxy.protocol import http_replay
|
||||
|
||||
|
||||
def event_sequence(f):
|
||||
if isinstance(f, models.HTTPFlow):
|
||||
if f.request:
|
||||
yield "request", f
|
||||
if f.response:
|
||||
yield "responseheaders", f
|
||||
yield "response", f
|
||||
if f.error:
|
||||
yield "error", f
|
||||
elif isinstance(f, models.TCPFlow):
|
||||
messages = f.messages
|
||||
f.messages = []
|
||||
f.reply = controller.DummyReply()
|
||||
yield "tcp_open", f
|
||||
while messages:
|
||||
f.messages.append(messages.pop(0))
|
||||
yield "tcp_message", f
|
||||
if f.error:
|
||||
yield "tcp_error", f
|
||||
yield "tcp_close", f
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class FlowMaster(controller.Master):
|
||||
|
||||
@property
|
||||
|
@ -114,28 +138,9 @@ class FlowMaster(controller.Master):
|
|||
f.request.host = self.server.config.upstream_server.address.host
|
||||
f.request.port = self.server.config.upstream_server.address.port
|
||||
f.request.scheme = self.server.config.upstream_server.scheme
|
||||
|
||||
f.reply = controller.DummyReply()
|
||||
if f.request:
|
||||
self.request(f)
|
||||
if f.response:
|
||||
self.responseheaders(f)
|
||||
self.response(f)
|
||||
if f.error:
|
||||
self.error(f)
|
||||
elif isinstance(f, models.TCPFlow):
|
||||
messages = f.messages
|
||||
f.messages = []
|
||||
f.reply = controller.DummyReply()
|
||||
self.tcp_open(f)
|
||||
while messages:
|
||||
f.messages.append(messages.pop(0))
|
||||
self.tcp_message(f)
|
||||
if f.error:
|
||||
self.tcp_error(f)
|
||||
self.tcp_close(f)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
f.reply = controller.DummyReply()
|
||||
for e, o in event_sequence(f):
|
||||
getattr(self, e)(o)
|
||||
|
||||
def load_flows(self, fr):
|
||||
"""
|
||||
|
|
|
@ -137,6 +137,31 @@ class TestScript(mastertest.MasterTest):
|
|||
|
||||
|
||||
class TestScriptLoader(mastertest.MasterTest):
|
||||
def test_run_once(self):
|
||||
s = state.State()
|
||||
o = options.Options(scripts=[])
|
||||
m = master.FlowMaster(o, None, s)
|
||||
sl = script.ScriptLoader()
|
||||
m.addons.add(o, sl)
|
||||
|
||||
f = tutils.tflow(resp=True)
|
||||
with m.handlecontext():
|
||||
sc = sl.run_once(
|
||||
tutils.test_data.path(
|
||||
"data/addonscripts/recorder.py"
|
||||
), [f]
|
||||
)
|
||||
evts = [i[1] for i in sc.ns.call_log]
|
||||
assert evts == ['start', 'request', 'responseheaders', 'response', 'done']
|
||||
|
||||
with m.handlecontext():
|
||||
tutils.raises(
|
||||
"file not found",
|
||||
sl.run_once,
|
||||
"nonexistent",
|
||||
[f]
|
||||
)
|
||||
|
||||
def test_simple(self):
|
||||
s = state.State()
|
||||
o = options.Options(scripts=[])
|
||||
|
|
Loading…
Reference in New Issue