# mitmproxy/examples/contrib/webscanner_helper/urldict.py
#
# 91 lines
# 2.6 KiB
# Python

import itertools
import json
from collections.abc import MutableMapping
from typing import Any, Callable, Generator, TextIO, Union, cast
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
def f_id(x: Any) -> Any:
    """Return ``x`` unchanged.

    Used as the default ``value_loader`` / ``value_dumper`` callback, so that
    values are passed through as-is when no conversion is requested.
    """
    return x
class URLDict(MutableMapping):
    """Data structure to store information using filters as keys.

    Values are registered under a filter expression string, which is parsed
    with ``flowfilter.parse`` and kept in ``self.store`` keyed by the parsed
    filter. Lookups take a flow instead of a key and return the values of
    every stored filter for which ``flowfilter.match`` succeeds on that flow.

    Iteration and ``len`` operate on the parsed filters held in ``store``.
    """

    def __init__(self):
        # Maps parsed filter -> arbitrary user value.
        self.store: dict[flowfilter.TFilter, Any] = {}

    def __getitem__(self, key, *, count=0):
        """Return the values whose filter matches the flow ``key``.

        Args:
            key: The flow that is matched against every stored filter.
            count: Maximum number of values to return; ``0`` (the default)
                returns all matches.

        Returns:
            A non-empty list of matching values, in the iteration order of
            ``store``.

        Raises:
            KeyError: If no stored filter matches ``key``.
        """
        matches = self.get_generator(key)
        if count:
            matches = itertools.islice(matches, 0, count)
        # Materialise before testing for emptiness: an islice object is always
        # truthy, so checking it directly would never detect "no match".
        ret = list(matches)
        if not ret:
            raise KeyError(key)
        return ret

    def __setitem__(self, key: str, value):
        """Parse the filter expression ``key`` and store ``value`` under it.

        Raises:
            ValueError: If ``flowfilter.parse`` returns a falsy result.
        """
        fltr = flowfilter.parse(key)
        if not fltr:
            raise ValueError("Not a valid filter")
        self.store[fltr] = value

    def __delitem__(self, key):
        """Remove the entry stored under ``key``.

        ``key`` is handed to the underlying dict unchanged, i.e. it is looked
        up among the parsed filters that iteration yields; no filter string
        is parsed here.

        Raises:
            KeyError: If ``key`` is not a key of ``store``.
        """
        del self.store[key]

    def __iter__(self):
        """Iterate over the parsed filters used as keys of ``store``."""
        return iter(self.store)

    def __len__(self):
        """Return the number of stored filters."""
        return len(self.store)

    def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]:
        """Lazily yield the value of every stored filter matching ``flow``."""
        for fltr, value in self.store.items():
            if flowfilter.match(fltr, flow):
                yield value

    def get(self, flow: HTTPFlow, default=None, *, count=0) -> Any:
        """Return the values matching ``flow``, or ``default`` if none match.

        Args:
            flow: The flow that is matched against every stored filter.
            default: Returned unchanged when no stored filter matches.
            count: Maximum number of values to return; ``0`` returns all.

        Returns:
            The list produced by ``__getitem__``, or ``default``.
        """
        try:
            return self.__getitem__(flow, count=count)
        except KeyError:
            return default

    @classmethod
    def _load(cls, json_obj, value_loader: Callable = f_id):
        """Build an instance from a mapping of filter string -> raw value.

        Each raw value is passed through ``value_loader`` before it is stored.
        """
        url_dict = cls()
        for fltr, value in json_obj.items():
            url_dict[fltr] = value_loader(value)
        return url_dict

    @classmethod
    def load(cls, f: TextIO, value_loader: Callable = f_id):
        """Build an instance from the JSON document read from file ``f``."""
        json_obj = json.load(f)
        return cls._load(json_obj, value_loader)

    @classmethod
    def loads(cls, json_str: str, value_loader: Callable = f_id):
        """Build an instance from the JSON document in ``json_str``."""
        json_obj = json.loads(json_str)
        return cls._load(json_obj, value_loader)

    def _dump(self, value_dumper: Callable = f_id) -> dict:
        """Return a plain dict of the stored entries.

        Each key is the filter's ``pattern`` attribute when it has one and
        ``str(filter)`` otherwise; each value is passed through
        ``value_dumper``.
        """
        dumped: dict[Union[flowfilter.TFilter, str], Any] = {}
        for fltr, value in self.store.items():
            if hasattr(fltr, "pattern"):
                # cast necessary for mypy
                name = cast(Any, fltr).pattern
            else:
                name = str(fltr)
            dumped[name] = value_dumper(value)
        return dumped

    def dump(self, f: TextIO, value_dumper: Callable = f_id):
        """Write the stored entries as a JSON document to file ``f``."""
        json.dump(self._dump(value_dumper), f)

    def dumps(self, value_dumper: Callable = f_id):
        """Return the stored entries as a JSON string."""
        return json.dumps(self._dump(value_dumper))