Merge pull request #112 from hamstah/protobuf-view

Adds a new view for protocol buffers
This commit is contained in:
Aldo Cortesi 2013-04-19 17:18:44 -07:00
commit 793c41a5c4
3 changed files with 46 additions and 1 deletions

View File

@ -12,7 +12,7 @@ import netlib.utils
import common import common
from .. import utils, encoding, flow from .. import utils, encoding, flow
from ..contrib import jsbeautifier, html2text from ..contrib import jsbeautifier, html2text
import subprocess
try: try:
import pyamf import pyamf
from pyamf import remoting, flex from pyamf import remoting, flex
@ -364,6 +364,38 @@ class ViewImage:
) )
return "%s image"%img.format, fmt return "%s image"%img.format, fmt
class ViewProtobuf:
"""Human friendly view of protocol buffers
The view uses the protoc compiler to decode the binary
"""
name = "Protocol Buffer"
prompt = ("protobuf", "p")
content_types = ["application/x-protobuf"]
@staticmethod
def is_available():
try:
p = subprocess.Popen(["protoc", "--version"], stdout=subprocess.PIPE)
out, _ = p.communicate()
return out.startswith("libprotoc")
except:
return False
def decode_protobuf(self, content):
# if Popen raises OSError, it will be caught in
# get_content_view and fall back to Raw
p = subprocess.Popen(['protoc', '--decode_raw'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, _ = p.communicate(input=content)
return out
def __call__(self, hdrs, content, limit):
decoded = self.decode_protobuf(content)
txt = _view_text(decoded[:limit], len(decoded), limit)
return "Protobuf", txt
views = [ views = [
ViewAuto(), ViewAuto(),
@ -381,6 +413,9 @@ views = [
if pyamf: if pyamf:
views.append(ViewAMF()) views.append(ViewAMF())
if ViewProtobuf.is_available():
views.append(ViewProtobuf())
content_types_map = {} content_types_map = {}
for i in views: for i in views:
for ct in i.content_types: for ct in i.content_types:

2
test/data/protobuf01 Normal file
View File

@ -0,0 +1,2 @@
$3bbc333c-e61c-433b-819a-0b9a8cc103b8

View File

@ -234,6 +234,14 @@ if pyamf:
p = tutils.test_data.path("data/amf03") p = tutils.test_data.path("data/amf03")
assert v([], file(p).read(), sys.maxint) assert v([], file(p).read(), sys.maxint)
if cv.ViewProtobuf.is_available():
def test_view_protobuf_request():
v = cv.ViewProtobuf()
p = tutils.test_data.path("data/protobuf01")
content_type, output = v([], file(p).read(), sys.maxint)
assert content_type == "Protobuf"
assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"'
def test_get_by_shortcut(): def test_get_by_shortcut():
assert cv.get_by_shortcut("h") assert cv.get_by_shortcut("h")