Merge pull request #4586 from rbdixon/meta_replay_filter

Metadata and replay filter syntax
This commit is contained in:
Maximilian Hils 2021-05-14 08:37:39 +02:00 committed by GitHub
commit b70358cbde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 1 deletions

View File

@ -61,6 +61,7 @@ If you depend on these features, please raise your voice in
* Multiple Browsers: The `browser.start` command may be executed more than once to start additional * Multiple Browsers: The `browser.start` command may be executed more than once to start additional
browser sessions. (@rbdixon) browser sessions. (@rbdixon)
* Improve readability of SHA256 fingerprint. (@wrekone) * Improve readability of SHA256 fingerprint. (@wrekone)
* Metadata and Replay Flow Filters: Flows may be filtered based on metadata and replay status. (@rbdixon)
* --- TODO: add new PRs above this line --- * --- TODO: add new PRs above this line ---
* ... and various other fixes, documentation improvements, dependency version bumps, etc. * ... and various other fixes, documentation improvements, dependency version bumps, etc.

View File

@ -36,7 +36,6 @@ import functools
import re import re
import sys import sys
from typing import Callable, ClassVar, Optional, Sequence, Type from typing import Callable, ClassVar, Optional, Sequence, Type
import pyparsing as pp import pyparsing as pp
from mitmproxy import flow, http, tcp from mitmproxy import flow, http, tcp
@ -382,6 +381,41 @@ class FDst(_Rex):
return f.server_conn.address and self.re.search(r) return f.server_conn.address and self.re.search(r)
class FReplay(_Action):
code = "replay"
help = "Match replayed flows"
def __call__(self, f):
return f.is_replay is not None
class FReplayClient(_Action):
code = "replayq"
help = "Match replayed client request"
def __call__(self, f):
return f.is_replay == 'request'
class FReplayServer(_Action):
code = "replays"
help = "Match replayed server response"
def __call__(self, f):
return f.is_replay == 'response'
class FMeta(_Rex):
code = "meta"
help = "Flow metadata"
flags = re.MULTILINE
is_binary = False
def __call__(self, f):
m = "\n".join([f"{key}: {value}" for key, value in f.metadata.items()])
return self.re.search(m)
class _Int(_Action): class _Int(_Action):
def __init__(self, num): def __init__(self, num):
@ -444,6 +478,9 @@ filter_unary: Sequence[Type[_Action]] = [
FErr, FErr,
FHTTP, FHTTP,
FMarked, FMarked,
FReplay,
FReplayClient,
FReplayServer,
FReq, FReq,
FResp, FResp,
FTCP, FTCP,
@ -464,6 +501,7 @@ filter_rex: Sequence[Type[_Rex]] = [
FMethod, FMethod,
FSrc, FSrc,
FUrl, FUrl,
FMeta,
] ]
filter_int = [ filter_int = [
FCode FCode

View File

@ -24,6 +24,9 @@ class TestParsing:
assert flowfilter.parse("~m foobar") assert flowfilter.parse("~m foobar")
assert flowfilter.parse("~u foobar") assert flowfilter.parse("~u foobar")
assert flowfilter.parse("~q ~c 10") assert flowfilter.parse("~q ~c 10")
assert flowfilter.parse("~replay")
assert flowfilter.parse("~replayq")
assert flowfilter.parse("~replays")
p = flowfilter.parse("~q ~c 10") p = flowfilter.parse("~q ~c 10")
self._dump(p) self._dump(p)
assert len(p.lst) == 2 assert len(p.lst) == 2
@ -296,6 +299,31 @@ class TestMatchingHTTPFlow:
assert self.q("!~c 201 !~c 202", s) assert self.q("!~c 201 !~c 202", s)
assert not self.q("!~c 201 !~c 200", s) assert not self.q("!~c 201 !~c 200", s)
def test_replay(self):
f = tflow.tflow()
assert not self.q("~replay", f)
f.is_replay = "request"
assert self.q("~replay", f)
assert self.q("~replayq", f)
assert not self.q("~replays", f)
f.is_replay = "response"
assert self.q("~replay", f)
assert not self.q("~replayq", f)
assert self.q("~replays", f)
def test_metadata(self):
f = tflow.tflow()
f.metadata["a"] = 1
f.metadata["b"] = "string"
f.metadata["c"] = {"key": "value"}
assert self.q("~meta a", f)
assert not self.q("~meta no", f)
assert self.q("~meta string", f)
assert self.q("~meta key", f)
assert self.q("~meta value", f)
assert self.q("~meta \"b: string\"", f)
assert self.q("~meta \"'key': 'value'\"", f)
class TestMatchingTCPFlow: class TestMatchingTCPFlow: