fix lexing, sort of
This commit is contained in:
parent
74f5fa6a77
commit
76e6484107
|
@ -90,8 +90,7 @@ class Core:
|
|||
are emptied. Boolean values can be true, false or toggle.
|
||||
Multiple values are concatenated with a single space.
|
||||
"""
|
||||
value = " ".join(value)
|
||||
strspec = f"{option}={value}"
|
||||
strspec = f"{option}={' '.join(value)}"
|
||||
try:
|
||||
ctx.options.set(strspec)
|
||||
except exceptions.OptionsError as e:
|
||||
|
|
|
@ -3,16 +3,14 @@
|
|||
"""
|
||||
import functools
|
||||
import inspect
|
||||
import re
|
||||
import sys
|
||||
import textwrap
|
||||
import types
|
||||
import typing
|
||||
|
||||
import pyparsing
|
||||
|
||||
import mitmproxy.types
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import exceptions, command_lexer
|
||||
from mitmproxy.command_lexer import unquote
|
||||
|
||||
|
||||
def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
|
||||
|
@ -144,16 +142,6 @@ class CommandManager:
|
|||
self.master = master
|
||||
self.commands = {}
|
||||
|
||||
self.expr_parser = pyparsing.ZeroOrMore(
|
||||
pyparsing.QuotedString('"', escChar='\\', unquoteResults=False)
|
||||
| pyparsing.QuotedString("'", escChar='\\', unquoteResults=False)
|
||||
| pyparsing.Combine(pyparsing.Literal('"')
|
||||
+ pyparsing.Word(pyparsing.printables + " ")
|
||||
+ pyparsing.StringEnd())
|
||||
| pyparsing.Word(pyparsing.printables)
|
||||
| pyparsing.Word(" \r\n\t")
|
||||
).leaveWhitespace()
|
||||
|
||||
def collect_commands(self, addon):
|
||||
for i in dir(addon):
|
||||
if not i.startswith("__"):
|
||||
|
@ -183,7 +171,7 @@ class CommandManager:
|
|||
Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
|
||||
"""
|
||||
|
||||
parts: typing.List[str] = self.expr_parser.parseString(cmdstr)
|
||||
parts: typing.List[str] = command_lexer.expr.parseString(cmdstr, parseAll=True)
|
||||
|
||||
parsed: typing.List[ParseResult] = []
|
||||
next_params: typing.List[CommandParameter] = [
|
||||
|
@ -284,28 +272,6 @@ class CommandManager:
|
|||
print(file=out)
|
||||
|
||||
|
||||
def unquote(x: str) -> str:
|
||||
quoted = (
|
||||
(x.startswith('"') and x.endswith('"'))
|
||||
or
|
||||
(x.startswith("'") and x.endswith("'"))
|
||||
)
|
||||
if quoted:
|
||||
x = x[1:-1]
|
||||
# not sure if this is the right place, but pypyarsing doesn't process escape sequences.
|
||||
x = re.sub(r"\\(.)", r"\g<1>", x)
|
||||
return x
|
||||
return x
|
||||
|
||||
|
||||
def quote(val: str) -> str:
|
||||
if not val:
|
||||
return '""'
|
||||
if all(ws not in val for ws in " \r\n\t"):
|
||||
return val
|
||||
return repr(val)
|
||||
|
||||
|
||||
def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
"""
|
||||
Convert a string to a argument to the appropriate type.
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
import ast
|
||||
import re
|
||||
|
||||
import pyparsing
|
||||
|
||||
# TODO: There is a lot of work to be done here.
|
||||
# The current implementation is written in a way that _any_ input is valid,
|
||||
# which does not make sense once things get more complex.
|
||||
|
||||
PartialQuotedString = pyparsing.Regex(
|
||||
re.compile(
|
||||
r'''
|
||||
(["']) # start quote
|
||||
(?:
|
||||
(?!\1)[^\\] # unescaped character that is not our quote nor the begin of an escape sequence. We can't use \1 in []
|
||||
|
|
||||
(?:\\.) # escape sequence
|
||||
)*
|
||||
(?:\1|$) # end quote
|
||||
''',
|
||||
re.VERBOSE
|
||||
)
|
||||
)
|
||||
|
||||
expr = pyparsing.ZeroOrMore(
|
||||
PartialQuotedString
|
||||
| pyparsing.Word(" \r\n\t")
|
||||
| pyparsing.CharsNotIn("""'" \r\n\t""")
|
||||
).leaveWhitespace()
|
||||
|
||||
|
||||
def quote(val: str) -> str:
|
||||
if val and all(char not in val for char in "'\" \r\n\t"):
|
||||
return val
|
||||
return repr(val) # TODO: More of a hack.
|
||||
|
||||
|
||||
def unquote(x: str) -> str:
|
||||
quoted = (
|
||||
(x.startswith('"') and x.endswith('"'))
|
||||
or
|
||||
(x.startswith("'") and x.endswith("'"))
|
||||
)
|
||||
if quoted:
|
||||
try:
|
||||
x = ast.literal_eval(x)
|
||||
except Exception:
|
||||
x = x[1:-1]
|
||||
return x
|
|
@ -1,21 +1,18 @@
|
|||
import csv
|
||||
import shlex
|
||||
import typing
|
||||
|
||||
import mitmproxy.types
|
||||
from mitmproxy import command, command_lexer
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import command
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import http
|
||||
from mitmproxy import log
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.utils import strutils
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
from mitmproxy.tools.console import keymap
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import keymap
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
console_palettes = [
|
||||
"lowlight",
|
||||
|
@ -48,10 +45,12 @@ class UnsupportedLog:
|
|||
"""
|
||||
A small addon to dump info on flow types we don't support yet.
|
||||
"""
|
||||
|
||||
def websocket_message(self, f):
|
||||
message = f.messages[-1]
|
||||
ctx.log.info(f.message_info(message))
|
||||
ctx.log.debug(message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
|
||||
ctx.log.debug(
|
||||
message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
|
||||
|
||||
def websocket_end(self, f):
|
||||
ctx.log.info("WebSocket connection closed by {}: {} {}, {}".format(
|
||||
|
@ -78,6 +77,7 @@ class ConsoleAddon:
|
|||
An addon that exposes console-specific commands, and hooks into required
|
||||
events.
|
||||
"""
|
||||
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.started = False
|
||||
|
@ -86,7 +86,7 @@ class ConsoleAddon:
|
|||
loader.add_option(
|
||||
"console_default_contentview", str, "auto",
|
||||
"The default content view mode.",
|
||||
choices = [i.name.lower() for i in contentviews.views]
|
||||
choices=[i.name.lower() for i in contentviews.views]
|
||||
)
|
||||
loader.add_option(
|
||||
"console_eventlog_verbosity", str, 'info',
|
||||
|
@ -142,7 +142,7 @@ class ConsoleAddon:
|
|||
opts = self.layout_options()
|
||||
off = self.layout_options().index(ctx.options.console_layout)
|
||||
ctx.options.update(
|
||||
console_layout = opts[(off + 1) % len(opts)]
|
||||
console_layout=opts[(off + 1) % len(opts)]
|
||||
)
|
||||
|
||||
@command.command("console.panes.next")
|
||||
|
@ -245,6 +245,7 @@ class ConsoleAddon:
|
|||
invoke another command with all occurrences of {choice} replaced by
|
||||
the choice the user made.
|
||||
"""
|
||||
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = cmd + " " + " ".join(args)
|
||||
|
@ -275,7 +276,7 @@ class ConsoleAddon:
|
|||
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = shlex.quote(" ".join(args))
|
||||
repl = " ".join(command_lexer.quote(x) for x in args)
|
||||
repl = repl.replace("{choice}", opt)
|
||||
try:
|
||||
self.master.commands.execute(subcmd + " " + repl)
|
||||
|
@ -287,22 +288,24 @@ class ConsoleAddon:
|
|||
)
|
||||
|
||||
@command.command("console.command")
|
||||
def console_command(self, *cmd_str: str) -> None:
|
||||
def console_command(self, *command_str: str) -> None:
|
||||
"""
|
||||
Prompt the user to edit a command with a (possibly empty) starting value.
|
||||
"""
|
||||
cmd_str = (command.quote(x) if x else "" for x in cmd_str)
|
||||
signals.status_prompt_command.send(partial=" ".join(cmd_str)) # type: ignore
|
||||
quoted = " ".join(command_lexer.quote(x) for x in command_str)
|
||||
signals.status_prompt_command.send(partial=quoted)
|
||||
|
||||
@command.command("console.command.set")
|
||||
def console_command_set(self, option_name: str) -> None:
|
||||
"""
|
||||
Prompt the user to set an option.
|
||||
"""
|
||||
option_value = getattr(self.master.options, option_name, None)
|
||||
option_value = command.quote(option_value)
|
||||
self.master.commands.execute(
|
||||
f"console.command set {option_name} {option_value}"
|
||||
option_value = getattr(self.master.options, option_name, None) or ""
|
||||
set_command = f"set {option_name} {option_value!r}"
|
||||
cursor = len(set_command) - 1
|
||||
signals.status_prompt_command.send(
|
||||
partial=set_command,
|
||||
cursor=cursor
|
||||
)
|
||||
|
||||
@command.command("console.view.keybindings")
|
||||
|
|
|
@ -26,7 +26,7 @@ def map(km):
|
|||
km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")
|
||||
km.add("ctrl b", "console.nav.pageup", ["global"], "Page up")
|
||||
|
||||
km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept")
|
||||
km.add("I", "set intercept_active toggle", ["global"], "Toggle intercept")
|
||||
km.add("i", "console.command.set intercept", ["global"], "Set intercept")
|
||||
km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file")
|
||||
km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows")
|
||||
|
@ -48,7 +48,7 @@ def map(km):
|
|||
"Export this flow to file"
|
||||
)
|
||||
km.add("f", "console.command.set view_filter", ["flowlist"], "Set view filter")
|
||||
km.add("F", "set console_focus_follow=toggle", ["flowlist"], "Set focus follow")
|
||||
km.add("F", "set console_focus_follow toggle", ["flowlist"], "Set focus follow")
|
||||
km.add(
|
||||
"ctrl l",
|
||||
"console.command cut.clip ",
|
||||
|
@ -68,14 +68,14 @@ def map(km):
|
|||
"o",
|
||||
"""
|
||||
console.choose.cmd Order view.order.options
|
||||
set view_order={choice}
|
||||
set view_order {choice}
|
||||
""",
|
||||
["flowlist"],
|
||||
"Set flow list order"
|
||||
)
|
||||
km.add("r", "replay.client @focus", ["flowlist", "flowview"], "Replay this flow")
|
||||
km.add("S", "console.command replay.server ", ["flowlist"], "Start server replay")
|
||||
km.add("v", "set view_order_reversed=toggle", ["flowlist"], "Reverse flow list order")
|
||||
km.add("v", "set view_order_reversed toggle", ["flowlist"], "Reverse flow list order")
|
||||
km.add("U", "flow.mark @all false", ["flowlist"], "Un-set all marks")
|
||||
km.add("w", "console.command save.file @shown ", ["flowlist"], "Save listed flows to file")
|
||||
km.add("V", "flow.revert @focus", ["flowlist", "flowview"], "Revert changes to this flow")
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import os.path
|
||||
from typing import Optional
|
||||
|
||||
import urwid
|
||||
|
||||
|
@ -98,10 +99,15 @@ class ActionBar(urwid.WidgetWrap):
|
|||
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
|
||||
self.prompting = PromptStub(callback, args)
|
||||
|
||||
def sig_prompt_command(self, sender, partial=""):
|
||||
def sig_prompt_command(self, sender, partial: str = "", cursor: Optional[int] = None):
|
||||
signals.focus.send(self, section="footer")
|
||||
self._w = commander.CommandEdit(self.master, partial,
|
||||
self.command_history)
|
||||
self._w = commander.CommandEdit(
|
||||
self.master,
|
||||
partial,
|
||||
self.command_history,
|
||||
)
|
||||
if cursor is not None:
|
||||
self._w.cbuf.cursor = cursor
|
||||
self.prompting = commandexecutor.CommandExecutor(self.master)
|
||||
|
||||
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
import pyparsing
|
||||
import pytest
|
||||
|
||||
from mitmproxy import command_lexer
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_input,valid", [
|
||||
("'foo'", True),
|
||||
('"foo"', True),
|
||||
("'foo' bar'", False),
|
||||
("'foo\\' bar'", True),
|
||||
("'foo' 'bar'", False),
|
||||
("'foo'x", False),
|
||||
('''"foo ''', True),
|
||||
('''"foo 'bar' ''', True),
|
||||
]
|
||||
)
|
||||
def test_partial_quoted_string(test_input, valid):
|
||||
if valid:
|
||||
assert command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)[0] == test_input
|
||||
else:
|
||||
with pytest.raises(pyparsing.ParseException):
|
||||
command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_input,expected", [
|
||||
("'foo'", ["'foo'"]),
|
||||
('"foo"', ['"foo"']),
|
||||
("'foo' 'bar'", ["'foo'", ' ', "'bar'"]),
|
||||
("'foo'x", ["'foo'", 'x']),
|
||||
('''"foo''', ['"foo']),
|
||||
('''"foo 'bar' ''', ['''"foo 'bar' ''']),
|
||||
]
|
||||
)
|
||||
def test_expr(test_input, expected):
|
||||
assert list(command_lexer.expr.parseString(test_input, parseAll=True)) == expected
|
|
@ -269,7 +269,7 @@ class TestCommandBuffer:
|
|||
cb.text = "foo"
|
||||
assert cb.render()
|
||||
|
||||
cb.text = 'set view_filter=~bq test'
|
||||
cb.text = 'set view_filter ~bq test'
|
||||
ret = cb.render()
|
||||
assert ret[0] == ('commander_command', 'set')
|
||||
assert ret[1] == ('text', ' ')
|
||||
|
|
Loading…
Reference in New Issue