This commit is contained in:
Shadab Zafar 2016-07-07 19:37:33 +05:30
parent b4469d2579
commit 147f61fa62
3 changed files with 29 additions and 24 deletions

View File

@ -172,7 +172,7 @@ class ViewHex(View):
def _format(data): def _format(data):
for offset, hexa, s in strutils.hexdump(data): for offset, hexa, s in strutils.hexdump(data):
yield [ yield [
("offset", offset + " "), ("offset", offset + b" "),
("text", hexa + " "), ("text", hexa + " "),
("text", s) ("text", s)
] ]

View File

@ -244,21 +244,21 @@ class DumpMaster(flow.FlowMaster):
stickycookie = "" stickycookie = ""
if flow.client_conn: if flow.client_conn:
client = click.style(strutils.bytes_to_escaped_str(flow.client_conn.address.host), bold=True) client = click.style(strutils.escape_control_characters(flow.client_conn.address.host), bold=True)
else: else:
client = click.style("[replay]", fg="yellow", bold=True) client = click.style("[replay]", fg="yellow", bold=True)
method = flow.request.data.method method = flow.request.method
method_color = dict( method_color = dict(
GET="green", GET="green",
DELETE="red" DELETE="red"
).get(method.upper(), "magenta") ).get(method.upper(), "magenta")
method = click.style(strutils.bytes_to_escaped_str(method), fg=method_color, bold=True) method = click.style(strutils.escape_control_characters(method), fg=method_color, bold=True)
if self.showhost: if self.showhost:
url = flow.request.pretty_url url = flow.request.pretty_url
else: else:
url = flow.request.url url = flow.request.url
url = click.style(url, bold=True) url = click.style(strutils.escape_control_characters(url), bold=True)
httpversion = "" httpversion = ""
if flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0"): if flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0"):
@ -288,7 +288,7 @@ class DumpMaster(flow.FlowMaster):
elif 400 <= code < 600: elif 400 <= code < 600:
code_color = "red" code_color = "red"
code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418)) code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418))
reason = click.style(strutils.bytes_to_escaped_str(flow.response.data.reason), fg=code_color, bold=True) reason = click.style(strutils.escape_control_characters(flow.response.reason), fg=code_color, bold=True)
if flow.response.content is None: if flow.response.content is None:
size = "(content missing)" size = "(content missing)"

View File

