mitmproxy/examples/contrib/webscanner_helper/urlindex.py

169 lines
5.2 KiB
Python
Raw Permalink Normal View History

import abc
import datetime
import json
import logging
from pathlib import Path
from typing import Type, Dict, Union, Optional
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
logger = logging.getLogger(__name__)
class UrlIndexWriter(abc.ABC):
"""Abstract Add-on to write seen URLs.
For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
The injection can be done using the URLInjection Add-on.
"""
def __init__(self, filename: Path):
"""Initializes the UrlIndexWriter.
Args:
filename: Path to file to which the URL index will be written.
"""
self.filepath = filename
@abc.abstractmethod
def load(self):
"""Load existing URL index."""
pass
@abc.abstractmethod
def add_url(self, flow: HTTPFlow):
"""Add new URL to URL index."""
pass
@abc.abstractmethod
def save(self):
pass
class SetEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
class JSONUrlIndexWriter(UrlIndexWriter):
"""Writes seen URLs as JSON."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.host_urls = {}
def load(self):
if self.filepath.exists():
with self.filepath.open("r") as f:
self.host_urls = json.load(f)
for host in self.host_urls.keys():
for path, methods in self.host_urls[host].items():
for method, codes in methods.items():
self.host_urls[host][path] = {method: set(codes)}
def add_url(self, flow: HTTPFlow):
req = flow.request
res = flow.response
if req is not None and res is not None:
urls = self.host_urls.setdefault(f"{req.scheme}://{req.host}:{req.port}", dict())
methods = urls.setdefault(req.path, {})
codes = methods.setdefault(req.method, set())
codes.add(res.status_code)
def save(self):
with self.filepath.open("w") as f:
json.dump(self.host_urls, f, cls=SetEncoder)
class TextUrlIndexWriter(UrlIndexWriter):
"""Writes seen URLs as text."""
def load(self):
pass
def add_url(self, flow: HTTPFlow):
res = flow.response
req = flow.request
if res is not None and req is not None:
with self.filepath.open("a+") as f:
f.write(f"{datetime.datetime.utcnow().isoformat()} STATUS: {res.status_code} METHOD: "
f"{req.method} URL:{req.url}\n")
def save(self):
pass
WRITER: Dict[str, Type[UrlIndexWriter]] = {
"json": JSONUrlIndexWriter,
"text": TextUrlIndexWriter,
}
def filter_404(flow) -> bool:
"""Filters responses with status code 404."""
return flow.response.status_code != 404
class UrlIndexAddon:
"""Add-on to write seen URLs, either as JSON or as text.
For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
The injection can be done using the URLInjection Add-on.
"""
index_filter: Optional[Union[str, flowfilter.TFilter]]
writer: UrlIndexWriter
OPT_FILEPATH = "URLINDEX_FILEPATH"
OPT_APPEND = "URLINDEX_APPEND"
OPT_INDEX_FILTER = "URLINDEX_FILTER"
def __init__(self, file_path: Union[str, Path], append: bool = True,
index_filter: Union[str, flowfilter.TFilter] = filter_404, index_format: str = "json"):
""" Initializes the urlindex add-on.
Args:
file_path: Path to file to which the URL index will be written. Can either be given as str or Path.
append: Bool to decide whether to append new URLs to the given file (as opposed to overwrite the contents
of the file)
index_filer: A mitmproxy filter with which the seen URLs will be filtered before being written. Can either
be given as str or as flowfilter.TFilter
index_format: The format of the URL index, can either be "json" or "text".
"""
if isinstance(index_filter, str):
self.index_filter = flowfilter.parse(index_filter)
if self.index_filter is None:
raise ValueError("Invalid filter expression.")
else:
self.index_filter = index_filter
file_path = Path(file_path)
try:
self.writer = WRITER[index_format.lower()](file_path)
except KeyError:
raise ValueError(f"Format '{index_format}' is not supported.")
if not append and file_path.exists():
file_path.unlink()
self.writer.load()
def response(self, flow: HTTPFlow):
"""Checks if the response should be included in the URL based on the index_filter and adds it to the URL index
if appropriate.
"""
if isinstance(self.index_filter, str) or self.index_filter is None:
raise ValueError("Invalid filter expression.")
else:
if self.index_filter(flow):
self.writer.add_url(flow)
def done(self):
"""Writes the URL index."""
self.writer.save()