diff --git a/mitmproxy/addons/proxyauth.py b/mitmproxy/addons/proxyauth.py index 69d45029b..e9505d91b 100644 --- a/mitmproxy/addons/proxyauth.py +++ b/mitmproxy/addons/proxyauth.py @@ -1,35 +1,42 @@ import binascii +import weakref +from typing import Optional +from typing import Set # noqa +from typing import Tuple import passlib.apache +import mitmproxy.net.http +from mitmproxy import connections # noqa from mitmproxy import exceptions from mitmproxy import http -import mitmproxy.net.http - REALM = "mitmproxy" -def mkauth(username, password, scheme="basic"): +def mkauth(username: str, password: str, scheme: str = "basic") -> str: + """ + Craft a basic auth string + """ v = binascii.b2a_base64( (username + ":" + password).encode("utf8") ).decode("ascii") return scheme + " " + v -def parse_http_basic_auth(s): - words = s.split() - if len(words) != 2: - return None - scheme = words[0] +def parse_http_basic_auth(s: str) -> Tuple[str, str, str]: + """ + Parse a basic auth header. + Raises a ValueError if the input is invalid. + """ + scheme, authinfo = s.split() + if scheme.lower() != "basic": + raise ValueError("Unknown scheme") try: - user = binascii.a2b_base64(words[1]).decode("utf8", "replace") - except binascii.Error: - return None - parts = user.split(':') - if len(parts) != 2: - return None - return scheme, parts[0], parts[1] + user, password = binascii.a2b_base64(authinfo.encode()).decode("utf8", "replace").split(":") + except binascii.Error as e: + raise ValueError(str(e)) + return scheme, user, password class ProxyAuth: @@ -37,67 +44,74 @@ class ProxyAuth: self.nonanonymous = False self.htpasswd = None self.singleuser = None + self.mode = None + self.authenticated = weakref.WeakSet() # type: Set[connections.ClientConnection] + """Contains all connections that are permanently authenticated after an HTTP CONNECT""" - def enabled(self): + def enabled(self) -> bool: return any([self.nonanonymous, self.htpasswd, self.singleuser]) - def which_auth_header(self, f): - if f.mode == "regular": + def is_proxy_auth(self) -> bool: + """ + Returns: + - True, if authentication is done as if mitmproxy is a proxy + - False, if authentication is done as if mitmproxy is a HTTP server + """ + return self.mode in ("regular", "upstream") + + def which_auth_header(self) -> str: + if self.is_proxy_auth(): return 'Proxy-Authorization' else: return 'Authorization' - def auth_required_response(self, f): - if f.mode == "regular": - hdrname = 'Proxy-Authenticate' - else: - hdrname = 'WWW-Authenticate' - - headers = mitmproxy.net.http.Headers() - headers[hdrname] = 'Basic realm="%s"' % REALM - - if f.mode == "transparent": - return http.make_error_response( - 401, - "Authentication Required", - headers - ) - else: + def auth_required_response(self) -> http.HTTPResponse: + if self.is_proxy_auth(): return http.make_error_response( 407, "Proxy Authentication Required", - headers, + mitmproxy.net.http.Headers(Proxy_Authenticate='Basic realm="{}"'.format(REALM)), + ) + else: + return http.make_error_response( + 401, + "Authentication Required", + mitmproxy.net.http.Headers(WWW_Authenticate='Basic realm="{}"'.format(REALM)), ) - def check(self, f): - auth_value = f.request.headers.get(self.which_auth_header(f), None) - if not auth_value: - return False - parts = parse_http_basic_auth(auth_value) - if not parts: - return False - scheme, username, password = parts - if scheme.lower() != 'basic': - return False + def check(self, f: http.HTTPFlow) -> Optional[Tuple[str, str]]: + """ + Check if a request is correctly authenticated. + Returns: + - a (username, password) tuple if successful, + - None, otherwise. + """ + auth_value = f.request.headers.get(self.which_auth_header(), "") + try: + scheme, username, password = parse_http_basic_auth(auth_value) + except ValueError: + return None if self.nonanonymous: - pass + return username, password elif self.singleuser: - if [username, password] != self.singleuser: - return False + if [username, password] == self.singleuser: + return username, password elif self.htpasswd: - if not self.htpasswd.check_password(username, password): - return False - else: - raise NotImplementedError("Should never happen.") + if self.htpasswd.check_password(username, password): + return username, password - return True + return None - def authenticate(self, f): - if self.check(f): - del f.request.headers[self.which_auth_header(f)] + def authenticate(self, f: http.HTTPFlow) -> bool: + valid_credentials = self.check(f) + if valid_credentials: + f.metadata["proxyauth"] = valid_credentials + del f.request.headers[self.which_auth_header()] + return True else: - f.response = self.auth_required_response(f) + f.response = self.auth_required_response() + return False # Handlers def configure(self, options, updated): @@ -125,24 +139,28 @@ class ProxyAuth: ) else: self.htpasswd = None + if "mode" in updated: + self.mode = options.mode if self.enabled(): if options.mode == "transparent": raise exceptions.OptionsError( "Proxy Authentication not supported in transparent mode." ) - elif options.mode == "socks5": + if options.mode == "socks5": raise exceptions.OptionsError( "Proxy Authentication not supported in SOCKS mode. " "https://github.com/mitmproxy/mitmproxy/issues/738" ) - # TODO: check for multiple auth options + # TODO: check for multiple auth options - def http_connect(self, f): - if self.enabled() and f.mode == "regular": - self.authenticate(f) - - def requestheaders(self, f): + def http_connect(self, f: http.HTTPFlow) -> None: if self.enabled(): - # Are we already authenticated in CONNECT? - if not (f.mode == "regular" and f.server_conn.via): - self.authenticate(f) + if self.authenticate(f): + self.authenticated.add(f.client_conn) + + def requestheaders(self, f: http.HTTPFlow) -> None: + if self.enabled(): + # Is this connection authenticated by a previous HTTP CONNECT? + if f.client_conn in self.authenticated: + return + self.authenticate(f) diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py index b59b87c18..dd5829ab9 100644 --- a/test/mitmproxy/addons/test_proxyauth.py +++ b/test/mitmproxy/addons/test_proxyauth.py @@ -1,21 +1,27 @@ import binascii + import pytest from mitmproxy import exceptions +from mitmproxy.addons import proxyauth from mitmproxy.test import taddons from mitmproxy.test import tflow from mitmproxy.test import tutils -from mitmproxy.addons import proxyauth def test_parse_http_basic_auth(): assert proxyauth.parse_http_basic_auth( proxyauth.mkauth("test", "test") ) == ("basic", "test", "test") - assert not proxyauth.parse_http_basic_auth("") - assert not proxyauth.parse_http_basic_auth("foo bar") - v = "basic " + binascii.b2a_base64(b"foo").decode("ascii") - assert not proxyauth.parse_http_basic_auth(v) + with pytest.raises(ValueError): + proxyauth.parse_http_basic_auth("") + with pytest.raises(ValueError): + proxyauth.parse_http_basic_auth("foo bar") + with pytest.raises(ValueError): + proxyauth.parse_http_basic_auth("basic abc") + with pytest.raises(ValueError): + v = "basic " + binascii.b2a_base64(b"foo").decode("ascii") + proxyauth.parse_http_basic_auth(v) def test_configure(): @@ -42,14 +48,14 @@ def test_configure(): ctx.configure( up, - auth_htpasswd = tutils.test_data.path( + auth_htpasswd=tutils.test_data.path( "mitmproxy/net/data/htpasswd" ) ) assert up.htpasswd assert up.htpasswd.check_password("test", "test") assert not up.htpasswd.check_password("test", "foo") - ctx.configure(up, auth_htpasswd = None) + ctx.configure(up, auth_htpasswd=None) assert not up.htpasswd with pytest.raises(exceptions.OptionsError): @@ -57,11 +63,14 @@ def test_configure(): with pytest.raises(exceptions.OptionsError): ctx.configure(up, auth_nonanonymous=True, mode="socks5") + ctx.configure(up, mode="regular") + assert up.mode == "regular" + def test_check(): up = proxyauth.ProxyAuth() with taddons.context() as ctx: - ctx.configure(up, auth_nonanonymous=True) + ctx.configure(up, auth_nonanonymous=True, mode="regular") f = tflow.tflow() assert not up.check(f) f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( @@ -73,7 +82,7 @@ def test_check(): assert not up.check(f) f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "test", "test", scheme = "unknown" + "test", "test", scheme="unknown" ) assert not up.check(f) @@ -87,8 +96,8 @@ def test_check(): ctx.configure( up, - auth_singleuser = None, - auth_htpasswd = tutils.test_data.path( + auth_singleuser=None, + auth_htpasswd=tutils.test_data.path( "mitmproxy/net/data/htpasswd" ) ) @@ -105,7 +114,7 @@ def test_check(): def test_authenticate(): up = proxyauth.ProxyAuth() with taddons.context() as ctx: - ctx.configure(up, auth_nonanonymous=True) + ctx.configure(up, auth_nonanonymous=True, mode="regular") f = tflow.tflow() assert not f.response @@ -121,13 +130,12 @@ def test_authenticate(): assert not f.request.headers.get("Proxy-Authorization") f = tflow.tflow() - f.mode = "transparent" + ctx.configure(up, mode="reverse") assert not f.response up.authenticate(f) assert f.response.status_code == 401 f = tflow.tflow() - f.mode = "transparent" f.request.headers["Authorization"] = proxyauth.mkauth( "test", "test" ) @@ -139,7 +147,7 @@ def test_authenticate(): def test_handlers(): up = proxyauth.ProxyAuth() with taddons.context() as ctx: - ctx.configure(up, auth_nonanonymous=True) + ctx.configure(up, auth_nonanonymous=True, mode="regular") f = tflow.tflow() assert not f.response @@ -151,3 +159,15 @@ def test_handlers(): assert not f.response up.http_connect(f) assert f.response.status_code == 407 + + f = tflow.tflow() + f.request.method = "CONNECT" + f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( + "test", "test" + ) + up.http_connect(f) + assert not f.response + + f2 = tflow.tflow(client_conn=f.client_conn) + up.requestheaders(f2) + assert not f2.response