From 0acab862a65ef4a1823a1bfb702d8be1e3d7b83d Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sun, 3 Mar 2013 10:37:28 +1300 Subject: [PATCH] Integrate HTTP auth, test to 100% --- .coveragerc | 3 +- netlib/contrib/__init__.py | 0 netlib/contrib/md5crypt.py | 94 ++++++++++++++++++++++++++++++ netlib/http.py | 22 +++++++- netlib/http_auth.py | 113 +++++++++++++++++++++++++++++++++++++ test/data/htpasswd | 1 + test/test_http.py | 11 +++- test/test_http_auth.py | 81 ++++++++++++++++++++++++++ 8 files changed, 322 insertions(+), 3 deletions(-) create mode 100644 netlib/contrib/__init__.py create mode 100644 netlib/contrib/md5crypt.py create mode 100644 netlib/http_auth.py create mode 100644 test/data/htpasswd create mode 100644 test/test_http_auth.py diff --git a/.coveragerc b/.coveragerc index 99f57cb0f..8076aebe3 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,3 @@ [report] -include = *netlib* +omit = *contrib* +include = *netlib/netlib* diff --git a/netlib/contrib/__init__.py b/netlib/contrib/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/netlib/contrib/md5crypt.py b/netlib/contrib/md5crypt.py new file mode 100644 index 000000000..d64ea8ac4 --- /dev/null +++ b/netlib/contrib/md5crypt.py @@ -0,0 +1,94 @@ +# Based on FreeBSD src/lib/libcrypt/crypt.c 1.2 +# http://www.freebsd.org/cgi/cvsweb.cgi/~checkout~/src/lib/libcrypt/crypt.c?rev=1.2&content-type=text/plain + +# Original license: +# * "THE BEER-WARE LICENSE" (Revision 42): +# * wrote this file. As long as you retain this notice you +# * can do whatever you want with this stuff. If we meet some day, and you think +# * this stuff is worth it, you can buy me a beer in return. Poul-Henning Kamp + +# This port adds no further stipulations. I forfeit any copyright interest. + +import md5 + +def md5crypt(password, salt, magic='$1$'): + # /* The password first, since that is what is most unknown */ /* Then our magic string */ /* Then the raw salt */ + m = md5.new() + m.update(password + magic + salt) + + # /* Then just as many characters of the MD5(pw,salt,pw) */ + mixin = md5.md5(password + salt + password).digest() + for i in range(0, len(password)): + m.update(mixin[i % 16]) + + # /* Then something really weird... */ + # Also really broken, as far as I can tell. -m + i = len(password) + while i: + if i & 1: + m.update('\x00') + else: + m.update(password[0]) + i >>= 1 + + final = m.digest() + + # /* and now, just to make sure things don't run too fast */ + for i in range(1000): + m2 = md5.md5() + if i & 1: + m2.update(password) + else: + m2.update(final) + + if i % 3: + m2.update(salt) + + if i % 7: + m2.update(password) + + if i & 1: + m2.update(final) + else: + m2.update(password) + + final = m2.digest() + + # This is the bit that uses to64() in the original code. + + itoa64 = './0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' + + rearranged = '' + for a, b, c in ((0, 6, 12), (1, 7, 13), (2, 8, 14), (3, 9, 15), (4, 10, 5)): + v = ord(final[a]) << 16 | ord(final[b]) << 8 | ord(final[c]) + for i in range(4): + rearranged += itoa64[v & 0x3f]; v >>= 6 + + v = ord(final[11]) + for i in range(2): + rearranged += itoa64[v & 0x3f]; v >>= 6 + + return magic + salt + '$' + rearranged + +if __name__ == '__main__': + + def test(clear_password, the_hash): + magic, salt = the_hash[1:].split('$')[:2] + magic = '$' + magic + '$' + return md5crypt(clear_password, salt, magic) == the_hash + + test_cases = ( + (' ', '$1$yiiZbNIH$YiCsHZjcTkYd31wkgW8JF.'), + ('pass', '$1$YeNsbWdH$wvOF8JdqsoiLix754LTW90'), + ('____fifteen____', '$1$s9lUWACI$Kk1jtIVVdmT01p0z3b/hw1'), + ('____sixteen_____', '$1$dL3xbVZI$kkgqhCanLdxODGq14g/tW1'), + ('____seventeen____', '$1$NaH5na7J$j7y8Iss0hcRbu3kzoJs5V.'), + ('__________thirty-three___________', '$1$HO7Q6vzJ$yGwp2wbL5D7eOVzOmxpsy.'), + ('apache', '$apr1$J.w5a/..$IW9y6DR0oO/ADuhlMF5/X1') + ) + + for clearpw, hashpw in test_cases: + if test(clearpw, hashpw): + print '%s: pass' % clearpw + else: + print '%s: FAIL' % clearpw diff --git a/netlib/http.py b/netlib/http.py index bc09c8a10..10b6a4028 100644 --- a/netlib/http.py +++ b/netlib/http.py @@ -1,4 +1,4 @@ -import string, urlparse +import string, urlparse, binascii import odict class HttpError(Exception): @@ -169,6 +169,26 @@ def parse_http_protocol(s): return major, minor +def parse_http_basic_auth(s): + words = s.split() + if len(words) != 2: + return None + scheme = words[0] + try: + user = binascii.a2b_base64(words[1]) + except binascii.Error: + return None + parts = user.split(':') + if len(parts) != 2: + return None + return scheme, parts[0], parts[1] + + +def assemble_http_basic_auth(scheme, username, password): + v = binascii.b2a_base64(username + ":" + password) + return scheme + " " + v + + def parse_init(line): try: method, url, protocol = string.split(line) diff --git a/netlib/http_auth.py b/netlib/http_auth.py new file mode 100644 index 000000000..d478ab101 --- /dev/null +++ b/netlib/http_auth.py @@ -0,0 +1,113 @@ +import binascii +import contrib.md5crypt as md5crypt +import http + + +class NullProxyAuth(): + """ + No proxy auth at all (returns empty challange headers) + """ + def __init__(self, password_manager): + self.password_manager = password_manager + + def clean(self, headers): + """ + Clean up authentication headers, so they're not passed upstream. + """ + pass + + def authenticate(self, headers): + """ + Tests that the user is allowed to use the proxy + """ + return True + + def auth_challenge_headers(self): + """ + Returns a dictionary containing the headers require to challenge the user + """ + return {} + + +class BasicProxyAuth(NullProxyAuth): + CHALLENGE_HEADER = 'Proxy-Authenticate' + AUTH_HEADER = 'Proxy-Authorization' + def __init__(self, password_manager, realm): + NullProxyAuth.__init__(self, password_manager) + self.realm = realm + + def clean(self, headers): + del headers[self.AUTH_HEADER] + + def authenticate(self, headers): + auth_value = headers.get(self.AUTH_HEADER, []) + if not auth_value: + return False + parts = http.parse_http_basic_auth(auth_value[0]) + if not parts: + return False + scheme, username, password = parts + if scheme.lower()!='basic': + return False + if not self.password_manager.test(username, password): + return False + self.username = username + return True + + def auth_challenge_headers(self): + return {self.CHALLENGE_HEADER:'Basic realm="%s"'%self.realm} + + +class PassMan(): + def test(self, username, password_token): + return False + + +class PassManNonAnon: + """ + Ensure the user specifies a username, accept any password. + """ + def test(self, username, password_token): + if username: + return True + return False + + +class PassManHtpasswd: + """ + Read usernames and passwords from an htpasswd file + """ + def __init__(self, fp): + """ + Raises ValueError if htpasswd file is invalid. + """ + self.usernames = {} + for l in fp: + l = l.strip().split(':') + if len(l) != 2: + raise ValueError("Invalid htpasswd file.") + parts = l[1].split('$') + if len(parts) != 4: + raise ValueError("Invalid htpasswd file.") + self.usernames[l[0]] = dict( + token = l[1], + dummy = parts[0], + magic = parts[1], + salt = parts[2], + hashed_password = parts[3] + ) + + def test(self, username, password_token): + ui = self.usernames.get(username) + if not ui: + return False + expected = md5crypt.md5crypt(password_token, ui["salt"], '$'+ui["magic"]+'$') + return expected==ui["token"] + + +class PassManSingleUser: + def __init__(self, username, password): + self.username, self.password = username, password + + def test(self, username, password_token): + return self.username==username and self.password==password_token diff --git a/test/data/htpasswd b/test/data/htpasswd new file mode 100644 index 000000000..54c95b8cc --- /dev/null +++ b/test/data/htpasswd @@ -0,0 +1 @@ +test:$apr1$/LkYxy3x$WI4.YbiJlu537jLGEW2eu1 diff --git a/test/test_http.py b/test/test_http.py index 666dfdbb0..1c89900cc 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -1,4 +1,4 @@ -import cStringIO, textwrap +import cStringIO, textwrap, binascii from netlib import http, odict import tutils @@ -291,3 +291,12 @@ def test_parse_url(): assert not http.parse_url("https://foo:bar") assert not http.parse_url("https://foo:") + +def test_parse_http_basic_auth(): + vals = ("basic", "foo", "bar") + assert http.parse_http_basic_auth(http.assemble_http_basic_auth(*vals)) == vals + assert not http.parse_http_basic_auth("") + assert not http.parse_http_basic_auth("foo bar") + v = "basic " + binascii.b2a_base64("foo") + assert not http.parse_http_basic_auth(v) + diff --git a/test/test_http_auth.py b/test/test_http_auth.py new file mode 100644 index 000000000..cae69f5e8 --- /dev/null +++ b/test/test_http_auth.py @@ -0,0 +1,81 @@ +import binascii, cStringIO +from netlib import odict, http_auth, http +import tutils + +class TestPassManNonAnon: + def test_simple(self): + p = http_auth.PassManNonAnon() + assert not p.test("", "") + assert p.test("user", "") + + +class TestPassManHtpasswd: + def test_file_errors(self): + s = cStringIO.StringIO("foo") + tutils.raises("invalid htpasswd", http_auth.PassManHtpasswd, s) + s = cStringIO.StringIO("foo:bar$foo") + tutils.raises("invalid htpasswd", http_auth.PassManHtpasswd, s) + + def test_simple(self): + f = open(tutils.test_data.path("data/htpasswd")) + pm = http_auth.PassManHtpasswd(f) + + vals = ("basic", "test", "test") + p = http.assemble_http_basic_auth(*vals) + assert pm.test("test", "test") + assert not pm.test("test", "foo") + assert not pm.test("foo", "test") + assert not pm.test("test", "") + assert not pm.test("", "") + + +class TestPassManSingleUser: + def test_simple(self): + pm = http_auth.PassManSingleUser("test", "test") + assert pm.test("test", "test") + assert not pm.test("test", "foo") + assert not pm.test("foo", "test") + + +class TestNullProxyAuth: + def test_simple(self): + na = http_auth.NullProxyAuth(http_auth.PassManNonAnon()) + assert not na.auth_challenge_headers() + assert na.authenticate("foo") + na.clean({}) + + +class TestBasicProxyAuth: + def test_simple(self): + ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test") + h = odict.ODictCaseless() + assert ba.auth_challenge_headers() + assert not ba.authenticate(h) + + def test_authenticate_clean(self): + ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test") + + hdrs = odict.ODictCaseless() + vals = ("basic", "foo", "bar") + hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)] + assert ba.authenticate(hdrs) + + ba.clean(hdrs) + assert not ba.AUTH_HEADER in hdrs + + + hdrs[ba.AUTH_HEADER] = [""] + assert not ba.authenticate(hdrs) + + hdrs[ba.AUTH_HEADER] = ["foo"] + assert not ba.authenticate(hdrs) + + vals = ("foo", "foo", "bar") + hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)] + assert not ba.authenticate(hdrs) + + ba = http_auth.BasicProxyAuth(http_auth.PassMan(), "test") + vals = ("basic", "foo", "bar") + hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)] + assert not ba.authenticate(hdrs) +