commit
d734f6bbd6
|
@ -1,5 +1,6 @@
|
|||
import binascii
|
||||
import weakref
|
||||
import ldap3
|
||||
from typing import Optional
|
||||
from typing import MutableMapping # noqa
|
||||
from typing import Tuple
|
||||
|
@ -46,11 +47,12 @@ class ProxyAuth:
|
|||
self.nonanonymous = False
|
||||
self.htpasswd = None
|
||||
self.singleuser = None
|
||||
self.ldapserver = None
|
||||
self.authenticated = weakref.WeakKeyDictionary() # type: MutableMapping[connections.ClientConnection, Tuple[str, str]]
|
||||
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
|
||||
|
||||
def enabled(self) -> bool:
|
||||
return any([self.nonanonymous, self.htpasswd, self.singleuser])
|
||||
return any([self.nonanonymous, self.htpasswd, self.singleuser, self.ldapserver])
|
||||
|
||||
def is_proxy_auth(self) -> bool:
|
||||
"""
|
||||
|
@ -99,7 +101,20 @@ class ProxyAuth:
|
|||
elif self.htpasswd:
|
||||
if self.htpasswd.check_password(username, password):
|
||||
return username, password
|
||||
|
||||
elif self.ldapserver:
|
||||
if not username or not password:
|
||||
return None
|
||||
dn = ctx.options.proxyauth.split(":")[2]
|
||||
parts = dn.split("?")
|
||||
conn = ldap3.Connection(
|
||||
self.ldapserver,
|
||||
parts[0] + username + parts[1],
|
||||
password,
|
||||
auto_bind=True)
|
||||
if conn:
|
||||
conn.search(parts[1][1:], '(' + parts[0] + username + ')', attributes=['objectclass'])
|
||||
if ctx.options.proxyauth.split(":")[3] in conn.entries[0]['objectclass']:
|
||||
return username, password
|
||||
return None
|
||||
|
||||
def authenticate(self, f: http.HTTPFlow) -> bool:
|
||||
|
@ -118,6 +133,7 @@ class ProxyAuth:
|
|||
self.nonanonymous = False
|
||||
self.singleuser = None
|
||||
self.htpasswd = None
|
||||
self.ldapserver = None
|
||||
if ctx.options.proxyauth:
|
||||
if ctx.options.proxyauth == "any":
|
||||
self.nonanonymous = True
|
||||
|
@ -129,6 +145,21 @@ class ProxyAuth:
|
|||
raise exceptions.OptionsError(
|
||||
"Could not open htpasswd file: %s" % p
|
||||
)
|
||||
elif ctx.options.proxyauth.startswith("ldap"):
|
||||
parts = ctx.options.proxyauth.split(":")
|
||||
if len(parts) != 4:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid ldap specification"
|
||||
)
|
||||
if parts[0] == "ldaps":
|
||||
server = ldap3.Server(parts[1], use_ssl=True)
|
||||
elif parts[0] == "ldap":
|
||||
server = ldap3.Server(parts[1])
|
||||
else:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid ldap specfication on the first part"
|
||||
)
|
||||
self.ldapserver = server
|
||||
else:
|
||||
parts = ctx.options.proxyauth.split(':')
|
||||
if len(parts) != 2:
|
||||
|
|
|
@ -204,8 +204,12 @@ class Options(optmanager.OptManager):
|
|||
"""
|
||||
Require proxy authentication. Value may be "any" to require
|
||||
authenticaiton but accept any credentials, start with "@" to specify
|
||||
a path to an Apache htpasswd file, or be of the form
|
||||
"username:password".
|
||||
a path to an Apache htpasswd file, be of the form
|
||||
"username:password", or be of the form
|
||||
"ldap[s]:url_server_ldap:dn:group", the dn must include "?", which will be
|
||||
the username prompted, and the group is the group the user must belong to
|
||||
an example would be
|
||||
"ldap:ldap.forumsys.com:uid=?,dc=example,dc=com:person".
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
|
|
1
setup.py
1
setup.py
|
@ -71,6 +71,7 @@ setup(
|
|||
"hyperframe>=5.0, <6",
|
||||
"jsbeautifier>=1.6.3, <1.7",
|
||||
"kaitaistruct>=0.7, <0.8",
|
||||
"ldap3>=2.2.0, <=2.2.3",
|
||||
"passlib>=1.6.5, <1.8",
|
||||
"pyasn1>=0.1.9, <0.3",
|
||||
"pyOpenSSL>=16.0, <17.1",
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import binascii
|
||||
import ldap3
|
||||
|
||||
import pytest
|
||||
|
||||
|
@ -41,6 +42,20 @@ def test_configure():
|
|||
ctx.configure(up, proxyauth=None)
|
||||
assert not up.nonanonymous
|
||||
|
||||
ctx.configure(up, proxyauth="ldap:fake_server:fake_dn:fake_group")
|
||||
assert up.ldapserver
|
||||
|
||||
ctx.configure(up, proxyauth="ldap:fake_server:uid=?,dc=example,dc=com:person")
|
||||
assert up.ldapserver
|
||||
ctx.configure(up, proxyauth="ldaps:fake_server.com:uid=?,dc=example,dc=com:person")
|
||||
assert up.ldapserver
|
||||
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, proxyauth="ldap:fake_serveruid=?dc=example,dc=com:person")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, proxyauth="ldapssssssss:fake_server.com:uid=?,dc=example,dc=com:person")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(
|
||||
up,
|
||||
|
@ -67,7 +82,7 @@ def test_configure():
|
|||
ctx.configure(up, proxyauth="any", mode="socks5")
|
||||
|
||||
|
||||
def test_check():
|
||||
def test_check(monkeypatch):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(up, proxyauth="any", mode="regular")
|
||||
|
@ -109,6 +124,27 @@ def test_check():
|
|||
)
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(
|
||||
up,
|
||||
proxyauth="ldap:fake-server:cn=?,ou=test,o=lab:test"
|
||||
)
|
||||
conn = ldap3.Connection("fake-server", user="cn=user0,ou=test,o=lab", password="password", client_strategy=ldap3.MOCK_SYNC)
|
||||
conn.bind()
|
||||
conn.strategy.add_entry('cn=user0,ou=test,o=lab', {'userPassword': 'test0', 'sn': 'user0_sn', 'revision': 0, 'objectClass': 'test'})
|
||||
|
||||
def conn_mp(ldap, user, password, **kwargs):
|
||||
return conn
|
||||
|
||||
monkeypatch.setattr(ldap3, "Connection", conn_mp)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"user0", "test0"
|
||||
)
|
||||
assert up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"", ""
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
|
||||
def test_authenticate():
|
||||
up = proxyauth.ProxyAuth()
|
||||
|
|
Loading…
Reference in New Issue