@ -40,7 +40,7 @@ def test_strfuncs():
flow.response.status_code = 300 flow.response.status_code = 300
m.echo_flow(flow) m.echo_flow(flow)
flow = tutils.tflow(resp=netlib.tutils.tresp(content="{")) flow = tutils.tflow(resp=netlib.tutils.tresp(content=b"{"))
flow.response.headers["content-type"] = "application/json" flow.response.headers["content-type"] = "application/json"
flow.response.status_code = 400 flow.response.status_code = 400
m.echo_flow(flow) m.echo_flow(flow)
@ -70,15 +70,20 @@ class TestDumpMaster(mastertest.MasterTest):
def test_basic(self): def test_basic(self):
for i in (1, 2, 3): for i in (1, 2, 3):
assert "GET" in self.dummy_cycle(self.mkmaster("~s", flow_detail=i), 1, "")
assert "GET" in self.dummy_cycle( assert "GET" in self.dummy_cycle(
self.mkmaster("~s", flow_detail=i), self.mkmaster("~s", flow_detail=i),
1, 1,
"\x00\x00\x00" b""
) )
assert "GET" in self.dummy_cycle( assert "GET" in self.dummy_cycle(
self.mkmaster("~s", flow_detail=i), self.mkmaster("~s", flow_detail=i),
1, "ascii" 1,
b"\x00\x00\x00"
)
assert "GET" in self.dummy_cycle(
self.mkmaster("~s", flow_detail=i),
1,
b"ascii"
) )
def test_error(self): def test_error(self):
@ -115,12 +120,12 @@ class TestDumpMaster(mastertest.MasterTest):
o = dump.Options(server_replay=[p], kill=True) o = dump.Options(server_replay=[p], kill=True)
m = dump.DumpMaster(None, o, outfile=cs) m = dump.DumpMaster(None, o, outfile=cs)
self.cycle(m, "content") self.cycle(m, b"content")
self.cycle(m, "content") self.cycle(m, b"content")
o = dump.Options(server_replay=[p], kill=False) o = dump.Options(server_replay=[p], kill=False)
m = dump.DumpMaster(None, o, outfile=cs) m = dump.DumpMaster(None, o, outfile=cs)
self.cycle(m, "nonexistent") self.cycle(m, b"nonexistent")
o = dump.Options(client_replay=[p], kill=False) o = dump.Options(client_replay=[p], kill=False)
m = dump.DumpMaster(None, o, outfile=cs) m = dump.DumpMaster(None, o, outfile=cs)
@ -131,7 +136,7 @@ class TestDumpMaster(mastertest.MasterTest):
self.flowfile(p) self.flowfile(p)
assert "GET" in self.dummy_cycle( assert "GET" in self.dummy_cycle(
self.mkmaster(None, flow_detail=1, rfile=p), self.mkmaster(None, flow_detail=1, rfile=p),
0, "", 0, b"",
) )
tutils.raises( tutils.raises(
@ -149,7 +154,7 @@ class TestDumpMaster(mastertest.MasterTest):
def test_filter(self): def test_filter(self):
assert "GET" not in self.dummy_cycle( assert "GET" not in self.dummy_cycle(
self.mkmaster("~u foo", verbosity=1), 1, "" self.mkmaster("~u foo", verbosity=1), 1, b""
) )
def test_app(self): def test_app(self):
@ -162,21 +167,21 @@ class TestDumpMaster(mastertest.MasterTest):
cs = StringIO() cs = StringIO()
o = dump.Options(replacements=[(".*", "content", "foo")]) o = dump.Options(replacements=[(".*", "content", "foo")])
m = dump.DumpMaster(None, o, outfile=cs) m = dump.DumpMaster(None, o, outfile=cs)
f = self.cycle(m, "content") f = self.cycle(m, b"content")
assert f.request.content == "foo" assert f.request.content == b"foo"
def test_setheader(self): def test_setheader(self):
cs = StringIO() cs = StringIO()
o = dump.Options(setheaders=[(".*", "one", "two")]) o = dump.Options(setheaders=[(".*", "one", "two")])
m = dump.DumpMaster(None, o, outfile=cs) m = dump.DumpMaster(None, o, outfile=cs)
f = self.cycle(m, "content") f = self.cycle(m, b"content")
assert f.request.headers["one"] == "two" assert f.request.headers["one"] == "two"
def test_write(self): def test_write(self):
with tutils.tmpdir() as d: with tutils.tmpdir() as d:
p = os.path.join(d, "a") p = os.path.join(d, "a")
self.dummy_cycle( self.dummy_cycle(
self.mkmaster(None, outfile=(p, "wb"), verbosity=0), 1, "" self.mkmaster(None, outfile=(p, "wb"), verbosity=0), 1, b""
) )
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 1 assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 1
@ -185,11 +190,11 @@ class TestDumpMaster(mastertest.MasterTest):
p = os.path.join(d, "a.append") p = os.path.join(d, "a.append")
self.dummy_cycle( self.dummy_cycle(
self.mkmaster(None, outfile=(p, "wb"), verbosity=0), self.mkmaster(None, outfile=(p, "wb"), verbosity=0),
1, "" 1, b""
) )
self.dummy_cycle( self.dummy_cycle(
self.mkmaster(None, outfile=(p, "ab"), verbosity=0), self.mkmaster(None, outfile=(p, "ab"), verbosity=0),
1, "" 1, b""
) )
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 2 assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 2
@ -205,7 +210,7 @@ class TestDumpMaster(mastertest.MasterTest):
None, None,
scripts=[tutils.test_data.path("data/scripts/all.py")], verbosity=1 scripts=[tutils.test_data.path("data/scripts/all.py")], verbosity=1
), ),
1, "", 1, b"",
) )
assert "XCLIENTCONNECT" in ret assert "XCLIENTCONNECT" in ret
assert "XSERVERCONNECT" in ret assert "XSERVERCONNECT" in ret
@ -226,11 +231,11 @@ class TestDumpMaster(mastertest.MasterTest):
def test_stickycookie(self): def test_stickycookie(self):
self.dummy_cycle( self.dummy_cycle(
self.mkmaster(None, stickycookie = ".*"), self.mkmaster(None, stickycookie = ".*"),
1, "" 1, b""
) )
def test_stickyauth(self): def test_stickyauth(self):
self.dummy_cycle( self.dummy_cycle(
self.mkmaster(None, stickyauth = ".*"), self.mkmaster(None, stickyauth = ".*"),
1, "" 1, b""
) )