add individual-coverage check

This commit is contained in:
Thomas Kriechbaumer 2017-02-15 18:52:32 +01:00
parent 337b1c9399
commit 6b22ca7a32
7 changed files with 274 additions and 114 deletions

View File

@ -43,6 +43,8 @@ matrix:
- debian-sid
packages:
- libssl-dev
- python: 3.5
env: TOXENV=individual_coverage
- python: 3.5
env: TOXENV=docs

View File

@ -49,3 +49,66 @@ exclude =
pathod/pathod.py
pathod/test.py
pathod/protocols/http2.py
[tool:individual_coverage]
exclude =
mitmproxy/addonmanager.py
mitmproxy/addons/onboardingapp/app.py
mitmproxy/addons/termlog.py
mitmproxy/certs.py
mitmproxy/connections.py
mitmproxy/contentviews/base.py
mitmproxy/contentviews/protobuf.py
mitmproxy/contentviews/wbxml.py
mitmproxy/contentviews/xml_html.py
mitmproxy/controller.py
mitmproxy/ctx.py
mitmproxy/exceptions.py
mitmproxy/export.py
mitmproxy/flow.py
mitmproxy/flowfilter.py
mitmproxy/http.py
mitmproxy/io.py
mitmproxy/io_compat.py
mitmproxy/log.py
mitmproxy/master.py
mitmproxy/net/check.py
mitmproxy/net/http/cookies.py
mitmproxy/net/http/headers.py
mitmproxy/net/http/message.py
mitmproxy/net/http/multipart.py
mitmproxy/net/http/url.py
mitmproxy/net/tcp.py
mitmproxy/options.py
mitmproxy/optmanager.py
mitmproxy/proxy/config.py
mitmproxy/proxy/modes/http_proxy.py
mitmproxy/proxy/modes/reverse_proxy.py
mitmproxy/proxy/modes/socks_proxy.py
mitmproxy/proxy/modes/transparent_proxy.py
mitmproxy/proxy/protocol/base.py
mitmproxy/proxy/protocol/http.py
mitmproxy/proxy/protocol/http1.py
mitmproxy/proxy/protocol/http2.py
mitmproxy/proxy/protocol/http_replay.py
mitmproxy/proxy/protocol/rawtcp.py
mitmproxy/proxy/protocol/tls.py
mitmproxy/proxy/protocol/websocket.py
mitmproxy/proxy/root_context.py
mitmproxy/proxy/server.py
mitmproxy/stateobject.py
mitmproxy/types/multidict.py
mitmproxy/utils/bits.py
pathod/language/actions.py
pathod/language/base.py
pathod/language/exceptions.py
pathod/language/generators.py
pathod/language/http.py
pathod/language/message.py
pathod/log.py
pathod/pathoc.py
pathod/pathod.py
pathod/protocols/http.py
pathod/protocols/http2.py
pathod/protocols/websockets.py
pathod/test.py

View File

