Added add-on with support for proxy authentication using selenium.
Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com>
This commit is contained in:
parent
4bf93ec379
commit
7dd98b4959
|
@ -0,0 +1,130 @@
|
|||
import abc
|
||||
import logging
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
from typing import Dict, List, cast, Any
|
||||
|
||||
import mitmproxy.http
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import master
|
||||
from mitmproxy.script import concurrent
|
||||
from selenium import webdriver
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
cookie_key_name = {
|
||||
"path": "Path",
|
||||
"expires": "Expires",
|
||||
"domain": "Domain",
|
||||
"is_http_only": "HttpOnly",
|
||||
"is_secure": "Secure"
|
||||
}
|
||||
|
||||
|
||||
def randomString(string_length=10):
|
||||
"""Generate a random string of fixed length """
|
||||
letters = string.ascii_lowercase
|
||||
return ''.join(random.choice(letters) for i in range(string_length))
|
||||
|
||||
|
||||
class AuthorizationOracle(abc.ABC):
|
||||
"""Abstract class for an authorization oracle which decides if a given request or response is authenticated."""
|
||||
@abc.abstractmethod
|
||||
def is_unauthorized_request(self, flow: mitmproxy.http.HTTPFlow) -> bool:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_unauthorized_response(self, flow: mitmproxy.http.HTTPFlow) -> bool:
|
||||
pass
|
||||
|
||||
|
||||
class SeleniumAddon:
|
||||
""" This Addon can be used in combination with web application scanners in order to help them to authenticate
|
||||
against a web application.
|
||||
|
||||
Since the authentication is highly dependant on the web application, this add-on includes the abstract method
|
||||
*login*. In order to use the add-on, a class for the web application inheriting from SeleniumAddon needs to be
|
||||
created. This class needs to include the concrete selenium actions necessary to authenticate against the web
|
||||
application. In addition, an authentication oracle which inherits from AuthorizationOracle should be created.
|
||||
"""
|
||||
def __init__(self, fltr: str, domain: str,
|
||||
auth_oracle: AuthorizationOracle):
|
||||
self.filter = flowfilter.parse(fltr)
|
||||
self.auth_oracle = auth_oracle
|
||||
self.domain = domain
|
||||
self.browser = None
|
||||
self.set_cookies = False
|
||||
|
||||
options = webdriver.FirefoxOptions()
|
||||
options.headless = True
|
||||
|
||||
profile = webdriver.FirefoxProfile()
|
||||
profile.set_preference('network.proxy.type', 0)
|
||||
self.browser = webdriver.Firefox(firefox_profile=profile,
|
||||
options=options)
|
||||
self.cookies: List[Dict[str, str]] = []
|
||||
|
||||
def _login(self, flow):
|
||||
self.cookies = self.login(flow)
|
||||
self.browser.get("about:blank")
|
||||
self._set_request_cookies(flow)
|
||||
self.set_cookies = True
|
||||
|
||||
def request(self, flow: mitmproxy.http.HTTPFlow):
|
||||
if flow.request.is_replay:
|
||||
logger.warning("Caught replayed request: " + str(flow))
|
||||
if (not self.filter or self.filter(flow)) and self.auth_oracle.is_unauthorized_request(flow):
|
||||
logger.debug("unauthorized request detected, perform login")
|
||||
self._login(flow)
|
||||
|
||||
# has to be concurrent because replay.client is blocking and replayed flows
|
||||
# will also call response
|
||||
@concurrent
|
||||
def response(self, flow: mitmproxy.http.HTTPFlow):
|
||||
if flow.response and (self.filter is None or self.filter(flow)):
|
||||
if self.auth_oracle.is_unauthorized_response(flow):
|
||||
self._login(flow)
|
||||
new_flow = flow.copy()
|
||||
if master and hasattr(master, 'commands'):
|
||||
# cast necessary for mypy
|
||||
cast(Any, master).commands.call("replay.client", [new_flow])
|
||||
count = 0
|
||||
while new_flow.response is None and count < 10:
|
||||
logger.error("waiting since " + str(count) + " ...")
|
||||
count = count + 1
|
||||
time.sleep(1)
|
||||
if new_flow.response:
|
||||
flow.response = new_flow.response
|
||||
else:
|
||||
logger.warning("Could not call 'replay.client' command since master was not initialized yet.")
|
||||
|
||||
if self.set_cookies and flow.response:
|
||||
logger.debug("set set-cookie header for response")
|
||||
self._set_set_cookie_headers(flow)
|
||||
self.set_cookies = False
|
||||
|
||||
def done(self):
|
||||
self.browser.close()
|
||||
|
||||
def _set_set_cookie_headers(self, flow: mitmproxy.http.HTTPFlow):
|
||||
if flow.response and self.cookies:
|
||||
for cookie in self.cookies:
|
||||
parts = [f"{cookie['name']}={cookie['value']}"]
|
||||
for k, v in cookie_key_name.items():
|
||||
if k in cookie and isinstance(cookie[k], str):
|
||||
parts.append(f"{v}={cookie[k]}")
|
||||
elif k in cookie and isinstance(cookie[k], bool) and cookie[k]:
|
||||
parts.append(cookie[k])
|
||||
encoded_c = "; ".join(parts)
|
||||
flow.response.headers["set-cookie"] = encoded_c
|
||||
|
||||
def _set_request_cookies(self, flow: mitmproxy.http.HTTPFlow):
|
||||
if self.cookies:
|
||||
cookies = "; ".join(
|
||||
map(lambda c: f"{c['name']}={c['value']}", self.cookies))
|
||||
flow.request.headers["cookie"] = cookies
|
||||
|
||||
@abc.abstractmethod
|
||||
def login(self, flow: mitmproxy.http.HTTPFlow) -> List[Dict[str, str]]:
|
||||
pass
|
|
@ -0,0 +1,118 @@
|
|||
from unittest import mock
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
from mitmproxy.http import HTTPFlow
|
||||
|
||||
from mitmproxy.addons.proxyauth_selenium import logger, randomString, AuthorizationOracle, SeleniumAddon
|
||||
|
||||
|
||||
class TestRandomString:
|
||||
|
||||
def test_random_string(self):
|
||||
res = randomString()
|
||||
assert isinstance(res, str)
|
||||
assert len(res) == 10
|
||||
|
||||
res_5 = randomString(5)
|
||||
assert isinstance(res_5, str)
|
||||
assert len(res_5) == 5
|
||||
|
||||
|
||||
class AuthenticationOracleTest(AuthorizationOracle):
|
||||
def is_unauthorized_request(self, flow: HTTPFlow) -> bool:
|
||||
return True
|
||||
|
||||
def is_unauthorized_response(self, flow: HTTPFlow) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
oracle = AuthenticationOracleTest()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def selenium_addon(request):
|
||||
addon = SeleniumAddon(fltr=r"~u http://example\.com/login\.php", domain=r"~d http://example\.com",
|
||||
auth_oracle=oracle)
|
||||
browser = MagicMock()
|
||||
addon.browser = browser
|
||||
yield addon
|
||||
|
||||
def fin():
|
||||
addon.browser.close()
|
||||
|
||||
request.addfinalizer(fin)
|
||||
|
||||
|
||||
class TestSeleniumAddon:
|
||||
|
||||
def test_request_replay(self, selenium_addon):
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
f.request.is_replay = True
|
||||
with mock.patch.object(logger, 'warning') as mock_warning:
|
||||
selenium_addon.request(f)
|
||||
mock_warning.assert_called()
|
||||
|
||||
def test_request(self, selenium_addon):
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
f.request.url = "http://example.com/login.php"
|
||||
selenium_addon.set_cookies = False
|
||||
assert not selenium_addon.set_cookies
|
||||
with mock.patch.object(logger, 'debug') as mock_debug:
|
||||
selenium_addon.request(f)
|
||||
mock_debug.assert_called()
|
||||
assert selenium_addon.set_cookies
|
||||
|
||||
def test_request_filtered(self, selenium_addon):
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
selenium_addon.set_cookies = False
|
||||
assert not selenium_addon.set_cookies
|
||||
selenium_addon.request(f)
|
||||
assert not selenium_addon.set_cookies
|
||||
|
||||
def test_request_cookies(self, selenium_addon):
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
f.request.url = "http://example.com/login.php"
|
||||
selenium_addon.set_cookies = False
|
||||
assert not selenium_addon.set_cookies
|
||||
with mock.patch.object(logger, 'debug') as mock_debug:
|
||||
with mock.patch('mitmproxy.addons.proxyauth_selenium.SeleniumAddon.login',
|
||||
return_value=[{"name": "cookie", "value": "test"}]) as mock_login:
|
||||
selenium_addon.request(f)
|
||||
mock_debug.assert_called()
|
||||
assert selenium_addon.set_cookies
|
||||
mock_login.assert_called()
|
||||
|
||||
def test_request_filter_None(self, selenium_addon):
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
fltr = selenium_addon.filter
|
||||
selenium_addon.filter = None
|
||||
assert not selenium_addon.filter
|
||||
selenium_addon.set_cookies = False
|
||||
assert not selenium_addon.set_cookies
|
||||
|
||||
with mock.patch.object(logger, 'debug') as mock_debug:
|
||||
selenium_addon.request(f)
|
||||
mock_debug.assert_called()
|
||||
selenium_addon.filter = fltr
|
||||
assert selenium_addon.set_cookies
|
||||
|
||||
def test_response(self, selenium_addon):
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
f.request.url = "http://example.com/login.php"
|
||||
selenium_addon.set_cookies = False
|
||||
with mock.patch('mitmproxy.addons.proxyauth_selenium.SeleniumAddon.login', return_value=[]) as mock_login:
|
||||
selenium_addon.response(f)
|
||||
mock_login.assert_called()
|
||||
|
||||
def test_response_cookies(self, selenium_addon):
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
f.request.url = "http://example.com/login.php"
|
||||
selenium_addon.set_cookies = False
|
||||
with mock.patch('mitmproxy.addons.proxyauth_selenium.SeleniumAddon.login',
|
||||
return_value=[{"name": "cookie", "value": "test"}]) as mock_login:
|
||||
selenium_addon.response(f)
|
||||
mock_login.assert_called()
|
Loading…
Reference in New Issue