add advanced proxying options, add SSL-terminating capability to mitmproxy

This commit is contained in:
Maximilian Hils 2014-03-10 05:11:51 +01:00
parent 78750a8b4d
commit fe58c1c6eb
14 changed files with 182 additions and 164 deletions

View File

@ -10,7 +10,7 @@ __Host__ header field from the request, not the reverse proxy server.
<table class="table"> <table class="table">
<tbody> <tbody>
<tr> <tr>
<th width="20%">command-line</th> <td>-P http[s]://hostname[:port]</td> <th width="20%">command-line</th> <td>-R http[s]://hostname[:port]</td>
</tr> </tr>
<tr> <tr>
<th>mitmproxy shortcut</th> <td><b>P</b></td> <th>mitmproxy shortcut</th> <td><b>P</b></td>

View File

@ -1,12 +1,16 @@
from . import proxy from . import proxy
import re, filt import re, filt
import argparse import argparse
from argparse import ArgumentTypeError
from netlib import http
APP_HOST = "mitm.it" APP_HOST = "mitm.it"
APP_PORT = 80 APP_PORT = 80
class ParseException(Exception): pass
class OptionException(Exception): pass class ParseException(Exception):
pass
def _parse_hook(s): def _parse_hook(s):
sep, rem = s[0], s[1:] sep, rem = s[0], s[1:]
@ -91,6 +95,26 @@ def parse_setheader(s):
return _parse_hook(s) return _parse_hook(s)
def parse_server_spec(url):
normalized_url = re.sub("^https?2", "", url)
p = http.parse_url(normalized_url)
if not p or not p[1]:
raise ArgumentTypeError("Invalid server specification: %s" % url)
if url.lower().startswith("https2http"):
ssl = [True, False]
elif url.lower().startswith("http2https"):
ssl = [False, True]
elif url.lower().startswith("https"):
ssl = [True, True]
else:
ssl = [False, False]
return ssl + list(p[1:3])
def get_common_options(options): def get_common_options(options):
stickycookie, stickyauth = None, None stickycookie, stickyauth = None, None
if options.stickycookie_filt: if options.stickycookie_filt:
@ -104,17 +128,17 @@ def get_common_options(options):
try: try:
p = parse_replace_hook(i) p = parse_replace_hook(i)
except ParseException, e: except ParseException, e:
raise OptionException(e.message) raise ArgumentTypeError(e.message)
reps.append(p) reps.append(p)
for i in options.replace_file: for i in options.replace_file:
try: try:
patt, rex, path = parse_replace_hook(i) patt, rex, path = parse_replace_hook(i)
except ParseException, e: except ParseException, e:
raise OptionException(e.message) raise ArgumentTypeError(e.message)
try: try:
v = open(path, "rb").read() v = open(path, "rb").read()
except IOError, e: except IOError, e:
raise OptionException("Could not read replace file: %s"%path) raise ArgumentTypeError("Could not read replace file: %s"%path)
reps.append((patt, rex, v)) reps.append((patt, rex, v))
@ -123,7 +147,7 @@ def get_common_options(options):
try: try:
p = parse_setheader(i) p = parse_setheader(i)
except ParseException, e: except ParseException, e:
raise OptionException(e.message) raise ArgumentTypeError(e.message)
setheaders.append(p) setheaders.append(p)
return dict( return dict(
@ -185,16 +209,48 @@ def common_options(parser):
action="store", type = int, dest="port", default=8080, action="store", type = int, dest="port", default=8080,
help = "Proxy service port." help = "Proxy service port."
) )
# We could make a mutually exclusive group out of -R, -F, -T, but we don't do because
# - --upstream-server should be in that group as well, but it's already in a different group.
# - our own error messages are more helpful
parser.add_argument( parser.add_argument(
"-P", "-R",
action="store", dest="reverse_proxy", default=None, action="store", type=parse_server_spec, dest="reverse_proxy", default=None,
help="Reverse proxy to upstream server: http[s]://host[:port]" help="Reverse proxy to upstream server: http[s]://host[:port]"
) )
parser.add_argument( parser.add_argument(
"-F", "-F",
action="store", dest="forward_proxy", default=None, action="store", type=parse_server_spec, dest="forward_proxy", default=None,
help="Proxy to unconditionally forward to: http[s]://host[:port]" help="Proxy to unconditionally forward to: http[s]://host[:port]"
) )
parser.add_argument(
"-T",
action="store_true", dest="transparent_proxy", default=False,
help="Set transparent proxy mode."
)
group = parser.add_argument_group(
"Advanced Proxy Options",
"""
The following options allow a custom adjustment of the proxy behavior.
Normally, you don't want to use these options directly and use the provided wrappers instead (-R, -F, -T).
""".strip()
)
group.add_argument(
"--http-form-in", dest="http_form_in", default=None,
action="store", choices=("relative", "absolute"),
help="Override the HTTP request form accepted by the proxy"
)
group.add_argument(
"--http-form-out", dest="http_form_out", default=None,
action="store", choices=("relative", "absolute"),
help="Override the HTTP request form sent upstream by the proxy"
)
group.add_argument(
"--upstream-server", dest="manual_upstream_server", default=None,
action="store", type=parse_server_spec,
help="Override the destination server all requests are sent to."
)
parser.add_argument( parser.add_argument(
"-q", "-q",
action="store_true", dest="quiet", action="store_true", dest="quiet",
@ -216,11 +272,6 @@ def common_options(parser):
action="store", dest="stickycookie_filt", default=None, metavar="FILTER", action="store", dest="stickycookie_filt", default=None, metavar="FILTER",
help="Set sticky cookie filter. Matched against requests." help="Set sticky cookie filter. Matched against requests."
) )
parser.add_argument(
"-T",
action="store_true", dest="transparent_proxy", default=False,
help="Set transparent proxy mode."
)
parser.add_argument( parser.add_argument(
"-u", "-u",
action="store", dest="stickyauth_filt", default=None, metavar="FILTER", action="store", dest="stickyauth_filt", default=None, metavar="FILTER",

View File

@ -144,10 +144,6 @@ class StatusBar(common.WWrap):
r.append("[") r.append("[")
r.append(("heading_key", "u")) r.append(("heading_key", "u"))
r.append(":%s]"%self.master.stickyauth_txt) r.append(":%s]"%self.master.stickyauth_txt)
if self.master.server.config.reverse_proxy:
r.append("[")
r.append(("heading_key", "P"))
r.append(":%s]"%utils.unparse_url(*self.master.server.config.reverse_proxy))
if self.master.state.default_body_view.name != "Auto": if self.master.state.default_body_view.name != "Auto":
r.append("[") r.append("[")
r.append(("heading_key", "M")) r.append(("heading_key", "M"))
@ -172,6 +168,8 @@ class StatusBar(common.WWrap):
if opts: if opts:
r.append("[%s]"%(":".join(opts))) r.append("[%s]"%(":".join(opts)))
if self.master.server.config.upstream_server:
r.append("[dest:%s]"%utils.unparse_url(*self.master.server.config.upstream_server))
if self.master.scripts: if self.master.scripts:
r.append("[scripts:%s]"%len(self.master.scripts)) r.append("[scripts:%s]"%len(self.master.scripts))
if self.master.debug: if self.master.debug:
@ -763,15 +761,6 @@ class ConsoleMaster(flow.FlowMaster):
self.state.default_body_view = v self.state.default_body_view = v
self.refresh_focus() self.refresh_focus()
def set_reverse_proxy(self, txt):
if not txt:
self.server.config.reverse_proxy = None
else:
s = utils.parse_proxy_spec(txt)
if not s:
return "Invalid reverse proxy specification"
self.server.config.reverse_proxy = s
def drawscreen(self): def drawscreen(self):
size = self.ui.get_cols_rows() size = self.ui.get_cols_rows()
canvas = self.view.render(size, focus=1) canvas = self.view.render(size, focus=1)
@ -866,16 +855,6 @@ class ConsoleMaster(flow.FlowMaster):
contentview.view_prompts, contentview.view_prompts,
self.change_default_display_mode self.change_default_display_mode
) )
elif k == "P":
if self.server.config.reverse_proxy:
p = utils.unparse_url(*self.server.config.reverse_proxy)
else:
p = ""
self.prompt(
"Reverse proxy: ",
p,
self.set_reverse_proxy
)
elif k == "R": elif k == "R":
self.view_grideditor( self.view_grideditor(
grideditor.ReplaceEditor( grideditor.ReplaceEditor(

View File

@ -109,7 +109,6 @@ class HelpView(urwid.ListBox):
("q", "quit / return to flow list"), ("q", "quit / return to flow list"),
("Q", "quit without confirm prompt"), ("Q", "quit without confirm prompt"),
("P", "set reverse proxy mode"),
("R", "edit replacement patterns"), ("R", "edit replacement patterns"),
("s", "set/unset script"), ("s", "set/unset script"),
("S", "server replay"), ("S", "server replay"),

View File

@ -1027,7 +1027,7 @@ class HTTPHandler(ProtocolHandler, TemporaryServerChangeMixin):
raise http.HttpError(400, "Must not CONNECT on already encrypted connection") raise http.HttpError(400, "Must not CONNECT on already encrypted connection")
if self.expected_form_in == "absolute": if self.expected_form_in == "absolute":
if not self.c.config.upstream_server: if not self.c.config.get_upstream_server:
self.c.set_server_address((request.host, request.port), AddressPriority.FROM_PROTOCOL) self.c.set_server_address((request.host, request.port), AddressPriority.FROM_PROTOCOL)
flow.server_conn = self.c.server_conn # Update server_conn attribute on the flow flow.server_conn = self.c.server_conn # Update server_conn attribute on the flow
self.c.client_conn.send( self.c.client_conn.send(

View File

@ -1,7 +1,7 @@
import os import os
from .. import utils, platform from .. import utils, platform
from netlib import http_auth, certutils from netlib import http_auth, certutils
from .primitives import ConstUpstreamServerResolver, TransparentUpstreamServerResolver
TRANSPARENT_SSL_PORTS = [443, 8443] TRANSPARENT_SSL_PORTS = [443, 8443]
CONF_BASENAME = "mitmproxy" CONF_BASENAME = "mitmproxy"
@ -10,18 +10,17 @@ CONF_DIR = "~/.mitmproxy"
class ProxyConfig: class ProxyConfig:
def __init__(self, confdir=CONF_DIR, clientcerts=None, def __init__(self, confdir=CONF_DIR, clientcerts=None,
no_upstream_cert=False, body_size_limit=None, upstream_server=None, no_upstream_cert=False, body_size_limit=None, get_upstream_server=None,
http_form_in="absolute", http_form_out="relative", transparent_proxy=None, authenticator=None, http_form_in="absolute", http_form_out="relative", authenticator=None,
ciphers=None, certs=None ciphers=None, certs=None
): ):
self.ciphers = ciphers self.ciphers = ciphers
self.clientcerts = clientcerts self.clientcerts = clientcerts
self.no_upstream_cert = no_upstream_cert self.no_upstream_cert = no_upstream_cert
self.body_size_limit = body_size_limit self.body_size_limit = body_size_limit
self.upstream_server = upstream_server self.get_upstream_server = get_upstream_server
self.http_form_in = http_form_in self.http_form_in = http_form_in
self.http_form_out = http_form_out self.http_form_out = http_form_out
self.transparent_proxy = transparent_proxy
self.authenticator = authenticator self.authenticator = authenticator
self.confdir = os.path.expanduser(confdir) self.confdir = os.path.expanduser(confdir)
self.certstore = certutils.CertStore.from_store(self.confdir, CONF_BASENAME) self.certstore = certutils.CertStore.from_store(self.confdir, CONF_BASENAME)
@ -29,32 +28,34 @@ class ProxyConfig:
def process_proxy_options(parser, options): def process_proxy_options(parser, options):
body_size_limit = utils.parse_size(options.body_size_limit) body_size_limit = utils.parse_size(options.body_size_limit)
if options.reverse_proxy and options.transparent_proxy:
return parser.error("Can't set both reverse proxy and transparent proxy.")
c = 0
http_form_in, http_form_out = "absolute", "relative"
get_upstream_server = None
if options.transparent_proxy: if options.transparent_proxy:
c += 1
if not platform.resolver: if not platform.resolver:
return parser.error("Transparent mode not supported on this platform.") return parser.error("Transparent mode not supported on this platform.")
trans = dict( get_upstream_server = TransparentUpstreamServerResolver(platform.resolver(), TRANSPARENT_SSL_PORTS)
resolver=platform.resolver(), http_form_in, http_form_out = "relative", "relative"
sslports=TRANSPARENT_SSL_PORTS
)
else:
trans = None
if options.reverse_proxy: if options.reverse_proxy:
rp = utils.parse_proxy_spec(options.reverse_proxy) c += 1
if not rp: get_upstream_server = ConstUpstreamServerResolver(options.reverse_proxy)
return parser.error("Invalid reverse proxy specification: %s" % options.reverse_proxy) http_form_in, http_form_out = "relative", "relative"
else:
rp = None
if options.forward_proxy: if options.forward_proxy:
fp = utils.parse_proxy_spec(options.forward_proxy) c += 1
if not fp: get_upstream_server = ConstUpstreamServerResolver(options.forward_proxy)
return parser.error("Invalid forward proxy specification: %s" % options.forward_proxy) http_form_in, http_form_out = "absolute", "absolute"
else: if options.manual_upstream_server:
fp = None c += 1
get_upstream_server = ConstUpstreamServerResolver(options.manual_upstream_server)
if c > 1:
return parser.error("Transparent mode, reverse mode, forward mode and "
"specification of an upstream server are mutually exclusive.")
if options.http_form_in:
http_form_in = options.http_form_in
if options.http_form_out:
http_form_out = options.http_form_out
if options.clientcerts: if options.clientcerts:
options.clientcerts = os.path.expanduser(options.clientcerts) options.clientcerts = os.path.expanduser(options.clientcerts)
@ -94,10 +95,9 @@ def process_proxy_options(parser, options):
clientcerts=options.clientcerts, clientcerts=options.clientcerts,
body_size_limit=body_size_limit, body_size_limit=body_size_limit,
no_upstream_cert=options.no_upstream_cert, no_upstream_cert=options.no_upstream_cert,
upstream_server=(rp or fp), get_upstream_server=get_upstream_server,
http_form_in=("relative" if (rp or trans) else "absolute"), http_form_in=http_form_in,
http_form_out=("absolute" if fp else "relative"), http_form_out=http_form_out,
transparent_proxy=trans,
authenticator=authenticator, authenticator=authenticator,
ciphers=options.ciphers, ciphers=options.ciphers,
certs = certs, certs = certs,

View File

@ -18,17 +18,48 @@ class ProxyServerError(Exception):
pass pass
class UpstreamServerResolver(object):
def __call__(self, conn):
"""
Returns the address of the server to connect to.
"""
raise NotImplementedError
class ConstUpstreamServerResolver(UpstreamServerResolver):
def __init__(self, dst):
self.dst = dst
def __call__(self, conn):
return self.dst
class TransparentUpstreamServerResolver(UpstreamServerResolver):
def __init__(self, resolver, sslports):
self.resolver = resolver
self.sslports = sslports
def __call__(self, conn):
dst = self.resolver.original_addr(conn)
if not dst:
raise ProxyError(502, "Transparent mode failure: could not resolve original destination.")
if dst[1] in self.sslports:
ssl = True
else:
ssl = False
return [ssl, ssl] + list(dst)
class AddressPriority(object): class AddressPriority(object):
""" """
Enum that signifies the priority of the given address when choosing the destination host. Enum that signifies the priority of the given address when choosing the destination host.
Higher is better (None < i) Higher is better (None < i)
""" """
MANUALLY_CHANGED = 4 MANUALLY_CHANGED = 3
"""user changed the target address in the ui""" """user changed the target address in the ui"""
FROM_SETTINGS = 3 FROM_SETTINGS = 2
"""upstream proxy from arguments (reverse proxy or forward proxy)""" """upstream server from arguments (reverse proxy, forward proxy or from transparent resolver)"""
FROM_CONNECTION = 2
"""derived from transparent resolver"""
FROM_PROTOCOL = 1 FROM_PROTOCOL = 1
"""derived from protocol (e.g. absolute-form http requests)""" """derived from protocol (e.g. absolute-form http requests)"""

View File

@ -68,22 +68,13 @@ class ConnectionHandler:
try: try:
try: try:
# Can we already identify the target server and connect to it? # Can we already identify the target server and connect to it?
server_address = None if self.config.get_upstream_server:
address_priority = None upstream_info = self.config.get_upstream_server(self.client_conn.connection)
if self.config.upstream_server: self.set_server_address(upstream_info[2:], AddressPriority.FROM_SETTINGS)
server_address = self.config.upstream_server[1:] client_ssl, server_ssl = upstream_info[:2]
address_priority = AddressPriority.FROM_SETTINGS if client_ssl or server_ssl:
elif self.config.transparent_proxy: self.establish_server_connection()
server_address = self.config.transparent_proxy["resolver"].original_addr( self.establish_ssl(client=client_ssl, server=server_ssl)
self.client_conn.connection)
if not server_address:
raise ProxyError(502, "Transparent mode failure: could not resolve original destination.")
address_priority = AddressPriority.FROM_CONNECTION
self.log("transparent to %s:%s" % server_address)
if server_address:
self.set_server_address(server_address, address_priority)
self._handle_ssl()
while not self.close: while not self.close:
try: try:
@ -105,25 +96,6 @@ class ConnectionHandler:
self.log("clientdisconnect") self.log("clientdisconnect")
self.channel.tell("clientdisconnect", self) self.channel.tell("clientdisconnect", self)
def _handle_ssl(self):
"""
Helper function of .handle()
Check if we can already identify SSL connections.
If so, connect to the server and establish an SSL connection
"""
client_ssl = False
server_ssl = False
if self.config.transparent_proxy:
client_ssl = server_ssl = (self.server_conn.address.port in self.config.transparent_proxy["sslports"])
elif self.config.upstream_server:
client_ssl = server_ssl = (self.config.upstream_server[0] == "https")
# TODO: Make protocol generic (as with transparent proxies)
# TODO: Add SSL-terminating capatbility (SSL -> mitmproxy -> plain and vice versa)
if client_ssl or server_ssl:
self.establish_server_connection()
self.establish_ssl(client=client_ssl, server=server_ssl)
def del_server_connection(self): def del_server_connection(self):
""" """
Deletes an existing server connection. Deletes an existing server connection.

View File

@ -1,7 +1,6 @@
import os, datetime, urllib, re import os, datetime, urllib, re
import time, functools, cgi import time, functools, cgi
import json import json
from netlib import http
def timestamp(): def timestamp():
""" """
@ -143,14 +142,6 @@ class LRUCache:
return ret return ret
return wrap return wrap
def parse_proxy_spec(url):
p = http.parse_url(url)
if not p or not p[1]:
return None
return p[:3]
def parse_content_type(c): def parse_content_type(c):
""" """
A simple parser for content-type values. Returns a (type, subtype, A simple parser for content-type values. Returns a (type, subtype,

View File

@ -36,6 +36,17 @@ def test_parse_replace_hook():
) )
def test_parse_server_spec():
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "")
assert cmdline.parse_server_spec("http://foo.com:88") == [False, False, "foo.com", 88]
assert cmdline.parse_server_spec("http://foo.com") == [False, False, "foo.com", 80]
assert cmdline.parse_server_spec("https://foo.com") == [True, True, "foo.com", 443]
assert cmdline.parse_server_spec("https2http://foo.com") == [True, False, "foo.com", 80]
assert cmdline.parse_server_spec("http2https://foo.com") == [False, True, "foo.com", 443]
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "foo.com")
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "http://")
def test_parse_setheaders(): def test_parse_setheaders():
x = cmdline.parse_setheader("/foo/bar/voing") x = cmdline.parse_setheader("/foo/bar/voing")
assert x == ("foo", "bar", "voing") assert x == ("foo", "bar", "voing")

View File

@ -43,28 +43,15 @@ class TestServerConnection:
sc.finish() sc.finish()
class MockParser:
def __init__(self):
self.err = None
def error(self, e):
self.err = e
def __repr__(self):
return "ParseError(%s)"%self.err
class TestProcessProxyOptions: class TestProcessProxyOptions:
def p(self, *args): def p(self, *args):
parser = argparse.ArgumentParser() parser = tutils.MockParser()
cmdline.common_options(parser) cmdline.common_options(parser)
opts = parser.parse_args(args=args) opts = parser.parse_args(args=args)
m = MockParser() return parser, process_proxy_options(parser, opts)
return m, process_proxy_options(m, opts)
def assert_err(self, err, *args): def assert_err(self, err, *args):
m, p = self.p(*args) tutils.raises(err, self.p, *args)
assert err.lower() in m.err.lower()
def assert_noerr(self, *args): def assert_noerr(self, *args):
m, p = self.p(*args) m, p = self.p(*args)
@ -84,11 +71,10 @@ class TestProcessProxyOptions:
@mock.patch("libmproxy.platform.resolver") @mock.patch("libmproxy.platform.resolver")
def test_transparent_reverse(self, o): def test_transparent_reverse(self, o):
self.assert_err("can't set both", "-P", "reverse", "-T") self.assert_err("mutually exclusive", "-R", "http://localhost", "-T")
self.assert_noerr("-T") self.assert_noerr("-T")
assert o.call_count == 1 self.assert_err("Invalid server specification", "-R", "reverse")
self.assert_err("invalid reverse proxy", "-P", "reverse") self.assert_noerr("-R", "http://localhost")
self.assert_noerr("-P", "http://localhost")
def test_client_certs(self): def test_client_certs(self):
with tutils.tmpdir() as confdir: with tutils.tmpdir() as confdir:

View File

@ -87,14 +87,6 @@ def test_LRUCache():
assert len(f._cachelist_one) == 2 assert len(f._cachelist_one) == 2
def test_parse_proxy_spec():
assert not utils.parse_proxy_spec("")
assert utils.parse_proxy_spec("http://foo.com:88") == ("http", "foo.com", 88)
assert utils.parse_proxy_spec("http://foo.com") == ("http", "foo.com", 80)
assert not utils.parse_proxy_spec("foo.com")
assert not utils.parse_proxy_spec("http://")
def test_unparse_url(): def test_unparse_url():
assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99" assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99"
assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com" assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com"

View File

@ -4,6 +4,7 @@ import shutil, tempfile
import flask import flask
from libmproxy.proxy.config import ProxyConfig from libmproxy.proxy.config import ProxyConfig
from libmproxy.proxy.server import ProxyServer from libmproxy.proxy.server import ProxyServer
from libmproxy.proxy.primitives import TransparentUpstreamServerResolver
import libpathod.test, libpathod.pathoc import libpathod.test, libpathod.pathoc
from libmproxy import flow, controller from libmproxy import flow, controller
from libmproxy.cmdline import APP_HOST, APP_PORT from libmproxy.cmdline import APP_HOST, APP_PORT
@ -193,10 +194,7 @@ class TransparentProxTest(ProxTestBase):
ports = [cls.server.port, cls.server2.port] ports = [cls.server.port, cls.server2.port]
else: else:
ports = [] ports = []
d["transparent_proxy"] = dict( d["get_upstream_server"] = TransparentUpstreamServerResolver(cls.resolver(cls.server.port), ports)
resolver = cls.resolver(cls.server.port),
sslports = ports
)
d["http_form_in"] = "relative" d["http_form_in"] = "relative"
d["http_form_out"] = "relative" d["http_form_out"] = "relative"
return d return d
@ -227,11 +225,12 @@ class ReverseProxTest(ProxTestBase):
@classmethod @classmethod
def get_proxy_config(cls): def get_proxy_config(cls):
d = ProxTestBase.get_proxy_config() d = ProxTestBase.get_proxy_config()
d["upstream_server"] = ( d["get_upstream_server"] = lambda c: (
"https" if cls.ssl else "http", True if cls.ssl else False,
"127.0.0.1", True if cls.ssl else False,
cls.server.port "127.0.0.1",
) cls.server.port
)
d["http_form_in"] = "relative" d["http_form_in"] = "relative"
d["http_form_out"] = "relative" d["http_form_out"] = "relative"
return d return d
@ -262,19 +261,17 @@ class ChainProxTest(ProxTestBase):
Chain n instances of mitmproxy in a row - because we can. Chain n instances of mitmproxy in a row - because we can.
""" """
n = 2 n = 2
chain_config = [lambda: ProxyConfig()] * n chain_config = [lambda port: ProxyConfig(
get_upstream_server = lambda c: (False, False, "127.0.0.1", port),
http_form_in = "absolute",
http_form_out = "absolute"
)] * n
@classmethod @classmethod
def setupAll(cls): def setupAll(cls):
super(ChainProxTest, cls).setupAll() super(ChainProxTest, cls).setupAll()
cls.chain = [] cls.chain = []
for i in range(cls.n): for i in range(cls.n):
config = cls.chain_config[i]() config = cls.chain_config[i](cls.proxy.port if i == 0 else cls.chain[-1].port)
config.upstream_server = ("http", "127.0.0.1",
cls.proxy.port if i == 0 else
cls.chain[-1].port
)
config.http_form_in = "absolute"
config.http_form_out = "absolute"
tmaster = cls.masterclass(config) tmaster = cls.masterclass(config)
tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp) tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp)
cls.chain.append(ProxyThread(tmaster)) cls.chain.append(ProxyThread(tmaster))

View File

@ -1,4 +1,4 @@
import os, shutil, tempfile import os, shutil, tempfile, argparse
from contextlib import contextmanager from contextlib import contextmanager
from libmproxy import flow, utils, controller from libmproxy import flow, utils, controller
from libmproxy.protocol import http from libmproxy.protocol import http
@ -136,6 +136,15 @@ def tmpdir(*args, **kwargs):
shutil.rmtree(temp_workdir) shutil.rmtree(temp_workdir)
class MockParser(argparse.ArgumentParser):
"""
argparse.ArgumentParser sys.exits() by default.
Make it more testable by throwing an exception instead.
"""
def error(self, message):
raise Exception(message)
def raises(exc, obj, *args, **kwargs): def raises(exc, obj, *args, **kwargs):
""" """
Assert that a callable raises a specified exception. Assert that a callable raises a specified exception.