Merge pull request #833 from zbuc/contentview_scripts

Contentview scripts
This commit is contained in:
Maximilian Hils 2015-11-13 18:41:05 -08:00
commit dce469d4c1
7 changed files with 198 additions and 28 deletions

View File

@ -0,0 +1,66 @@
import string
from libmproxy import script, flow, utils
import libmproxy.contentviews as cv
from netlib.http import Headers
import lxml.html
import lxml.etree
class ViewPigLatin(cv.View):
name = "pig_latin_HTML"
prompt = ("pig latin HTML", "l")
content_types = ["text/html"]
def __call__(self, data, **metadata):
if utils.isXML(data):
parser = lxml.etree.HTMLParser(
strip_cdata=True,
remove_blank_text=True
)
d = lxml.html.fromstring(data, parser=parser)
docinfo = d.getroottree().docinfo
def piglify(src):
words = string.split(src)
ret = ''
for word in words:
idx = -1
while word[idx] in string.punctuation and (idx * -1) != len(word): idx -= 1
if word[0].lower() in 'aeiou':
if idx == -1: ret += word[0:] + "hay"
else: ret += word[0:len(word)+idx+1] + "hay" + word[idx+1:]
else:
if idx == -1: ret += word[1:] + word[0] + "ay"
else: ret += word[1:len(word)+idx+1] + word[0] + "ay" + word[idx+1:]
ret += ' '
return ret.strip()
def recurse(root):
if hasattr(root, 'text') and root.text:
root.text = piglify(root.text)
if hasattr(root, 'tail') and root.tail:
root.tail = piglify(root.tail)
if len(root):
for child in root:
recurse(child)
recurse(d)
s = lxml.etree.tostring(
d,
pretty_print=True,
doctype=docinfo.doctype
)
return "HTML", cv.format_text(s)
pig_view = ViewPigLatin()
def start(context, argv):
context.add_contentview(pig_view)
def stop(context):
context.remove_contentview(pig_view)

View File

@ -479,34 +479,9 @@ class ViewWBXML(View):
return None return None
views = [ views = []
ViewAuto(),
ViewRaw(),
ViewHex(),
ViewJSON(),
ViewXML(),
ViewWBXML(),
ViewHTML(),
ViewHTMLOutline(),
ViewJavaScript(),
ViewCSS(),
ViewURLEncoded(),
ViewMultipart(),
ViewImage(),
]
if pyamf:
views.append(ViewAMF())
if ViewProtobuf.is_available():
views.append(ViewProtobuf())
content_types_map = {} content_types_map = {}
for i in views: view_prompts = []
for ct in i.content_types:
l = content_types_map.setdefault(ct, [])
l.append(i)
view_prompts = [i.prompt for i in views]
def get_by_shortcut(c): def get_by_shortcut(c):
@ -515,6 +490,58 @@ def get_by_shortcut(c):
return i return i
def add(view):
# TODO: auto-select a different name (append an integer?)
for i in views:
if i.name == view.name:
raise ContentViewException("Duplicate view: " + view.name)
# TODO: the UI should auto-prompt for a replacement shortcut
for prompt in view_prompts:
if prompt[1] == view.prompt[1]:
raise ContentViewException("Duplicate view shortcut: " + view.prompt[1])
views.append(view)
for ct in view.content_types:
l = content_types_map.setdefault(ct, [])
l.append(view)
view_prompts.append(view.prompt)
def remove(view):
for ct in view.content_types:
l = content_types_map.setdefault(ct, [])
l.remove(view)
if not len(l):
del content_types_map[ct]
view_prompts.remove(view.prompt)
views.remove(view)
add(ViewAuto())
add(ViewRaw())
add(ViewHex())
add(ViewJSON())
add(ViewXML())
add(ViewWBXML())
add(ViewHTML())
add(ViewHTMLOutline())
add(ViewJavaScript())
add(ViewCSS())
add(ViewURLEncoded())
add(ViewMultipart())
add(ViewImage())
if pyamf:
add(ViewAMF())
if ViewProtobuf.is_available():
add(ViewProtobuf())
def get(name): def get(name):
for i in views: for i in views:
if i.name == name: if i.name == name:

View File

@ -9,7 +9,7 @@ import cookielib
import os import os
import re import re
import urlparse import urlparse
import inspect
from netlib import wsgi from netlib import wsgi
from netlib.exceptions import HttpException from netlib.exceptions import HttpException
@ -21,6 +21,7 @@ from .proxy.config import HostMatcher
from .protocol.http_replay import RequestReplayThread from .protocol.http_replay import RequestReplayThread
from .protocol import Kill from .protocol import Kill
from .models import ClientConnection, ServerConnection, HTTPResponse, HTTPFlow, HTTPRequest from .models import ClientConnection, ServerConnection, HTTPResponse, HTTPFlow, HTTPRequest
from . import contentviews as cv
class AppRegistry: class AppRegistry:

View File

@ -5,6 +5,8 @@ import threading
import shlex import shlex
import sys import sys
from . import contentviews as cv
class ScriptError(Exception): class ScriptError(Exception):
pass pass
@ -56,6 +58,12 @@ class ScriptContext:
def app_registry(self): def app_registry(self):
return self._master.apps return self._master.apps
def add_contentview(self, view_obj):
cv.add(view_obj)
def remove_contentview(self, view_obj):
cv.remove(view_obj)
class Script: class Script:
""" """

View File

@ -210,6 +210,21 @@ Larry
assert "decoded gzip" in r[0] assert "decoded gzip" in r[0]
assert "Raw" in r[0] assert "Raw" in r[0]
def test_add_cv(self):
class TestContentView(cv.View):
name = "test"
prompt = ("t", "test")
tcv = TestContentView()
cv.add(tcv)
# repeated addition causes exception
tutils.raises(
ContentViewException,
cv.add,
tcv
)
if pyamf: if pyamf:
def test_view_amf_request(): def test_view_amf_request():

View File

@ -0,0 +1,52 @@
from libmproxy import script, flow
import libmproxy.contentviews as cv
from netlib.http import Headers
def test_custom_views():
class ViewNoop(cv.View):
name = "noop"
prompt = ("noop", "n")
content_types = ["text/none"]
def __call__(self, data, **metadata):
return "noop", cv.format_text(data)
view_obj = ViewNoop()
cv.add(view_obj)
assert cv.get("noop")
r = cv.get_content_view(
cv.get("noop"),
"[1, 2, 3]",
headers=Headers(
content_type="text/plain"
)
)
assert "noop" in r[0]
# now try content-type matching
r = cv.get_content_view(
cv.get("Auto"),
"[1, 2, 3]",
headers=Headers(
content_type="text/none"
)
)
assert "noop" in r[0]
# now try removing the custom view
cv.remove(view_obj)
r = cv.get_content_view(
cv.get("Auto"),
"[1, 2, 3]",
headers=Headers(
content_type="text/none"
)
)
assert "noop" not in r[0]

View File

@ -127,3 +127,4 @@ def test_command_parsing():
absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py")) absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py"))
s = script.Script(absfilepath, fm) s = script.Script(absfilepath, fm)
assert os.path.isfile(s.args[0]) assert os.path.isfile(s.args[0])