Merge pull request #2301 from cortesi/encoding
commands: flow.encode, flow.decode, flow.encode.toggle
This commit is contained in:
commit
143872b574
|
@ -153,3 +153,62 @@ class Core:
|
|||
|
||||
ctx.master.addons.trigger("update", updated)
|
||||
ctx.log.alert("Set %s on %s flows." % (spec, len(updated)))
|
||||
|
||||
@command.command("flow.decode")
|
||||
def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None:
|
||||
"""
|
||||
Decode flows.
|
||||
"""
|
||||
updated = []
|
||||
for f in flows:
|
||||
p = getattr(f, part, None)
|
||||
if p:
|
||||
p.decode()
|
||||
updated.append(f)
|
||||
ctx.master.addons.trigger("update", updated)
|
||||
ctx.log.alert("Decoded %s flows." % len(updated))
|
||||
|
||||
@command.command("flow.encode.toggle")
|
||||
def encode_toggle(self, flows: typing.Sequence[flow.Flow], part: str) -> None:
|
||||
"""
|
||||
Toggle flow encoding on and off, using deflate for encoding.
|
||||
"""
|
||||
updated = []
|
||||
for f in flows:
|
||||
p = getattr(f, part, None)
|
||||
if p:
|
||||
current_enc = p.headers.get("content-encoding", "identity")
|
||||
if current_enc == "identity":
|
||||
p.encode("deflate")
|
||||
else:
|
||||
p.decode()
|
||||
updated.append(f)
|
||||
ctx.master.addons.trigger("update", updated)
|
||||
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
|
||||
|
||||
@command.command("flow.encode")
|
||||
def encode(self, flows: typing.Sequence[flow.Flow], part: str, enc: str) -> None:
|
||||
"""
|
||||
Encode flows with a specified encoding.
|
||||
"""
|
||||
if enc not in self.encode_options():
|
||||
raise exceptions.CommandError("Invalid encoding format: %s" % enc)
|
||||
|
||||
updated = []
|
||||
for f in flows:
|
||||
p = getattr(f, part, None)
|
||||
if p:
|
||||
current_enc = p.headers.get("content-encoding", "identity")
|
||||
if current_enc == "identity":
|
||||
p.encode(enc)
|
||||
updated.append(f)
|
||||
ctx.master.addons.trigger("update", updated)
|
||||
ctx.log.alert("Encoded %s flows." % len(updated))
|
||||
|
||||
@command.command("flow.encode.options")
|
||||
def encode_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
The possible values for an encoding specification.
|
||||
|
||||
"""
|
||||
return ["gzip", "deflate", "br"]
|
||||
|
|
|
@ -122,8 +122,6 @@ TAB_RESP = 1
|
|||
|
||||
|
||||
class FlowDetails(tabs.Tabs):
|
||||
highlight_color = "focusfield"
|
||||
|
||||
def __init__(self, master, tab_offset):
|
||||
self.master = master
|
||||
super().__init__([], tab_offset)
|
||||
|
@ -288,14 +286,7 @@ class FlowDetails(tabs.Tabs):
|
|||
self.view.settings[self.flow][(self.tab_offset, "prettyview")] = view.name.lower()
|
||||
|
||||
def keypress(self, size, key):
|
||||
conn = None # type: Optional[Union[http.HTTPRequest, http.HTTPResponse]]
|
||||
if self.tab_offset == TAB_REQ:
|
||||
conn = self.flow.request
|
||||
elif self.tab_offset == TAB_RESP:
|
||||
conn = self.flow.response
|
||||
|
||||
key = super().keypress(size, key)
|
||||
|
||||
key = common.shortcuts(key)
|
||||
if key in ("up", "down", "page up", "page down"):
|
||||
# Pass scroll events to the wrapped widget
|
||||
|
@ -313,39 +304,10 @@ class FlowDetails(tabs.Tabs):
|
|||
self.change_this_display_mode
|
||||
)
|
||||
)
|
||||
elif key == "z":
|
||||
self.flow.backup()
|
||||
enc = conn.headers.get("content-encoding", "identity")
|
||||
if enc != "identity":
|
||||
try:
|
||||
conn.decode()
|
||||
except ValueError:
|
||||
signals.status_message.send(
|
||||
message = "Could not decode - invalid data?"
|
||||
)
|
||||
else:
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Select encoding: ",
|
||||
keys = (
|
||||
("gzip", "z"),
|
||||
("deflate", "d"),
|
||||
("brotli", "b"),
|
||||
),
|
||||
callback = self.encode_callback,
|
||||
args = (conn,)
|
||||
)
|
||||
else:
|
||||
# Key is not handled here.
|
||||
return key
|
||||
|
||||
def encode_callback(self, key, conn):
|
||||
encoding_map = {
|
||||
"z": "gzip",
|
||||
"d": "deflate",
|
||||
"b": "br",
|
||||
}
|
||||
conn.encode(encoding_map[key])
|
||||
|
||||
|
||||
class FlowView(urwid.Frame):
|
||||
keyctx = "flowview"
|
||||
|
|
|
@ -318,6 +318,12 @@ def default_keymap(km):
|
|||
["flowview"]
|
||||
)
|
||||
km.add("p", "view.focus.prev", ["flowview"])
|
||||
km.add(
|
||||
"z",
|
||||
"console.choose \"Part\" request,response "
|
||||
"flow.encode.toggle @focus {choice}",
|
||||
["flowview"]
|
||||
)
|
||||
|
||||
|
||||
class ConsoleMaster(master.Master):
|
||||
|
|
|
@ -100,3 +100,31 @@ def test_flow_set():
|
|||
assert f.response.reason != "foo"
|
||||
sa.flow_set([f], "reason", "foo")
|
||||
assert f.response.reason == "foo"
|
||||
|
||||
|
||||
def test_encoding():
|
||||
sa = core.Core()
|
||||
with taddons.context():
|
||||
f = tflow.tflow()
|
||||
assert sa.encode_options()
|
||||
sa.encode([f], "request", "deflate")
|
||||
assert f.request.headers["content-encoding"] == "deflate"
|
||||
|
||||
sa.encode([f], "request", "br")
|
||||
assert f.request.headers["content-encoding"] == "deflate"
|
||||
|
||||
sa.decode([f], "request")
|
||||
assert "content-encoding" not in f.request.headers
|
||||
|
||||
sa.encode([f], "request", "br")
|
||||
assert f.request.headers["content-encoding"] == "br"
|
||||
|
||||
sa.encode_toggle([f], "request")
|
||||
assert "content-encoding" not in f.request.headers
|
||||
sa.encode_toggle([f], "request")
|
||||
assert f.request.headers["content-encoding"] == "deflate"
|
||||
sa.encode_toggle([f], "request")
|
||||
assert "content-encoding" not in f.request.headers
|
||||
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
sa.encode([f], "request", "invalid")
|
||||
|
|
Loading…
Reference in New Issue