mitmproxy/test/mitmproxy/test_dump.py

242 lines
7.4 KiB
Python

import os
from cStringIO import StringIO
from libmproxy.exceptions import ContentViewException
from libmproxy.models import HTTPResponse
import netlib.tutils
from netlib.http import CONTENT_MISSING
from libmproxy import dump, flow
from libmproxy.proxy import Log
from . import tutils
import mock
def test_strfuncs():
o = dump.Options()
m = dump.DumpMaster(None, o)
m.outfile = StringIO()
m.o.flow_detail = 0
m.echo_flow(tutils.tflow())
assert not m.outfile.getvalue()
m.o.flow_detail = 4
m.echo_flow(tutils.tflow())
assert m.outfile.getvalue()
m.outfile = StringIO()
m.echo_flow(tutils.tflow(resp=True))
assert "<<" in m.outfile.getvalue()
m.outfile = StringIO()
m.echo_flow(tutils.tflow(err=True))
assert "<<" in m.outfile.getvalue()
flow = tutils.tflow()
flow.request = netlib.tutils.treq()
flow.request.stickycookie = True
flow.client_conn = mock.MagicMock()
flow.client_conn.address.host = "foo"
flow.response = netlib.tutils.tresp(content=CONTENT_MISSING)
flow.response.is_replay = True
flow.response.status_code = 300
m.echo_flow(flow)
flow = tutils.tflow(resp=netlib.tutils.tresp(content="{"))
flow.response.headers["content-type"] = "application/json"
flow.response.status_code = 400
m.echo_flow(flow)
@mock.patch("libmproxy.contentviews.get_content_view")
def test_contentview(get_content_view):
get_content_view.side_effect = ContentViewException(""), ("x", iter([]))
o = dump.Options(flow_detail=4, verbosity=3)
m = dump.DumpMaster(None, o, StringIO())
m.echo_flow(tutils.tflow())
assert "Content viewer failed" in m.outfile.getvalue()
class TestDumpMaster:
def _cycle(self, m, content):
f = tutils.tflow(req=netlib.tutils.treq(content=content))
l = Log("connect")
l.reply = mock.MagicMock()
m.handle_log(l)
m.handle_clientconnect(f.client_conn)
m.handle_serverconnect(f.server_conn)
m.handle_request(f)
f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=content))
f = m.handle_response(f)
m.handle_clientdisconnect(f.client_conn)
return f
def _dummy_cycle(self, n, filt, content, **options):
cs = StringIO()
o = dump.Options(filtstr=filt, **options)
m = dump.DumpMaster(None, o, outfile=cs)
for i in range(n):
self._cycle(m, content)
m.shutdown()
return cs.getvalue()
def _flowfile(self, path):
f = open(path, "wb")
fw = flow.FlowWriter(f)
t = tutils.tflow(resp=True)
fw.add(t)
f.close()
def test_error(self):
cs = StringIO()
o = dump.Options(flow_detail=1)
m = dump.DumpMaster(None, o, outfile=cs)
f = tutils.tflow(err=True)
m.handle_request(f)
assert m.handle_error(f)
assert "error" in cs.getvalue()
def test_missing_content(self):
cs = StringIO()
o = dump.Options(flow_detail=3)
m = dump.DumpMaster(None, o, outfile=cs)
f = tutils.tflow()
f.request.content = CONTENT_MISSING
m.handle_request(f)
f.response = HTTPResponse.wrap(netlib.tutils.tresp())
f.response.content = CONTENT_MISSING
m.handle_response(f)
assert "content missing" in cs.getvalue()
def test_replay(self):
cs = StringIO()
o = dump.Options(server_replay=["nonexistent"], kill=True)
tutils.raises(dump.DumpError, dump.DumpMaster, None, o, outfile=cs)
with tutils.tmpdir() as t:
p = os.path.join(t, "rep")
self._flowfile(p)
o = dump.Options(server_replay=[p], kill=True)
m = dump.DumpMaster(None, o, outfile=cs)
self._cycle(m, "content")
self._cycle(m, "content")
o = dump.Options(server_replay=[p], kill=False)
m = dump.DumpMaster(None, o, outfile=cs)
self._cycle(m, "nonexistent")
o = dump.Options(client_replay=[p], kill=False)
m = dump.DumpMaster(None, o, outfile=cs)
def test_read(self):
with tutils.tmpdir() as t:
p = os.path.join(t, "read")
self._flowfile(p)
assert "GET" in self._dummy_cycle(
0,
None,
"",
flow_detail=1,
rfile=p
)
tutils.raises(
dump.DumpError, self._dummy_cycle,
0, None, "", verbosity=1, rfile="/nonexistent"
)
tutils.raises(
dump.DumpError, self._dummy_cycle,
0, None, "", verbosity=1, rfile="test_dump.py"
)
def test_options(self):
o = dump.Options(verbosity = 2)
assert o.verbosity == 2
def test_filter(self):
assert not "GET" in self._dummy_cycle(1, "~u foo", "", verbosity=1)
def test_app(self):
o = dump.Options(app=True)
s = mock.MagicMock()
m = dump.DumpMaster(s, o)
assert len(m.apps.apps) == 1
def test_replacements(self):
cs = StringIO()
o = dump.Options(replacements=[(".*", "content", "foo")])
m = dump.DumpMaster(None, o, outfile=cs)
f = self._cycle(m, "content")
assert f.request.content == "foo"
def test_setheader(self):
cs = StringIO()
o = dump.Options(setheaders=[(".*", "one", "two")])
m = dump.DumpMaster(None, o, outfile=cs)
f = self._cycle(m, "content")
assert f.request.headers["one"] == "two"
def test_basic(self):
for i in (1, 2, 3):
assert "GET" in self._dummy_cycle(1, "~s", "", flow_detail=i)
assert "GET" in self._dummy_cycle(
1,
"~s",
"\x00\x00\x00",
flow_detail=i)
assert "GET" in self._dummy_cycle(1, "~s", "ascii", flow_detail=i)
def test_write(self):
with tutils.tmpdir() as d:
p = os.path.join(d, "a")
self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0)
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 1
def test_write_append(self):
with tutils.tmpdir() as d:
p = os.path.join(d, "a.append")
self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0)
self._dummy_cycle(1, None, "", outfile=(p, "ab"), verbosity=0)
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 2
def test_write_err(self):
tutils.raises(
dump.DumpError,
self._dummy_cycle,
1,
None,
"",
outfile = ("nonexistentdir/foo", "wb")
)
def test_script(self):
ret = self._dummy_cycle(
1, None, "",
scripts=[tutils.test_data.path("scripts/all.py")], verbosity=1
)
assert "XCLIENTCONNECT" in ret
assert "XSERVERCONNECT" in ret
assert "XREQUEST" in ret
assert "XRESPONSE" in ret
assert "XCLIENTDISCONNECT" in ret
tutils.raises(
dump.DumpError,
self._dummy_cycle, 1, None, "", scripts=["nonexistent"]
)
tutils.raises(
dump.DumpError,
self._dummy_cycle, 1, None, "", scripts=["starterr.py"]
)
def test_stickycookie(self):
self._dummy_cycle(1, None, "", stickycookie = ".*")
def test_stickyauth(self):
self._dummy_cycle(1, None, "", stickyauth = ".*")