mitmproxy/examples/contrib/webscanner_helper/urlindex.py

175 lines
5.2 KiB
Python

import abc
import datetime
import json
import logging
from pathlib import Path
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
logger = logging.getLogger(__name__)
class UrlIndexWriter(abc.ABC):
"""Abstract Add-on to write seen URLs.
For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
The injection can be done using the URLInjection Add-on.
"""
def __init__(self, filename: Path):
"""Initializes the UrlIndexWriter.
Args:
filename: Path to file to which the URL index will be written.
"""
self.filepath = filename
@abc.abstractmethod
def load(self):
"""Load existing URL index."""
@abc.abstractmethod
def add_url(self, flow: HTTPFlow):
"""Add new URL to URL index."""
@abc.abstractmethod
def save(self):
pass
class SetEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
class JSONUrlIndexWriter(UrlIndexWriter):
"""Writes seen URLs as JSON."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.host_urls = {}
def load(self):
if self.filepath.exists():
with self.filepath.open("r") as f:
self.host_urls = json.load(f)
for host in self.host_urls.keys():
for path, methods in self.host_urls[host].items():
for method, codes in methods.items():
self.host_urls[host][path] = {method: set(codes)}
def add_url(self, flow: HTTPFlow):
req = flow.request
res = flow.response
if req is not None and res is not None:
urls = self.host_urls.setdefault(
f"{req.scheme}://{req.host}:{req.port}", dict()
)
methods = urls.setdefault(req.path, {})
codes = methods.setdefault(req.method, set())
codes.add(res.status_code)
def save(self):
with self.filepath.open("w") as f:
json.dump(self.host_urls, f, cls=SetEncoder)
class TextUrlIndexWriter(UrlIndexWriter):
"""Writes seen URLs as text."""
def load(self):
pass
def add_url(self, flow: HTTPFlow):
res = flow.response
req = flow.request
if res is not None and req is not None:
with self.filepath.open("a+") as f:
f.write(
f"{datetime.datetime.utcnow().isoformat()} STATUS: {res.status_code} METHOD: "
f"{req.method} URL:{req.url}\n"
)
def save(self):
pass
WRITER: dict[str, type[UrlIndexWriter]] = {
"json": JSONUrlIndexWriter,
"text": TextUrlIndexWriter,
}
def filter_404(flow) -> bool:
"""Filters responses with status code 404."""
return flow.response.status_code != 404
class UrlIndexAddon:
"""Add-on to write seen URLs, either as JSON or as text.
For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
The injection can be done using the URLInjection Add-on.
"""
index_filter: str | flowfilter.TFilter | None
writer: UrlIndexWriter
OPT_FILEPATH = "URLINDEX_FILEPATH"
OPT_APPEND = "URLINDEX_APPEND"
OPT_INDEX_FILTER = "URLINDEX_FILTER"
def __init__(
self,
file_path: str | Path,
append: bool = True,
index_filter: str | flowfilter.TFilter = filter_404,
index_format: str = "json",
):
"""Initializes the urlindex add-on.
Args:
file_path: Path to file to which the URL index will be written. Can either be given as str or Path.
append: Bool to decide whether to append new URLs to the given file (as opposed to overwrite the contents
of the file)
index_filer: A mitmproxy filter with which the seen URLs will be filtered before being written. Can either
be given as str or as flowfilter.TFilter
index_format: The format of the URL index, can either be "json" or "text".
"""
if isinstance(index_filter, str):
self.index_filter = flowfilter.parse(index_filter)
if self.index_filter is None:
raise ValueError("Invalid filter expression.")
else:
self.index_filter = index_filter
file_path = Path(file_path)
try:
self.writer = WRITER[index_format.lower()](file_path)
except KeyError:
raise ValueError(f"Format '{index_format}' is not supported.")
if not append and file_path.exists():
file_path.unlink()
self.writer.load()
def response(self, flow: HTTPFlow):
"""Checks if the response should be included in the URL based on the index_filter and adds it to the URL index
if appropriate.
"""
if isinstance(self.index_filter, str) or self.index_filter is None:
raise ValueError("Invalid filter expression.")
else:
if self.index_filter(flow):
self.writer.add_url(flow)
def done(self):
"""Writes the URL index."""
self.writer.save()