Start solidifying proxy authentication

- Add a unit test file
- Remove some extraneous methods
- Change the auth API to make the authenticate method take a header object.
This commit is contained in:
Aldo Cortesi 2012-12-31 09:15:56 +13:00
parent cfab272321
commit 018c229ae4
3 changed files with 46 additions and 19 deletions

View File

@ -2,32 +2,35 @@ import binascii
import contrib.md5crypt as md5crypt
class NullProxyAuth():
""" No proxy auth at all (returns empty challange headers) """
def __init__(self, password_manager=None):
"""
No proxy auth at all (returns empty challange headers)
"""
def __init__(self, password_manager):
self.password_manager = password_manager
self.username = ""
def authenticate(self, auth_value):
""" Tests that the specified user is allowed to use the proxy (stub) """
def authenticate(self, headers):
"""
Tests that the specified user is allowed to use the proxy (stub)
"""
return True
def auth_challenge_headers(self):
""" Returns a dictionary containing the headers require to challenge the user """
"""
Returns a dictionary containing the headers require to challenge the user
"""
return {}
def get_username(self):
return self.username
class BasicProxyAuth(NullProxyAuth):
def __init__(self, password_manager, realm="mitmproxy"):
NullProxyAuth.__init__(self, password_manager)
self.realm = "mitmproxy"
def authenticate(self, auth_value):
if (not auth_value) or (not auth_value[0]):
return False;
def authenticate(self, headers):
auth_value = headers.get('Proxy-Authorization', [])
if not auth_value:
return False
try:
scheme, username, password = self.parse_authorization_header(auth_value[0])
except:
@ -49,6 +52,7 @@ class BasicProxyAuth(NullProxyAuth):
username, password = user.split(':')
return scheme, username, password
class PasswordManager():
def __init__(self):
pass
@ -56,8 +60,8 @@ class PasswordManager():
def test(self, username, password_token):
return False
class PermissivePasswordManager(PasswordManager):
class PermissivePasswordManager(PasswordManager):
def __init__(self):
PasswordManager.__init__(self)
@ -66,16 +70,17 @@ class PermissivePasswordManager(PasswordManager):
return True
return False
class HtpasswdPasswordManager(PasswordManager):
""" Read usernames and passwords from a file created by Apache htpasswd"""
class HtpasswdPasswordManager(PasswordManager):
"""
Read usernames and passwords from a file created by Apache htpasswd
"""
def __init__(self, filehandle):
PasswordManager.__init__(self)
entries = (line.strip().split(':') for line in filehandle)
valid_entries = (entry for entry in entries if len(entry)==2)
self.usernames = {username:token for username,token in valid_entries}
def test(self, username, password_token):
if username not in self.usernames:
return False
@ -84,8 +89,8 @@ class HtpasswdPasswordManager(PasswordManager):
expected = md5crypt.md5crypt(password_token, salt, '$'+magic+'$')
return expected==full_token
class SingleUserPasswordManager(PasswordManager):
class SingleUserPasswordManager(PasswordManager):
def __init__(self, username, password):
PasswordManager.__init__(self)
self.username = username

View File

@ -356,8 +356,12 @@ class ProxyHandler(tcp.BaseHandler):
headers = http.read_headers(self.rfile)
if headers is None:
raise ProxyError(400, "Invalid headers")
if authenticate and self.config.authenticator and not self.config.authenticator.authenticate(headers.get('Proxy-Authorization', [])):
raise ProxyError(407, "Proxy Authentication Required", self.config.authenticator.auth_challenge_headers())
if authenticate and self.config.authenticator and not self.config.authenticator.authenticate(headers):
raise ProxyError(
407,
"Proxy Authentication Required",
self.config.authenticator.auth_challenge_headers()
)
return headers
def send_response(self, response):

View File

@ -0,0 +1,18 @@
from libmproxy import authentication
from netlib import odict
class TestNullProxyAuth:
def test_simple(self):
na = authentication.NullProxyAuth(authentication.PermissivePasswordManager())
assert not na.auth_challenge_headers()
assert na.authenticate("foo")
class TestBasicProxyAuth:
def test_simple(self):
ba = authentication.BasicProxyAuth(authentication.PermissivePasswordManager())
h = odict.ODictCaseless()
assert ba.auth_challenge_headers()
assert not ba.authenticate(h)