Integrate HTTP auth, test to 100%
This commit is contained in:
parent
97537417f0
commit
0acab862a6
|
@ -1,2 +1,3 @@
|
||||||
[report]
|
[report]
|
||||||
include = *netlib*
|
omit = *contrib*
|
||||||
|
include = *netlib/netlib*
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
# Based on FreeBSD src/lib/libcrypt/crypt.c 1.2
|
||||||
|
# http://www.freebsd.org/cgi/cvsweb.cgi/~checkout~/src/lib/libcrypt/crypt.c?rev=1.2&content-type=text/plain
|
||||||
|
|
||||||
|
# Original license:
|
||||||
|
# * "THE BEER-WARE LICENSE" (Revision 42):
|
||||||
|
# * <phk@login.dknet.dk> wrote this file. As long as you retain this notice you
|
||||||
|
# * can do whatever you want with this stuff. If we meet some day, and you think
|
||||||
|
# * this stuff is worth it, you can buy me a beer in return. Poul-Henning Kamp
|
||||||
|
|
||||||
|
# This port adds no further stipulations. I forfeit any copyright interest.
|
||||||
|
|
||||||
|
import md5
|
||||||
|
|
||||||
|
def md5crypt(password, salt, magic='$1$'):
|
||||||
|
# /* The password first, since that is what is most unknown */ /* Then our magic string */ /* Then the raw salt */
|
||||||
|
m = md5.new()
|
||||||
|
m.update(password + magic + salt)
|
||||||
|
|
||||||
|
# /* Then just as many characters of the MD5(pw,salt,pw) */
|
||||||
|
mixin = md5.md5(password + salt + password).digest()
|
||||||
|
for i in range(0, len(password)):
|
||||||
|
m.update(mixin[i % 16])
|
||||||
|
|
||||||
|
# /* Then something really weird... */
|
||||||
|
# Also really broken, as far as I can tell. -m
|
||||||
|
i = len(password)
|
||||||
|
while i:
|
||||||
|
if i & 1:
|
||||||
|
m.update('\x00')
|
||||||
|
else:
|
||||||
|
m.update(password[0])
|
||||||
|
i >>= 1
|
||||||
|
|
||||||
|
final = m.digest()
|
||||||
|
|
||||||
|
# /* and now, just to make sure things don't run too fast */
|
||||||
|
for i in range(1000):
|
||||||
|
m2 = md5.md5()
|
||||||
|
if i & 1:
|
||||||
|
m2.update(password)
|
||||||
|
else:
|
||||||
|
m2.update(final)
|
||||||
|
|
||||||
|
if i % 3:
|
||||||
|
m2.update(salt)
|
||||||
|
|
||||||
|
if i % 7:
|
||||||
|
m2.update(password)
|
||||||
|
|
||||||
|
if i & 1:
|
||||||
|
m2.update(final)
|
||||||
|
else:
|
||||||
|
m2.update(password)
|
||||||
|
|
||||||
|
final = m2.digest()
|
||||||
|
|
||||||
|
# This is the bit that uses to64() in the original code.
|
||||||
|
|
||||||
|
itoa64 = './0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
|
||||||
|
|
||||||
|
rearranged = ''
|
||||||
|
for a, b, c in ((0, 6, 12), (1, 7, 13), (2, 8, 14), (3, 9, 15), (4, 10, 5)):
|
||||||
|
v = ord(final[a]) << 16 | ord(final[b]) << 8 | ord(final[c])
|
||||||
|
for i in range(4):
|
||||||
|
rearranged += itoa64[v & 0x3f]; v >>= 6
|
||||||
|
|
||||||
|
v = ord(final[11])
|
||||||
|
for i in range(2):
|
||||||
|
rearranged += itoa64[v & 0x3f]; v >>= 6
|
||||||
|
|
||||||
|
return magic + salt + '$' + rearranged
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
def test(clear_password, the_hash):
|
||||||
|
magic, salt = the_hash[1:].split('$')[:2]
|
||||||
|
magic = '$' + magic + '$'
|
||||||
|
return md5crypt(clear_password, salt, magic) == the_hash
|
||||||
|
|
||||||
|
test_cases = (
|
||||||
|
(' ', '$1$yiiZbNIH$YiCsHZjcTkYd31wkgW8JF.'),
|
||||||
|
('pass', '$1$YeNsbWdH$wvOF8JdqsoiLix754LTW90'),
|
||||||
|
('____fifteen____', '$1$s9lUWACI$Kk1jtIVVdmT01p0z3b/hw1'),
|
||||||
|
('____sixteen_____', '$1$dL3xbVZI$kkgqhCanLdxODGq14g/tW1'),
|
||||||
|
('____seventeen____', '$1$NaH5na7J$j7y8Iss0hcRbu3kzoJs5V.'),
|
||||||
|
('__________thirty-three___________', '$1$HO7Q6vzJ$yGwp2wbL5D7eOVzOmxpsy.'),
|
||||||
|
('apache', '$apr1$J.w5a/..$IW9y6DR0oO/ADuhlMF5/X1')
|
||||||
|
)
|
||||||
|
|
||||||
|
for clearpw, hashpw in test_cases:
|
||||||
|
if test(clearpw, hashpw):
|
||||||
|
print '%s: pass' % clearpw
|
||||||
|
else:
|
||||||
|
print '%s: FAIL' % clearpw
|
|
@ -1,4 +1,4 @@
|
||||||
import string, urlparse
|
import string, urlparse, binascii
|
||||||
import odict
|
import odict
|
||||||
|
|
||||||
class HttpError(Exception):
|
class HttpError(Exception):
|
||||||
|
@ -169,6 +169,26 @@ def parse_http_protocol(s):
|
||||||
return major, minor
|
return major, minor
|
||||||
|
|
||||||
|
|
||||||
|
def parse_http_basic_auth(s):
|
||||||
|
words = s.split()
|
||||||
|
if len(words) != 2:
|
||||||
|
return None
|
||||||
|
scheme = words[0]
|
||||||
|
try:
|
||||||
|
user = binascii.a2b_base64(words[1])
|
||||||
|
except binascii.Error:
|
||||||
|
return None
|
||||||
|
parts = user.split(':')
|
||||||
|
if len(parts) != 2:
|
||||||
|
return None
|
||||||
|
return scheme, parts[0], parts[1]
|
||||||
|
|
||||||
|
|
||||||
|
def assemble_http_basic_auth(scheme, username, password):
|
||||||
|
v = binascii.b2a_base64(username + ":" + password)
|
||||||
|
return scheme + " " + v
|
||||||
|
|
||||||
|
|
||||||
def parse_init(line):
|
def parse_init(line):
|
||||||
try:
|
try:
|
||||||
method, url, protocol = string.split(line)
|
method, url, protocol = string.split(line)
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
import binascii
|
||||||
|
import contrib.md5crypt as md5crypt
|
||||||
|
import http
|
||||||
|
|
||||||
|
|
||||||
|
class NullProxyAuth():
|
||||||
|
"""
|
||||||
|
No proxy auth at all (returns empty challange headers)
|
||||||
|
"""
|
||||||
|
def __init__(self, password_manager):
|
||||||
|
self.password_manager = password_manager
|
||||||
|
|
||||||
|
def clean(self, headers):
|
||||||
|
"""
|
||||||
|
Clean up authentication headers, so they're not passed upstream.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def authenticate(self, headers):
|
||||||
|
"""
|
||||||
|
Tests that the user is allowed to use the proxy
|
||||||
|
"""
|
||||||
|
return True
|
||||||
|
|
||||||
|
def auth_challenge_headers(self):
|
||||||
|
"""
|
||||||
|
Returns a dictionary containing the headers require to challenge the user
|
||||||
|
"""
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
class BasicProxyAuth(NullProxyAuth):
|
||||||
|
CHALLENGE_HEADER = 'Proxy-Authenticate'
|
||||||
|
AUTH_HEADER = 'Proxy-Authorization'
|
||||||
|
def __init__(self, password_manager, realm):
|
||||||
|
NullProxyAuth.__init__(self, password_manager)
|
||||||
|
self.realm = realm
|
||||||
|
|
||||||
|
def clean(self, headers):
|
||||||
|
del headers[self.AUTH_HEADER]
|
||||||
|
|
||||||
|
def authenticate(self, headers):
|
||||||
|
auth_value = headers.get(self.AUTH_HEADER, [])
|
||||||
|
if not auth_value:
|
||||||
|
return False
|
||||||
|
parts = http.parse_http_basic_auth(auth_value[0])
|
||||||
|
if not parts:
|
||||||
|
return False
|
||||||
|
scheme, username, password = parts
|
||||||
|
if scheme.lower()!='basic':
|
||||||
|
return False
|
||||||
|
if not self.password_manager.test(username, password):
|
||||||
|
return False
|
||||||
|
self.username = username
|
||||||
|
return True
|
||||||
|
|
||||||
|
def auth_challenge_headers(self):
|
||||||
|
return {self.CHALLENGE_HEADER:'Basic realm="%s"'%self.realm}
|
||||||
|
|
||||||
|
|
||||||
|
class PassMan():
|
||||||
|
def test(self, username, password_token):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class PassManNonAnon:
|
||||||
|
"""
|
||||||
|
Ensure the user specifies a username, accept any password.
|
||||||
|
"""
|
||||||
|
def test(self, username, password_token):
|
||||||
|
if username:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class PassManHtpasswd:
|
||||||
|
"""
|
||||||
|
Read usernames and passwords from an htpasswd file
|
||||||
|
"""
|
||||||
|
def __init__(self, fp):
|
||||||
|
"""
|
||||||
|
Raises ValueError if htpasswd file is invalid.
|
||||||
|
"""
|
||||||
|
self.usernames = {}
|
||||||
|
for l in fp:
|
||||||
|
l = l.strip().split(':')
|
||||||
|
if len(l) != 2:
|
||||||
|
raise ValueError("Invalid htpasswd file.")
|
||||||
|
parts = l[1].split('$')
|
||||||
|
if len(parts) != 4:
|
||||||
|
raise ValueError("Invalid htpasswd file.")
|
||||||
|
self.usernames[l[0]] = dict(
|
||||||
|
token = l[1],
|
||||||
|
dummy = parts[0],
|
||||||
|
magic = parts[1],
|
||||||
|
salt = parts[2],
|
||||||
|
hashed_password = parts[3]
|
||||||
|
)
|
||||||
|
|
||||||
|
def test(self, username, password_token):
|
||||||
|
ui = self.usernames.get(username)
|
||||||
|
if not ui:
|
||||||
|
return False
|
||||||
|
expected = md5crypt.md5crypt(password_token, ui["salt"], '$'+ui["magic"]+'$')
|
||||||
|
return expected==ui["token"]
|
||||||
|
|
||||||
|
|
||||||
|
class PassManSingleUser:
|
||||||
|
def __init__(self, username, password):
|
||||||
|
self.username, self.password = username, password
|
||||||
|
|
||||||
|
def test(self, username, password_token):
|
||||||
|
return self.username==username and self.password==password_token
|
|
@ -0,0 +1 @@
|
||||||
|
test:$apr1$/LkYxy3x$WI4.YbiJlu537jLGEW2eu1
|
|
@ -1,4 +1,4 @@
|
||||||
import cStringIO, textwrap
|
import cStringIO, textwrap, binascii
|
||||||
from netlib import http, odict
|
from netlib import http, odict
|
||||||
import tutils
|
import tutils
|
||||||
|
|
||||||
|
@ -291,3 +291,12 @@ def test_parse_url():
|
||||||
assert not http.parse_url("https://foo:bar")
|
assert not http.parse_url("https://foo:bar")
|
||||||
assert not http.parse_url("https://foo:")
|
assert not http.parse_url("https://foo:")
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_http_basic_auth():
|
||||||
|
vals = ("basic", "foo", "bar")
|
||||||
|
assert http.parse_http_basic_auth(http.assemble_http_basic_auth(*vals)) == vals
|
||||||
|
assert not http.parse_http_basic_auth("")
|
||||||
|
assert not http.parse_http_basic_auth("foo bar")
|
||||||
|
v = "basic " + binascii.b2a_base64("foo")
|
||||||
|
assert not http.parse_http_basic_auth(v)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
import binascii, cStringIO
|
||||||
|
from netlib import odict, http_auth, http
|
||||||
|
import tutils
|
||||||
|
|
||||||
|
class TestPassManNonAnon:
|
||||||
|
def test_simple(self):
|
||||||
|
p = http_auth.PassManNonAnon()
|
||||||
|
assert not p.test("", "")
|
||||||
|
assert p.test("user", "")
|
||||||
|
|
||||||
|
|
||||||
|
class TestPassManHtpasswd:
|
||||||
|
def test_file_errors(self):
|
||||||
|
s = cStringIO.StringIO("foo")
|
||||||
|
tutils.raises("invalid htpasswd", http_auth.PassManHtpasswd, s)
|
||||||
|
s = cStringIO.StringIO("foo:bar$foo")
|
||||||
|
tutils.raises("invalid htpasswd", http_auth.PassManHtpasswd, s)
|
||||||
|
|
||||||
|
def test_simple(self):
|
||||||
|
f = open(tutils.test_data.path("data/htpasswd"))
|
||||||
|
pm = http_auth.PassManHtpasswd(f)
|
||||||
|
|
||||||
|
vals = ("basic", "test", "test")
|
||||||
|
p = http.assemble_http_basic_auth(*vals)
|
||||||
|
assert pm.test("test", "test")
|
||||||
|
assert not pm.test("test", "foo")
|
||||||
|
assert not pm.test("foo", "test")
|
||||||
|
assert not pm.test("test", "")
|
||||||
|
assert not pm.test("", "")
|
||||||
|
|
||||||
|
|
||||||
|
class TestPassManSingleUser:
|
||||||
|
def test_simple(self):
|
||||||
|
pm = http_auth.PassManSingleUser("test", "test")
|
||||||
|
assert pm.test("test", "test")
|
||||||
|
assert not pm.test("test", "foo")
|
||||||
|
assert not pm.test("foo", "test")
|
||||||
|
|
||||||
|
|
||||||
|
class TestNullProxyAuth:
|
||||||
|
def test_simple(self):
|
||||||
|
na = http_auth.NullProxyAuth(http_auth.PassManNonAnon())
|
||||||
|
assert not na.auth_challenge_headers()
|
||||||
|
assert na.authenticate("foo")
|
||||||
|
na.clean({})
|
||||||
|
|
||||||
|
|
||||||
|
class TestBasicProxyAuth:
|
||||||
|
def test_simple(self):
|
||||||
|
ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
|
||||||
|
h = odict.ODictCaseless()
|
||||||
|
assert ba.auth_challenge_headers()
|
||||||
|
assert not ba.authenticate(h)
|
||||||
|
|
||||||
|
def test_authenticate_clean(self):
|
||||||
|
ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
|
||||||
|
|
||||||
|
hdrs = odict.ODictCaseless()
|
||||||
|
vals = ("basic", "foo", "bar")
|
||||||
|
hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
|
||||||
|
assert ba.authenticate(hdrs)
|
||||||
|
|
||||||
|
ba.clean(hdrs)
|
||||||
|
assert not ba.AUTH_HEADER in hdrs
|
||||||
|
|
||||||
|
|
||||||
|
hdrs[ba.AUTH_HEADER] = [""]
|
||||||
|
assert not ba.authenticate(hdrs)
|
||||||
|
|
||||||
|
hdrs[ba.AUTH_HEADER] = ["foo"]
|
||||||
|
assert not ba.authenticate(hdrs)
|
||||||
|
|
||||||
|
vals = ("foo", "foo", "bar")
|
||||||
|
hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
|
||||||
|
assert not ba.authenticate(hdrs)
|
||||||
|
|
||||||
|
ba = http_auth.BasicProxyAuth(http_auth.PassMan(), "test")
|
||||||
|
vals = ("basic", "foo", "bar")
|
||||||
|
hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
|
||||||
|
assert not ba.authenticate(hdrs)
|
||||||
|
|
Loading…
Reference in New Issue