remove clean_bin, clarify unicode handling
This commit is contained in:
parent
317a5178ea
commit
d51cf543bb
|
@ -17,9 +17,11 @@ def tcp_message(ctx, tcp_msg):
|
|||
is_modified = False if modified_msg == tcp_msg.message else True
|
||||
tcp_msg.message = modified_msg
|
||||
|
||||
print("[tcp_message{}] from {} {} to {} {}:\r\n{}".format(
|
||||
" (modified)" if is_modified else "",
|
||||
"client" if tcp_msg.sender == tcp_msg.client_conn else "server",
|
||||
tcp_msg.sender.address,
|
||||
"server" if tcp_msg.receiver == tcp_msg.server_conn else "client",
|
||||
tcp_msg.receiver.address, strutils.clean_bin(tcp_msg.message)))
|
||||
print(
|
||||
"[tcp_message{}] from {} {} to {} {}:\r\n{}".format(
|
||||
" (modified)" if is_modified else "",
|
||||
"client" if tcp_msg.sender == tcp_msg.client_conn else "server",
|
||||
tcp_msg.sender.address,
|
||||
"server" if tcp_msg.receiver == tcp_msg.server_conn else "client",
|
||||
tcp_msg.receiver.address, strutils.bytes_to_escaped_str(tcp_msg.message))
|
||||
)
|
||||
|
|
|
@ -160,7 +160,7 @@ class ViewRaw(View):
|
|||
content_types = []
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
return "Raw", format_text(strutils.bytes_to_escaped_str(data))
|
||||
return "Raw", format_text(strutils.bytes_to_escaped_str(data, True))
|
||||
|
||||
|
||||
class ViewHex(View):
|
||||
|
@ -597,10 +597,9 @@ def safe_to_print(lines, encoding="utf8"):
|
|||
for line in lines:
|
||||
clean_line = []
|
||||
for (style, text) in line:
|
||||
try:
|
||||
text = strutils.clean_bin(text.decode(encoding, "strict"))
|
||||
except UnicodeDecodeError:
|
||||
text = strutils.clean_bin(text)
|
||||
if isinstance(text, bytes):
|
||||
text = text.decode(encoding, "replace")
|
||||
text = strutils.escape_control_characters(text)
|
||||
clean_line.append((style, text))
|
||||
yield clean_line
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import unicodedata
|
||||
import re
|
||||
import codecs
|
||||
|
||||
import six
|
||||
|
@ -28,51 +28,71 @@ def native(s, *encoding_opts):
|
|||
return s
|
||||
|
||||
|
||||
def clean_bin(s, keep_spacing=True):
|
||||
# type: (Union[bytes, six.text_type], bool) -> six.text_type
|
||||
"""
|
||||
Cleans binary data to make it safe to display.
|
||||
# Translate control characters to "safe" characters. This implementation initially
|
||||
# replaced them with the matching control pictures (http://unicode.org/charts/PDF/U2400.pdf),
|
||||
# but that turned out to render badly with monospace fonts. We are back to "." therefore.
|
||||
_control_char_trans = {
|
||||
x: ord(".") # x + 0x2400 for unicode control group pictures
|
||||
for x in range(32)
|
||||
}
|
||||
_control_char_trans[127] = ord(".") # 0x2421
|
||||
_control_char_trans_newline = _control_char_trans.copy()
|
||||
for x in ("\r", "\n", "\t"):
|
||||
del _control_char_trans_newline[ord(x)]
|
||||
|
||||
Args:
|
||||
keep_spacing: If False, tabs and newlines will also be replaced.
|
||||
|
||||
if six.PY2:
|
||||
pass
|
||||
else:
|
||||
_control_char_trans = str.maketrans(_control_char_trans)
|
||||
_control_char_trans_newline = str.maketrans(_control_char_trans_newline)
|
||||
|
||||
|
||||
def escape_control_characters(text, keep_spacing=True):
|
||||
"""
|
||||
if isinstance(s, six.text_type):
|
||||
if keep_spacing:
|
||||
keep = u" \n\r\t"
|
||||
else:
|
||||
keep = u" "
|
||||
Replace all unicode C1 control characters from the given text with their respective control pictures.
|
||||
For example, a null byte is replaced with the unicode character "\u2400".
|
||||
|
||||
Args:
|
||||
keep_spacing: If True, tabs and newlines will not be replaced.
|
||||
"""
|
||||
# type: (six.text_type) -> six.text_type
|
||||
if not isinstance(text, six.text_type):
|
||||
raise ValueError("text type must be unicode but is {}".format(type(text).__name__))
|
||||
|
||||
trans = _control_char_trans_newline if keep_spacing else _control_char_trans
|
||||
if six.PY2:
|
||||
return u"".join(
|
||||
ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
|
||||
for ch in s
|
||||
)
|
||||
else:
|
||||
if keep_spacing:
|
||||
keep = (9, 10, 13) # \t, \n, \r,
|
||||
else:
|
||||
keep = ()
|
||||
return "".join(
|
||||
chr(ch) if (31 < ch < 127 or ch in keep) else "."
|
||||
for ch in six.iterbytes(s)
|
||||
six.unichr(trans.get(ord(ch), ord(ch)))
|
||||
for ch in text
|
||||
)
|
||||
return text.translate(trans)
|
||||
|
||||
|
||||
def bytes_to_escaped_str(data):
|
||||
def bytes_to_escaped_str(data, keep_spacing=False):
|
||||
"""
|
||||
Take bytes and return a safe string that can be displayed to the user.
|
||||
|
||||
Single quotes are always escaped, double quotes are never escaped:
|
||||
"'" + bytes_to_escaped_str(...) + "'"
|
||||
gives a valid Python string.
|
||||
|
||||
Args:
|
||||
keep_spacing: If True, tabs and newlines will not be escaped.
|
||||
"""
|
||||
# TODO: We may want to support multi-byte characters without escaping them.
|
||||
# One way to do would be calling .decode("utf8", "backslashreplace") first
|
||||
# and then escaping UTF8 control chars (see clean_bin).
|
||||
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
|
||||
# We always insert a double-quote here so that we get a single-quoted string back
|
||||
# https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
|
||||
return repr(b'"' + data).lstrip("b")[2:-1]
|
||||
ret = repr(b'"' + data).lstrip("b")[2:-1]
|
||||
if keep_spacing:
|
||||
ret = re.sub(
|
||||
r"(?<!\\)(\\\\)*\\([nrt])",
|
||||
lambda m: (m.group(1) or "") + dict(n="\n", r="\r", t="\t")[m.group(2)],
|
||||
ret
|
||||
)
|
||||
return ret
|
||||
|
||||
|
||||
def escaped_str_to_bytes(data):
|
||||
|
@ -136,4 +156,8 @@ def hexdump(s):
|
|||
part = s[i:i + 16]
|
||||
x = " ".join("{:0=2x}".format(i) for i in six.iterbytes(part))
|
||||
x = x.ljust(47) # 16*2 + 15
|
||||
yield (offset, x, clean_bin(part, False))
|
||||
part_repr = escape_control_characters(
|
||||
part.decode("ascii", "replace").replace(u"\ufffd", u"."),
|
||||
False
|
||||
)
|
||||
yield (offset, x, part_repr)
|
||||
|
|
|
@ -255,7 +255,7 @@ class Frame(object):
|
|||
def __repr__(self):
|
||||
ret = repr(self.header)
|
||||
if self.payload:
|
||||
ret = ret + "\nPayload:\n" + strutils.clean_bin(self.payload)
|
||||
ret = ret + "\nPayload:\n" + strutils.bytes_to_escaped_str(self.payload)
|
||||
return ret
|
||||
|
||||
def human_readable(self):
|
||||
|
|
|
@ -62,8 +62,9 @@ class LogCtx(object):
|
|||
for line in strutils.hexdump(data):
|
||||
self("\t%s %s %s" % line)
|
||||
else:
|
||||
for i in strutils.clean_bin(data).split("\n"):
|
||||
self("\t%s" % i)
|
||||
data = data.decode("ascii", "replace").replace(u"\ufffd", u".")
|
||||
for i in strutils.escape_control_characters(data).split(u"\n"):
|
||||
self(u"\t%s" % i)
|
||||
|
||||
def __call__(self, line):
|
||||
self.lines.append(line)
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
# coding=utf-8
|
||||
import six
|
||||
|
||||
from netlib import strutils, tutils
|
||||
|
@ -15,18 +14,22 @@ def test_native():
|
|||
assert strutils.native(b"foo") == u"foo"
|
||||
|
||||
|
||||
def test_clean_bin():
|
||||
assert strutils.clean_bin(b"one") == u"one"
|
||||
assert strutils.clean_bin(b"\00ne") == u".ne"
|
||||
assert strutils.clean_bin(b"\nne") == u"\nne"
|
||||
assert strutils.clean_bin(b"\nne", False) == u".ne"
|
||||
assert strutils.clean_bin(u"\u2605".encode("utf8")) == u"..."
|
||||
|
||||
assert strutils.clean_bin(u"one") == u"one"
|
||||
assert strutils.clean_bin(u"\00ne") == u".ne"
|
||||
assert strutils.clean_bin(u"\nne") == u"\nne"
|
||||
assert strutils.clean_bin(u"\nne", False) == u".ne"
|
||||
assert strutils.clean_bin(u"\u2605") == u"\u2605"
|
||||
def test_escape_control_characters():
|
||||
assert strutils.escape_control_characters(u"one") == u"one"
|
||||
assert strutils.escape_control_characters(u"\00ne") == u".ne"
|
||||
assert strutils.escape_control_characters(u"\nne") == u"\nne"
|
||||
assert strutils.escape_control_characters(u"\nne", False) == u".ne"
|
||||
assert strutils.escape_control_characters(u"\u2605") == u"\u2605"
|
||||
assert (
|
||||
strutils.escape_control_characters(bytes(bytearray(range(128))).decode()) ==
|
||||
u'.........\t\n..\r.................. !"#$%&\'()*+,-./0123456789:;<'
|
||||
u'=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~.'
|
||||
)
|
||||
assert (
|
||||
strutils.escape_control_characters(bytes(bytearray(range(128))).decode(), False) ==
|
||||
u'................................ !"#$%&\'()*+,-./0123456789:;<'
|
||||
u'=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~.'
|
||||
)
|
||||
|
||||
|
||||
def test_bytes_to_escaped_str():
|
||||
|
@ -37,6 +40,14 @@ def test_bytes_to_escaped_str():
|
|||
assert strutils.bytes_to_escaped_str(b"'") == r"\'"
|
||||
assert strutils.bytes_to_escaped_str(b'"') == r'"'
|
||||
|
||||
assert strutils.bytes_to_escaped_str(b"\r\n\t") == "\\r\\n\\t"
|
||||
assert strutils.bytes_to_escaped_str(b"\r\n\t", True) == "\r\n\t"
|
||||
|
||||
assert strutils.bytes_to_escaped_str(b"\n", True) == "\n"
|
||||
assert strutils.bytes_to_escaped_str(b"\\n", True) == "\\ \\ n".replace(" ", "")
|
||||
assert strutils.bytes_to_escaped_str(b"\\\n", True) == "\\ \\ \n".replace(" ", "")
|
||||
assert strutils.bytes_to_escaped_str(b"\\\\n", True) == "\\ \\ \\ \\ n".replace(" ", "")
|
||||
|
||||
with tutils.raises(ValueError):
|
||||
strutils.bytes_to_escaped_str(u"such unicode")
|
||||
|
||||
|
@ -45,10 +56,9 @@ def test_escaped_str_to_bytes():
|
|||
assert strutils.escaped_str_to_bytes("foo") == b"foo"
|
||||
assert strutils.escaped_str_to_bytes("\x08") == b"\b"
|
||||
assert strutils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
|
||||
assert strutils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
|
||||
assert strutils.escaped_str_to_bytes(u"\\x08") == b"\b"
|
||||
assert strutils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
|
||||
assert strutils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'
|
||||
assert strutils.escaped_str_to_bytes(u"\u00fc") == b'\xc3\xbc'
|
||||
|
||||
if six.PY2:
|
||||
with tutils.raises(ValueError):
|
||||
|
|
Loading…
Reference in New Issue