@ -0,0 +1,82 @@
import io
import contextlib
import os
import sys
import glob
import multiprocessing
import configparser
import itertools
import pytest
def run_tests(src, test, fail):
stderr = io.StringIO()
stdout = io.StringIO()
with contextlib.redirect_stderr(stderr):
with contextlib.redirect_stdout(stdout):
e = pytest.main([
'-qq',
'--disable-pytest-warnings',
'--no-faulthandler',
'--cov', src.replace('.py', '').replace('/', '.'),
'--cov-fail-under', '100',
'--cov-report', 'term-missing:skip-covered',
test
])
if e == 0:
if fail:
print("SUCCESS but should have FAILED:", src, "Please remove this file from setup.cfg tool:individual_coverage/exclude.")
e = 42
else:
print("SUCCESS:", src)
else:
if fail:
print("Ignoring fail:", src)
e = 0
else:
cov = [l for l in stdout.getvalue().split("\n") if (src in l) or ("was never imported" in l)]
if len(cov) == 1:
print("FAIL:", cov[0])
else:
print("FAIL:", src, test, stdout.getvalue(), stdout.getvalue())
print(stderr.getvalue())
print(stdout.getvalue())
sys.exit(e)
def start_pytest(src, test, fail):
# run pytest in a new process, otherwise imports and modules might conflict
proc = multiprocessing.Process(target=run_tests, args=(src, test, fail))
proc.start()
proc.join()
return (src, test, proc.exitcode)
def main():
c = configparser.ConfigParser()
c.read('setup.cfg')
fs = c['tool:individual_coverage']['exclude'].strip().split('\n')
no_individual_cov = [f.strip() for f in fs]
excluded = ['mitmproxy/contrib/', 'mitmproxy/test/', 'mitmproxy/tools/', 'mitmproxy/platform/']
src_files = glob.glob('mitmproxy/**/*.py', recursive=True) + glob.glob('pathod/**/*.py', recursive=True)
src_files = [f for f in src_files if os.path.basename(f) != '__init__.py']
src_files = [f for f in src_files if not any(os.path.normpath(p) in f for p in excluded)]
ps = []
for src in sorted(src_files):
test = os.path.join("test", os.path.dirname(src), "test_" + os.path.basename(src))
if os.path.isfile(test):
ps.append((src, test, src in no_individual_cov))
result = list(itertools.starmap(start_pytest, ps))
if any(e != 0 for _, _, e in result):
sys.exit(1)
pass
if __name__ == '__main__':
main()

View File

@ -10,7 +10,6 @@ from mitmproxy.exceptions import FlowReadException, Kill
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import connections
from mitmproxy import tcp
from mitmproxy.proxy import ProxyConfig
from mitmproxy.proxy.server import DummyServer
from mitmproxy import master
@ -157,117 +156,6 @@ class TestHTTPFlow:
assert f.response.raw_content == b"abarb"
class TestWebSocketFlow:
def test_copy(self):
f = tflow.twebsocketflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
del b["id"]
del a["handshake_flow"]["id"]
del b["handshake_flow"]["id"]
assert a == b
assert not f == f2
assert f is not f2
assert f.client_key == f2.client_key
assert f.client_protocol == f2.client_protocol
assert f.client_extensions == f2.client_extensions
assert f.server_accept == f2.server_accept
assert f.server_protocol == f2.server_protocol
assert f.server_extensions == f2.server_extensions
assert f.messages is not f2.messages
assert f.handshake_flow is not f2.handshake_flow
for m in f.messages:
m2 = m.copy()
m2.set_state(m2.get_state())
assert m is not m2
assert m.get_state() == m2.get_state()
f = tflow.twebsocketflow(err=True)
f2 = f.copy()
assert f is not f2
assert f.handshake_flow is not f2.handshake_flow
assert f.error.get_state() == f2.error.get_state()
assert f.error is not f2.error
def test_match(self):
f = tflow.twebsocketflow()
assert not flowfilter.match("~b nonexistent", f)
assert flowfilter.match(None, f)
assert not flowfilter.match("~b nonexistent", f)
f = tflow.twebsocketflow(err=True)
assert flowfilter.match("~e", f)
with pytest.raises(ValueError):
flowfilter.match("~", f)
def test_repr(self):
f = tflow.twebsocketflow()
assert 'WebSocketFlow' in repr(f)
assert 'binary message: ' in repr(f.messages[0])
assert 'text message: ' in repr(f.messages[1])
class TestTCPFlow:
def test_copy(self):
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
del b["id"]
assert a == b
assert not f == f2
assert f is not f2
assert f.messages is not f2.messages
for m in f.messages:
assert m.get_state()
m2 = m.copy()
assert not m == m2
assert m is not m2
a = m.get_state()
b = m2.get_state()
assert a == b
m = tcp.TCPMessage(False, 'foo')
m.set_state(f.messages[0].get_state())
assert m.timestamp == f.messages[0].timestamp
f = tflow.ttcpflow(err=True)
f2 = f.copy()
assert f is not f2
assert f.error.get_state() == f2.error.get_state()
assert f.error is not f2.error
def test_match(self):
f = tflow.ttcpflow()
assert not flowfilter.match("~b nonexistent", f)
assert flowfilter.match(None, f)
assert not flowfilter.match("~b nonexistent", f)
f = tflow.ttcpflow(err=True)
assert flowfilter.match("~e", f)
with pytest.raises(ValueError):
flowfilter.match("~", f)
def test_repr(self):
f = tflow.ttcpflow()
assert 'TCPFlow' in repr(f)
assert '-> ' in repr(f.messages[0])
class TestSerialize:
def _treader(self):

