138 lines
5.1 KiB
Python
138 lines
5.1 KiB
Python
import abc
|
|
import logging
|
|
import random
|
|
import string
|
|
import time
|
|
from typing import Any
|
|
from typing import cast
|
|
|
|
from selenium import webdriver
|
|
|
|
import mitmproxy.http
|
|
from mitmproxy import flowfilter
|
|
from mitmproxy import master
|
|
from mitmproxy.script import concurrent
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
cookie_key_name = {
|
|
"path": "Path",
|
|
"expires": "Expires",
|
|
"domain": "Domain",
|
|
"is_http_only": "HttpOnly",
|
|
"is_secure": "Secure",
|
|
}
|
|
|
|
|
|
def randomString(string_length=10):
|
|
"""Generate a random string of fixed length"""
|
|
letters = string.ascii_lowercase
|
|
return "".join(random.choice(letters) for i in range(string_length))
|
|
|
|
|
|
class AuthorizationOracle(abc.ABC):
|
|
"""Abstract class for an authorization oracle which decides if a given request or response is authenticated."""
|
|
|
|
@abc.abstractmethod
|
|
def is_unauthorized_request(self, flow: mitmproxy.http.HTTPFlow) -> bool:
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
def is_unauthorized_response(self, flow: mitmproxy.http.HTTPFlow) -> bool:
|
|
pass
|
|
|
|
|
|
class SeleniumAddon:
|
|
"""This Addon can be used in combination with web application scanners in order to help them to authenticate
|
|
against a web application.
|
|
|
|
Since the authentication is highly dependant on the web application, this add-on includes the abstract method
|
|
*login*. In order to use the add-on, a class for the web application inheriting from SeleniumAddon needs to be
|
|
created. This class needs to include the concrete selenium actions necessary to authenticate against the web
|
|
application. In addition, an authentication oracle which inherits from AuthorizationOracle should be created.
|
|
"""
|
|
|
|
def __init__(self, fltr: str, domain: str, auth_oracle: AuthorizationOracle):
|
|
self.filter = flowfilter.parse(fltr)
|
|
self.auth_oracle = auth_oracle
|
|
self.domain = domain
|
|
self.browser = None
|
|
self.set_cookies = False
|
|
|
|
options = webdriver.FirefoxOptions()
|
|
options.headless = True
|
|
|
|
profile = webdriver.FirefoxProfile()
|
|
profile.set_preference("network.proxy.type", 0)
|
|
self.browser = webdriver.Firefox(firefox_profile=profile, options=options)
|
|
self.cookies: list[dict[str, str]] = []
|
|
|
|
def _login(self, flow):
|
|
self.cookies = self.login(flow)
|
|
self.browser.get("about:blank")
|
|
self._set_request_cookies(flow)
|
|
self.set_cookies = True
|
|
|
|
def request(self, flow: mitmproxy.http.HTTPFlow):
|
|
if flow.request.is_replay:
|
|
logger.warning("Caught replayed request: " + str(flow))
|
|
if (
|
|
not self.filter or self.filter(flow)
|
|
) and self.auth_oracle.is_unauthorized_request(flow):
|
|
logger.debug("unauthorized request detected, perform login")
|
|
self._login(flow)
|
|
|
|
# has to be concurrent because replay.client is blocking and replayed flows
|
|
# will also call response
|
|
@concurrent
|
|
def response(self, flow: mitmproxy.http.HTTPFlow):
|
|
if flow.response and (self.filter is None or self.filter(flow)):
|
|
if self.auth_oracle.is_unauthorized_response(flow):
|
|
self._login(flow)
|
|
new_flow = flow.copy()
|
|
if master and hasattr(master, "commands"):
|
|
# cast necessary for mypy
|
|
cast(Any, master).commands.call("replay.client", [new_flow])
|
|
count = 0
|
|
while new_flow.response is None and count < 10:
|
|
logger.error("waiting since " + str(count) + " ...")
|
|
count = count + 1
|
|
time.sleep(1)
|
|
if new_flow.response:
|
|
flow.response = new_flow.response
|
|
else:
|
|
logger.warning(
|
|
"Could not call 'replay.client' command since master was not initialized yet."
|
|
)
|
|
|
|
if self.set_cookies and flow.response:
|
|
logger.debug("set set-cookie header for response")
|
|
self._set_set_cookie_headers(flow)
|
|
self.set_cookies = False
|
|
|
|
def done(self):
|
|
self.browser.close()
|
|
|
|
def _set_set_cookie_headers(self, flow: mitmproxy.http.HTTPFlow):
|
|
if flow.response and self.cookies:
|
|
for cookie in self.cookies:
|
|
parts = [f"{cookie['name']}={cookie['value']}"]
|
|
for k, v in cookie_key_name.items():
|
|
if k in cookie and isinstance(cookie[k], str):
|
|
parts.append(f"{v}={cookie[k]}")
|
|
elif k in cookie and isinstance(cookie[k], bool) and cookie[k]:
|
|
parts.append(cookie[k])
|
|
encoded_c = "; ".join(parts)
|
|
flow.response.headers["set-cookie"] = encoded_c
|
|
|
|
def _set_request_cookies(self, flow: mitmproxy.http.HTTPFlow):
|
|
if self.cookies:
|
|
cookies = "; ".join(
|
|
map(lambda c: f"{c['name']}={c['value']}", self.cookies)
|
|
)
|
|
flow.request.headers["cookie"] = cookies
|
|
|
|
@abc.abstractmethod
|
|
def login(self, flow: mitmproxy.http.HTTPFlow) -> list[dict[str, str]]:
|
|
pass
|