92 lines
2.6 KiB
Python
92 lines
2.6 KiB
Python
import itertools
|
|
import json
|
|
import typing
|
|
from collections.abc import MutableMapping
|
|
from typing import Any, Dict, Generator, List, TextIO, Callable
|
|
|
|
from mitmproxy import flowfilter
|
|
from mitmproxy.http import HTTPFlow
|
|
|
|
|
|
def f_id(x):
|
|
return x
|
|
|
|
|
|
class URLDict(MutableMapping):
|
|
"""Data structure to store information using filters as keys."""
|
|
|
|
def __init__(self):
|
|
self.store: Dict[flowfilter.TFilter, Any] = {}
|
|
|
|
def __getitem__(self, key, *, count=0):
|
|
if count:
|
|
ret = itertools.islice(self.get_generator(key), 0, count)
|
|
else:
|
|
ret = list(self.get_generator(key))
|
|
|
|
if ret:
|
|
return ret
|
|
else:
|
|
raise KeyError
|
|
|
|
def __setitem__(self, key: str, value):
|
|
fltr = flowfilter.parse(key)
|
|
if fltr:
|
|
self.store.__setitem__(fltr, value)
|
|
else:
|
|
raise ValueError("Not a valid filter")
|
|
|
|
def __delitem__(self, key):
|
|
self.store.__delitem__(key)
|
|
|
|
def __iter__(self):
|
|
return self.store.__iter__()
|
|
|
|
def __len__(self):
|
|
return self.store.__len__()
|
|
|
|
def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]:
|
|
|
|
for fltr, value in self.store.items():
|
|
if flowfilter.match(fltr, flow):
|
|
yield value
|
|
|
|
def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]:
|
|
try:
|
|
return self.__getitem__(flow, count=count)
|
|
except KeyError:
|
|
return default
|
|
|
|
@classmethod
|
|
def _load(cls, json_obj, value_loader: Callable = f_id):
|
|
url_dict = cls()
|
|
for fltr, value in json_obj.items():
|
|
url_dict[fltr] = value_loader(value)
|
|
return url_dict
|
|
|
|
@classmethod
|
|
def load(cls, f: TextIO, value_loader: Callable = f_id):
|
|
json_obj = json.load(f)
|
|
return cls._load(json_obj, value_loader)
|
|
|
|
@classmethod
|
|
def loads(cls, json_str: str, value_loader: Callable = f_id):
|
|
json_obj = json.loads(json_str)
|
|
return cls._load(json_obj, value_loader)
|
|
|
|
def _dump(self, value_dumper: Callable = f_id) -> Dict:
|
|
dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {}
|
|
for fltr, value in self.store.items():
|
|
if hasattr(fltr, 'pattern'):
|
|
# cast necessary for mypy
|
|
dumped[typing.cast(Any, fltr).pattern] = value_dumper(value)
|
|
else:
|
|
dumped[str(fltr)] = value_dumper(value)
|
|
return dumped
|
|
|
|
def dump(self, f: TextIO, value_dumper: Callable = f_id):
|
|
json.dump(self._dump(value_dumper), f)
|
|
|
|
def dumps(self, value_dumper: Callable = f_id):
|
|
return json.dumps(self._dump(value_dumper))
|