multipart encoder and tests
This commit is contained in:
parent
4df325335b
commit
d08d2185ea
|
@ -1,8 +1,43 @@
|
|||
import re
|
||||
|
||||
import mimetypes
|
||||
from urllib.parse import quote
|
||||
from mitmproxy.net.http import headers
|
||||
|
||||
|
||||
def encode(head, l):
|
||||
|
||||
k = head.get("content-type")
|
||||
if k:
|
||||
k = headers.parse_content_type(k)
|
||||
if k is not None:
|
||||
try:
|
||||
boundary = k[2]["boundary"].encode("ascii")
|
||||
boundary = quote(boundary)
|
||||
except (KeyError, UnicodeError):
|
||||
return b""
|
||||
hdrs = []
|
||||
for key, value in l:
|
||||
file_type = mimetypes.guess_type(str(key))[0] or "text/plain; charset=utf-8"
|
||||
|
||||
if key:
|
||||
hdrs.append(b"--%b" % boundary.encode('utf-8'))
|
||||
disposition = b'form-data; name="%b"' % key
|
||||
hdrs.append(b"Content-Disposition: %b" % disposition)
|
||||
hdrs.append(b"Content-Type: %b" % file_type.encode('utf-8'))
|
||||
hdrs.append(b'')
|
||||
hdrs.append(value)
|
||||
hdrs.append(b'')
|
||||
|
||||
if value is not None:
|
||||
# If boundary is found in value then raise ValueError
|
||||
if re.search(rb"^--%b$" % re.escape(boundary.encode('utf-8')), value):
|
||||
raise ValueError(b"boundary found in encoded string")
|
||||
|
||||
hdrs.append(b"--%b--\r\n" % boundary.encode('utf-8'))
|
||||
temp = b"\r\n".join(hdrs)
|
||||
return temp
|
||||
|
||||
|
||||
def decode(hdrs, content):
|
||||
"""
|
||||
Takes a multipart boundary encoded string and returns list of (key, value) tuples.
|
||||
|
@ -19,14 +54,14 @@ def decode(hdrs, content):
|
|||
|
||||
rx = re.compile(br'\bname="([^"]+)"')
|
||||
r = []
|
||||
|
||||
for i in content.split(b"--" + boundary):
|
||||
parts = i.splitlines()
|
||||
if len(parts) > 1 and parts[0][0:2] != b"--":
|
||||
match = rx.search(parts[1])
|
||||
if match:
|
||||
key = match.group(1)
|
||||
value = b"".join(parts[3 + parts[2:].index(b""):])
|
||||
r.append((key, value))
|
||||
if content is not None:
|
||||
for i in content.split(b"--" + boundary):
|
||||
parts = i.splitlines()
|
||||
if len(parts) > 1 and parts[0][0:2] != b"--":
|
||||
match = rx.search(parts[1])
|
||||
if match:
|
||||
key = match.group(1)
|
||||
value = b"".join(parts[3 + parts[2:].index(b""):])
|
||||
r.append((key, value))
|
||||
return r
|
||||
return []
|
||||
|
|
|
@ -468,8 +468,8 @@ class Request(message.Message):
|
|||
return ()
|
||||
|
||||
def _set_multipart_form(self, value):
|
||||
self.content = mitmproxy.net.http.multipart.encode(self.headers, value)
|
||||
self.headers["content-type"] = "multipart/form-data"
|
||||
self.content = mitmproxy.net.http.url.encode(value, self.get_text(strict=False)).encode()
|
||||
|
||||
@property
|
||||
def multipart_form(self):
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.net.http import multipart
|
||||
import pytest
|
||||
|
||||
|
||||
def test_decode():
|
||||
|
@ -22,3 +23,20 @@ def test_decode():
|
|||
assert len(form) == 2
|
||||
assert form[0] == (b"field1", b"value1")
|
||||
assert form[1] == (b"field2", b"value2")
|
||||
|
||||
|
||||
def test_encode():
|
||||
data = [("file".encode('utf-8'), "shell.jpg".encode('utf-8')),
|
||||
("file_size".encode('utf-8'), "1000".encode('utf-8'))]
|
||||
headers = Headers(
|
||||
content_type='multipart/form-data; boundary=127824672498'
|
||||
)
|
||||
content = multipart.encode(headers, data)
|
||||
|
||||
assert b'Content-Disposition: form-data; name="file"' in content
|
||||
assert b'Content-Type: text/plain; charset=utf-8\r\n\r\nshell.jpg\r\n\r\n--127824672498\r\n' in content
|
||||
assert b'1000\r\n\r\n--127824672498--\r\n'
|
||||
assert len(content) == 252
|
||||
|
||||
with pytest.raises(ValueError, match=r"boundary found in encoded string"):
|
||||
multipart.encode(headers, [("key".encode('utf-8'), "--127824672498".encode('utf-8'))])
|
||||
|
|
|
@ -371,7 +371,7 @@ class TestRequestUtils:
|
|||
assert list(request.multipart_form.items()) == []
|
||||
|
||||
def test_set_multipart_form(self):
|
||||
request = treq(content=b"foobar")
|
||||
request.multipart_form = [("filename", "shell.jpg"), ("file_size", "1000")]
|
||||
assert request.headers['Content-Type'] == "multipart/form-data"
|
||||
assert request.content
|
||||
request = treq()
|
||||
request.multipart_form = [("file", "shell.jpg"), ("file_size", "1000")]
|
||||
assert request.headers["Content-Type"] == 'multipart/form-data'
|
||||
assert request.content is None
|
||||
|
|
Loading…
Reference in New Issue