Merge pull request #2302 from cortesi/flowview
commands: view.setval, view.getval, view.setval.toggle
This commit is contained in:
commit
53ad658e9f
|
@ -296,6 +296,45 @@ class View(collections.Sequence):
|
|||
"""
|
||||
return self._store.get(flow_id)
|
||||
|
||||
@command.command("view.getval")
|
||||
def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str:
|
||||
"""
|
||||
Get a value from the settings store for the specified flow.
|
||||
"""
|
||||
return self.settings[f].get(key, default)
|
||||
|
||||
@command.command("view.setval.toggle")
|
||||
def setvalue_toggle(
|
||||
self,
|
||||
flows: typing.Sequence[mitmproxy.flow.Flow],
|
||||
key: str
|
||||
) -> None:
|
||||
"""
|
||||
Toggle a boolean value in the settings store, seting the value to
|
||||
the string "true" or "false".
|
||||
"""
|
||||
updated = []
|
||||
for f in flows:
|
||||
current = self.settings[f].get("key", "false")
|
||||
self.settings[f][key] = "false" if current == "true" else "true"
|
||||
updated.append(f)
|
||||
ctx.master.addons.trigger("update", updated)
|
||||
|
||||
@command.command("view.setval")
|
||||
def setvalue(
|
||||
self,
|
||||
flows: typing.Sequence[mitmproxy.flow.Flow],
|
||||
key: str, value: str
|
||||
) -> None:
|
||||
"""
|
||||
Set a value in the settings store for the specified flows.
|
||||
"""
|
||||
updated = []
|
||||
for f in flows:
|
||||
self.settings[f][key] = value
|
||||
updated.append(f)
|
||||
ctx.master.addons.trigger("update", updated)
|
||||
|
||||
@command.command("view.load")
|
||||
def load_file(self, path: str) -> None:
|
||||
"""
|
||||
|
|
|
@ -1,24 +1,9 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
import os
|
||||
|
||||
import urwid
|
||||
import urwid.util
|
||||
|
||||
import mitmproxy.net
|
||||
from functools import lru_cache
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.utils import human
|
||||
|
||||
try:
|
||||
import pyperclip
|
||||
except:
|
||||
pyperclip = False
|
||||
|
||||
|
||||
VIEW_FLOW_REQUEST = 0
|
||||
VIEW_FLOW_RESPONSE = 1
|
||||
|
||||
METHOD_OPTIONS = [
|
||||
("get", "g"),
|
||||
|
@ -133,178 +118,6 @@ else:
|
|||
SYMBOL_DOWN = " "
|
||||
|
||||
|
||||
# Save file to disk
|
||||
def save_data(path, data):
|
||||
if not path:
|
||||
return
|
||||
try:
|
||||
if isinstance(data, bytes):
|
||||
mode = "wb"
|
||||
else:
|
||||
mode = "w"
|
||||
with open(path, mode) as f:
|
||||
f.write(data)
|
||||
except IOError as v:
|
||||
signals.status_message.send(message=v.strerror)
|
||||
|
||||
|
||||
def ask_save_overwrite(path, data):
|
||||
if os.path.exists(path):
|
||||
def save_overwrite(k):
|
||||
if k == "y":
|
||||
save_data(path, data)
|
||||
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "'" + path + "' already exists. Overwrite?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
callback = save_overwrite
|
||||
)
|
||||
else:
|
||||
save_data(path, data)
|
||||
|
||||
|
||||
def ask_save_path(data, prompt="File path"):
|
||||
signals.status_prompt_path.send(
|
||||
prompt = prompt,
|
||||
callback = ask_save_overwrite,
|
||||
args = (data, )
|
||||
)
|
||||
|
||||
|
||||
def ask_scope_and_callback(flow, cb, *args):
|
||||
request_has_content = flow.request and flow.request.raw_content
|
||||
response_has_content = flow.response and flow.response.raw_content
|
||||
|
||||
if request_has_content and response_has_content:
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Save",
|
||||
keys = (
|
||||
("request", "q"),
|
||||
("response", "s"),
|
||||
("both", "b"),
|
||||
),
|
||||
callback = cb,
|
||||
args = (flow,) + args
|
||||
)
|
||||
elif response_has_content:
|
||||
cb("s", flow, *args)
|
||||
else:
|
||||
cb("q", flow, *args)
|
||||
|
||||
|
||||
def copy_to_clipboard_or_prompt(data):
|
||||
# pyperclip calls encode('utf-8') on data to be copied without checking.
|
||||
# if data are already encoded that way UnicodeDecodeError is thrown.
|
||||
if isinstance(data, bytes):
|
||||
toclip = data.decode("utf8", "replace")
|
||||
else:
|
||||
toclip = data
|
||||
|
||||
try:
|
||||
pyperclip.copy(toclip)
|
||||
except (RuntimeError, UnicodeDecodeError, AttributeError, TypeError):
|
||||
def save(k):
|
||||
if k == "y":
|
||||
ask_save_path(data, "Save data")
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Cannot copy data to clipboard. Save as file?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
callback = save
|
||||
)
|
||||
|
||||
|
||||
def format_flow_data(key, scope, flow):
|
||||
data = b""
|
||||
if scope in ("q", "b"):
|
||||
request = flow.request.copy()
|
||||
request.decode(strict=False)
|
||||
if request.content is None:
|
||||
return None, "Request content is missing"
|
||||
if key == "h":
|
||||
data += mitmproxy.net.http.http1.assemble_request(request)
|
||||
elif key == "c":
|
||||
data += request.get_content(strict=False)
|
||||
else:
|
||||
raise ValueError("Unknown key: {}".format(key))
|
||||
if scope == "b" and flow.request.raw_content and flow.response:
|
||||
# Add padding between request and response
|
||||
data += b"\r\n" * 2
|
||||
if scope in ("s", "b") and flow.response:
|
||||
response = flow.response.copy()
|
||||
response.decode(strict=False)
|
||||
if response.content is None:
|
||||
return None, "Response content is missing"
|
||||
if key == "h":
|
||||
data += mitmproxy.net.http.http1.assemble_response(response)
|
||||
elif key == "c":
|
||||
data += response.get_content(strict=False)
|
||||
else:
|
||||
raise ValueError("Unknown key: {}".format(key))
|
||||
return data, False
|
||||
|
||||
|
||||
def handle_flow_data(scope, flow, key, writer):
|
||||
"""
|
||||
key: _c_ontent, _h_eaders+content, _u_rl
|
||||
scope: re_q_uest, re_s_ponse, _b_oth
|
||||
writer: copy_to_clipboard_or_prompt, ask_save_path
|
||||
"""
|
||||
data, err = format_flow_data(key, scope, flow)
|
||||
|
||||
if err:
|
||||
signals.status_message.send(message=err)
|
||||
return
|
||||
|
||||
if not data:
|
||||
if scope == "q":
|
||||
signals.status_message.send(message="No request content.")
|
||||
elif scope == "s":
|
||||
signals.status_message.send(message="No response content.")
|
||||
else:
|
||||
signals.status_message.send(message="No content.")
|
||||
return
|
||||
|
||||
writer(data)
|
||||
|
||||
|
||||
def ask_save_body(scope, flow):
|
||||
"""
|
||||
Save either the request or the response body to disk.
|
||||
|
||||
scope: re_q_uest, re_s_ponse, _b_oth, None (ask user if necessary)
|
||||
"""
|
||||
|
||||
request_has_content = flow.request and flow.request.raw_content
|
||||
response_has_content = flow.response and flow.response.raw_content
|
||||
|
||||
if scope is None:
|
||||
ask_scope_and_callback(flow, ask_save_body)
|
||||
elif scope == "q" and request_has_content:
|
||||
ask_save_path(
|
||||
flow.request.get_content(strict=False),
|
||||
"Save request content to"
|
||||
)
|
||||
elif scope == "s" and response_has_content:
|
||||
ask_save_path(
|
||||
flow.response.get_content(strict=False),
|
||||
"Save response content to"
|
||||
)
|
||||
elif scope == "b" and request_has_content and response_has_content:
|
||||
ask_save_path(
|
||||
(flow.request.get_content(strict=False) + b"\n" +
|
||||
flow.response.get_content(strict=False)),
|
||||
"Save request & response content to"
|
||||
)
|
||||
else:
|
||||
signals.status_message.send(message="No content.")
|
||||
|
||||
|
||||
@lru_cache(maxsize=800)
|
||||
def raw_format_flow(f, flow):
|
||||
f = dict(f)
|
||||
|
|
|
@ -9,7 +9,6 @@ from mitmproxy import contentviews
|
|||
from mitmproxy import http
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import flowdetailview
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import searchable
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import tabs
|
||||
|
@ -117,14 +116,10 @@ class FlowViewHeader(urwid.WidgetWrap):
|
|||
self._w = urwid.Pile([])
|
||||
|
||||
|
||||
TAB_REQ = 0
|
||||
TAB_RESP = 1
|
||||
|
||||
|
||||
class FlowDetails(tabs.Tabs):
|
||||
def __init__(self, master, tab_offset):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
super().__init__([], tab_offset)
|
||||
super().__init__([])
|
||||
self.show()
|
||||
self.last_displayed_body = None
|
||||
|
||||
|
@ -174,9 +169,8 @@ class FlowDetails(tabs.Tabs):
|
|||
msg, body = "", [urwid.Text([("error", "[content missing]")])]
|
||||
return msg, body
|
||||
else:
|
||||
s = self.view.settings[self.flow]
|
||||
full = s.get((self.tab_offset, "fullcontents"), False)
|
||||
if full:
|
||||
full = self.master.commands.call("view.getval @focus fullcontents false")
|
||||
if full == "true":
|
||||
limit = sys.maxsize
|
||||
else:
|
||||
limit = contentviews.VIEW_CUTOFF
|
||||
|
@ -232,12 +226,6 @@ class FlowDetails(tabs.Tabs):
|
|||
|
||||
return description, text_objects
|
||||
|
||||
def viewmode_get(self):
|
||||
return self.view.settings[self.flow].get(
|
||||
(self.tab_offset, "prettyview"),
|
||||
self.master.options.default_contentview
|
||||
)
|
||||
|
||||
def conn_text(self, conn):
|
||||
if conn:
|
||||
txt = common.format_keyvals(
|
||||
|
@ -245,7 +233,7 @@ class FlowDetails(tabs.Tabs):
|
|||
key = "header",
|
||||
val = "text"
|
||||
)
|
||||
viewmode = self.viewmode_get()
|
||||
viewmode = self.master.commands.call("console.flowview.mode")
|
||||
msg, body = self.content_view(viewmode, conn)
|
||||
|
||||
cols = [
|
||||
|
@ -281,32 +269,10 @@ class FlowDetails(tabs.Tabs):
|
|||
]
|
||||
return searchable.Searchable(txt)
|
||||
|
||||
def change_this_display_mode(self, t):
|
||||
view = contentviews.get(t)
|
||||
self.view.settings[self.flow][(self.tab_offset, "prettyview")] = view.name.lower()
|
||||
|
||||
def keypress(self, size, key):
|
||||
key = super().keypress(size, key)
|
||||
key = common.shortcuts(key)
|
||||
if key in ("up", "down", "page up", "page down"):
|
||||
# Pass scroll events to the wrapped widget
|
||||
self._w.keypress(size, key)
|
||||
elif key == "f":
|
||||
self.view.settings[self.flow][(self.tab_offset, "fullcontents")] = True
|
||||
signals.status_message.send(message="Loading all body data...")
|
||||
elif key == "m":
|
||||
opts = [i.name.lower() for i in contentviews.views]
|
||||
self.master.overlay(
|
||||
overlay.Chooser(
|
||||
"display mode",
|
||||
opts,
|
||||
self.viewmode_get(),
|
||||
self.change_this_display_mode
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Key is not handled here.
|
||||
return key
|
||||
return self._w.keypress(size, key)
|
||||
|
||||
|
||||
class FlowView(urwid.Frame):
|
||||
|
@ -314,7 +280,7 @@ class FlowView(urwid.Frame):
|
|||
|
||||
def __init__(self, master):
|
||||
super().__init__(
|
||||
FlowDetails(master, 0),
|
||||
FlowDetails(master),
|
||||
header = FlowViewHeader(master),
|
||||
)
|
||||
self.master = master
|
||||
|
|
|
@ -29,6 +29,7 @@ from mitmproxy.tools.console import palettes
|
|||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import statusbar
|
||||
from mitmproxy.tools.console import window
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
EVENTLOG_SIZE = 10000
|
||||
|
@ -225,6 +226,46 @@ class ConsoleAddon:
|
|||
"console.command flow.set @focus %s " % part
|
||||
)
|
||||
|
||||
@command.command("console.flowview.mode.set")
|
||||
def flowview_mode_set(self) -> None:
|
||||
"""
|
||||
Set the display mode for the current flow view.
|
||||
"""
|
||||
if self.master.window.focus.keyctx != "flowview":
|
||||
raise exceptions.CommandError("Not viewing a flow.")
|
||||
fv = self.master.window.windows["flowview"]
|
||||
idx = fv.body.tab_offset
|
||||
|
||||
def callback(opt):
|
||||
try:
|
||||
self.master.commands.call_args(
|
||||
"view.setval",
|
||||
["@focus", "flowview_mode_%s" % idx, opt]
|
||||
)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
opts = [i.name.lower() for i in contentviews.views]
|
||||
self.master.overlay(overlay.Chooser("Mode", opts, "", callback))
|
||||
|
||||
@command.command("console.flowview.mode")
|
||||
def flowview_mode(self) -> str:
|
||||
"""
|
||||
Get the display mode for the current flow view.
|
||||
"""
|
||||
if self.master.window.focus.keyctx != "flowview":
|
||||
raise exceptions.CommandError("Not viewing a flow.")
|
||||
fv = self.master.window.windows["flowview"]
|
||||
idx = fv.body.tab_offset
|
||||
return self.master.commands.call_args(
|
||||
"view.getval",
|
||||
[
|
||||
"@focus",
|
||||
"flowview_mode_%s" % idx,
|
||||
self.master.options.default_contentview,
|
||||
]
|
||||
)
|
||||
|
||||
def running(self):
|
||||
self.started = True
|
||||
|
||||
|
@ -265,7 +306,7 @@ def default_keymap(km):
|
|||
"console.command export.file {choice} @focus ''",
|
||||
["flowlist", "flowview"]
|
||||
)
|
||||
km.add("f", "console.command 'set view_filter='", ["flowlist"])
|
||||
km.add("f", "console.command set view_filter=", ["flowlist"])
|
||||
km.add("F", "set console_focus_follow=toggle", ["flowlist"])
|
||||
km.add("g", "view.go 0", ["flowlist"])
|
||||
km.add("G", "view.go -1", ["flowlist"])
|
||||
|
@ -285,15 +326,15 @@ def default_keymap(km):
|
|||
["flowlist"]
|
||||
)
|
||||
km.add("r", "replay.client @focus", ["flowlist", "flowview"])
|
||||
km.add("S", "console.command 'replay.server '", ["flowlist"])
|
||||
km.add("S", "console.command replay.server ", ["flowlist"])
|
||||
km.add("v", "set console_order_reversed=toggle", ["flowlist"])
|
||||
km.add("U", "flow.mark @all false", ["flowlist"])
|
||||
km.add("w", "console.command 'save.file @shown '", ["flowlist"])
|
||||
km.add("w", "console.command save.file @shown ", ["flowlist"])
|
||||
km.add("V", "flow.revert @focus", ["flowlist", "flowview"])
|
||||
km.add("X", "flow.kill @focus", ["flowlist"])
|
||||
km.add("z", "view.remove @all", ["flowlist"])
|
||||
km.add("Z", "view.remove @hidden", ["flowlist"])
|
||||
km.add("|", "console.command 'script.run @focus '", ["flowlist", "flowview"])
|
||||
km.add("|", "console.command script.run @focus ", ["flowlist", "flowview"])
|
||||
km.add("enter", "console.view.flow @focus", ["flowlist"])
|
||||
|
||||
km.add(
|
||||
|
@ -302,7 +343,8 @@ def default_keymap(km):
|
|||
"console.edit.focus {choice}",
|
||||
["flowview"]
|
||||
)
|
||||
km.add("w", "console.command 'save.file @focus '", ["flowview"])
|
||||
km.add("f", "view.setval.toggle @focus fullcontents", ["flowview"])
|
||||
km.add("w", "console.command save.file @focus ", ["flowview"])
|
||||
km.add(" ", "view.focus.next", ["flowview"])
|
||||
km.add(
|
||||
"o",
|
||||
|
@ -318,6 +360,7 @@ def default_keymap(km):
|
|||
["flowview"]
|
||||
)
|
||||
km.add("p", "view.focus.prev", ["flowview"])
|
||||
km.add("m", "console.flowview.mode.set", ["flowview"])
|
||||
km.add(
|
||||
"z",
|
||||
"console.choose \"Part\" request,response "
|
||||
|
|
|
@ -260,6 +260,21 @@ def test_duplicate():
|
|||
assert v.focus.index == 2
|
||||
|
||||
|
||||
def test_setgetval():
|
||||
v = view.View()
|
||||
with taddons.context():
|
||||
f = tflow.tflow()
|
||||
v.add([f])
|
||||
v.setvalue([f], "key", "value")
|
||||
assert v.getvalue(f, "key", "default") == "value"
|
||||
assert v.getvalue(f, "unknow", "default") == "default"
|
||||
|
||||
v.setvalue_toggle([f], "key")
|
||||
assert v.getvalue(f, "key", "default") == "true"
|
||||
v.setvalue_toggle([f], "key")
|
||||
assert v.getvalue(f, "key", "default") == "false"
|
||||
|
||||
|
||||
def test_order():
|
||||
v = view.View()
|
||||
with taddons.context() as tctx:
|
||||
|
|
Loading…
Reference in New Issue