View File

@ -1 +1,59 @@
# TODO: write tests
import pytest
from mitmproxy import tcp
from mitmproxy import flowfilter
from mitmproxy.test import tflow
class TestTCPFlow:
def test_copy(self):
f = tflow.ttcpflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
del b["id"]
assert a == b
assert not f == f2
assert f is not f2
assert f.messages is not f2.messages
for m in f.messages:
assert m.get_state()
m2 = m.copy()
assert not m == m2
assert m is not m2
a = m.get_state()
b = m2.get_state()
assert a == b
m = tcp.TCPMessage(False, 'foo')
m.set_state(f.messages[0].get_state())
assert m.timestamp == f.messages[0].timestamp
f = tflow.ttcpflow(err=True)
f2 = f.copy()
assert f is not f2
assert f.error.get_state() == f2.error.get_state()
assert f.error is not f2.error
def test_match(self):
f = tflow.ttcpflow()
assert not flowfilter.match("~b nonexistent", f)
assert flowfilter.match(None, f)
assert not flowfilter.match("~b nonexistent", f)
f = tflow.ttcpflow(err=True)
assert flowfilter.match("~e", f)
with pytest.raises(ValueError):
flowfilter.match("~", f)
def test_repr(self):
f = tflow.ttcpflow()
assert 'TCPFlow' in repr(f)
assert '-> ' in repr(f.messages[0])

View File

@ -1 +1,62 @@
# TODO: write tests
import pytest
from mitmproxy import flowfilter
from mitmproxy.test import tflow
class TestWebSocketFlow:
def test_copy(self):
f = tflow.twebsocketflow()
f.get_state()
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
del a["id"]
del b["id"]
del a["handshake_flow"]["id"]
del b["handshake_flow"]["id"]
assert a == b
assert not f == f2
assert f is not f2
assert f.client_key == f2.client_key
assert f.client_protocol == f2.client_protocol
assert f.client_extensions == f2.client_extensions
assert f.server_accept == f2.server_accept
assert f.server_protocol == f2.server_protocol
assert f.server_extensions == f2.server_extensions
assert f.messages is not f2.messages
assert f.handshake_flow is not f2.handshake_flow
for m in f.messages:
m2 = m.copy()
m2.set_state(m2.get_state())
assert m is not m2
assert m.get_state() == m2.get_state()
f = tflow.twebsocketflow(err=True)
f2 = f.copy()
assert f is not f2
assert f.handshake_flow is not f2.handshake_flow
assert f.error.get_state() == f2.error.get_state()
assert f.error is not f2.error
def test_match(self):
f = tflow.twebsocketflow()
assert not flowfilter.match("~b nonexistent", f)
assert flowfilter.match(None, f)
assert not flowfilter.match("~b nonexistent", f)
f = tflow.twebsocketflow(err=True)
assert flowfilter.match("~e", f)
with pytest.raises(ValueError):
flowfilter.match("~", f)
def test_repr(self):
f = tflow.twebsocketflow()
assert f.message_info(f.messages[0])
assert 'WebSocketFlow' in repr(f)
assert 'binary message: ' in repr(f.messages[0])
assert 'text message: ' in repr(f.messages[1])

View File

@ -36,6 +36,12 @@ commands =
mitmproxy/tools/web/ \
mitmproxy/contentviews/
[testenv:individual_coverage]
deps =
-rrequirements.txt
commands =
python3 test/individual_coverage.py
[testenv:wheel]
recreate = True
deps =