100% test coverage for netlib.http

This commit is contained in:
Aldo Cortesi 2012-06-23 15:07:42 +12:00
parent 5cf6aeb926
commit 1263221ddd
2 changed files with 118 additions and 46 deletions

View File

@ -57,36 +57,40 @@ def read_headers(fp):
return ret
def read_chunked(code, fp, limit):
    """
    Read a chunked-transfer-encoded HTTP body.

    code: HTTP status code to attach to any HttpError raised
    fp: file-like object to read the body from
    limit: maximum acceptable total body size in bytes, or None for no limit

    Returns the de-chunked body as a string.
    Raises HttpError (with the given code) when the connection closes
    prematurely, a chunk length is not valid hex, a chunk is not terminated
    by CRLF, or the accumulated body exceeds limit.
    """
    content = ""
    total = 0
    while 1:
        # Chunk-size lines are short; cap the read so a malicious peer
        # cannot feed an unbounded line.
        line = fp.readline(128)
        if line == "":
            raise HttpError(code, "Connection closed prematurely")
        # Blank lines between chunks are tolerated and skipped.
        if line != '\r\n' and line != '\n':
            try:
                length = int(line, 16)
            except ValueError:
                raise HttpError(code, "Invalid chunked encoding length: %s"%line)
            if not length:
                # Zero-length chunk marks the end of the body.
                break
            total += length
            if limit is not None and total > limit:
                msg = "HTTP Body too large."\
                    " Limit is %s, chunked content length was at least %s"%(limit, total)
                raise HttpError(code, msg)
            content += fp.read(length)
            # Each chunk must be terminated by exactly CRLF.
            line = fp.readline(5)
            if line != '\r\n':
                raise HttpError(code, "Malformed chunked body")
    # Consume the trailer section up to the terminating blank line.
    while 1:
        line = fp.readline()
        if line == "":
            raise HttpError(code, "Connection closed prematurely")
        if line == '\r\n' or line == '\n':
            break
    return content
@ -100,18 +104,27 @@ def has_chunked_encoding(headers):
return False
def read_http_body(rfile, headers, all, limit):
def read_http_body(code, rfile, headers, all, limit):
"""
Read an HTTP body:
code: The HTTP error code to be used when raising HttpError
rfile: A file descriptor to read from
headers: An ODictCaseless object
all: Should we read all data?
limit: Size limit.
"""
if has_chunked_encoding(headers):
content = read_chunked(rfile, limit)
content = read_chunked(code, rfile, limit)
elif "content-length" in headers:
try:
l = int(headers["content-length"][0])
except ValueError:
# FIXME: Not strictly correct - this could be from the server, in which
# case we should send a 502.
raise HttpError(400, "Invalid content-length header: %s"%headers["content-length"])
raise HttpError(code, "Invalid content-length header: %s"%headers["content-length"])
if limit is not None and l > limit:
raise HttpError(509, "HTTP Body too large. Limit is %s, content-length was %s"%(limit, l))
raise HttpError(code, "HTTP Body too large. Limit is %s, content-length was %s"%(limit, l))
content = rfile.read(l)
elif all:
content = rfile.read(limit if limit else None)
@ -121,6 +134,10 @@ def read_http_body(rfile, headers, all, limit):
def parse_http_protocol(s):
"""
Parse an HTTP protocol declaration. Returns a (major, minor) tuple, or
None.
"""
if not s.startswith("HTTP/"):
return None
major, minor = s.split('/')[1].split('.')
@ -201,18 +218,26 @@ def response_connection_close(httpversion, headers):
"""
if request_connection_close(httpversion, headers):
return True
elif not has_chunked_encoding(headers) and "content-length" in headers:
return True
elif (not has_chunked_encoding(headers)) and "content-length" in headers:
return False
return True
def read_http_body_request(rfile, wfile, headers, httpversion, limit):
    """
    Read the HTTP body from a client request.

    rfile: file-like object to read the request body from
    wfile: file-like object used to write an interim 100 Continue response
    headers: an ODictCaseless of request headers (the "expect" header is
        consumed if the 100-continue handshake is performed)
    httpversion: (major, minor) tuple; 100-continue only applies to >= (1, 1)
    limit: maximum body size, or None

    Raises HttpError with code 400 for malformed or over-limit bodies.
    """
    if "expect" in headers:
        # FIXME: Should be forwarded upstream
        if "100-continue" in headers['expect'] and httpversion >= (1, 1):
            wfile.write('HTTP/1.1 100 Continue\r\n')
            wfile.write('Proxy-agent: %s\r\n'%version.NAMEVERSION)
            wfile.write('\r\n')
            del headers['expect']
    return read_http_body(400, rfile, headers, False, limit)
def read_http_body_response(rfile, headers, all, limit):
    """
    Read the HTTP body from a server response.

    rfile: file-like object to read the response body from
    headers: an ODictCaseless of response headers
    all: when no framing headers are present, should we read to EOF?
    limit: maximum body size, or None

    Raises HttpError with code 500 for malformed or over-limit bodies.
    """
    # NOTE(review): the parameter was originally declared with the literal
    # name "False" — a SyntaxError in Python 3 and builtin shadowing in
    # Python 2. Renamed to "all" to mirror read_http_body; callers pass it
    # positionally, so this is backward-compatible.
    return read_http_body(500, rfile, headers, all, limit)

View File

@ -2,6 +2,11 @@ import cStringIO, textwrap
from netlib import http, odict
import tutils
def test_httperror():
    # HttpError should carry a code and render a non-empty string message.
    e = http.HttpError(404, "Not found")
    assert str(e)
def test_has_chunked_encoding():
h = odict.ODictCaseless()
assert not http.has_chunked_encoding(h)
@ -11,19 +16,25 @@ def test_has_chunked_encoding():
def test_read_chunked():
    # Missing terminating blank line after the zero chunk.
    s = cStringIO.StringIO("1\r\na\r\n0\r\n")
    tutils.raises("closed prematurely", http.read_chunked, 500, s, None)
    # Well-formed single-chunk body.
    s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
    assert http.read_chunked(500, s, None) == "a"
    # Leading blank lines before the first chunk are tolerated.
    s = cStringIO.StringIO("\r\n\r\n1\r\na\r\n0\r\n\r\n")
    assert http.read_chunked(500, s, None) == "a"
    # Connection closed before any chunk arrives.
    s = cStringIO.StringIO("\r\n")
    tutils.raises("closed prematurely", http.read_chunked, 500, s, None)
    # Chunk data not terminated by CRLF.
    s = cStringIO.StringIO("1\r\nfoo")
    tutils.raises("malformed chunked body", http.read_chunked, 500, s, None)
    # Chunk length is not valid hex.
    s = cStringIO.StringIO("foo\r\nfoo")
    tutils.raises(http.HttpError, http.read_chunked, 500, s, None)
    # Body exceeds the size limit.
    s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")
    tutils.raises("too large", http.read_chunked, 500, s, 2)
def test_request_connection_close():
@ -34,27 +45,63 @@ def test_request_connection_close():
h["connection"] = ["keep-alive"]
assert not http.request_connection_close((1, 1), h)
h["connection"] = ["close"]
assert http.request_connection_close((1, 1), h)
def test_response_connection_close():
    # No framing headers: the connection must close to delimit the body.
    h = odict.ODictCaseless()
    assert http.response_connection_close((1, 1), h)
    # An explicit content-length allows the connection to stay open.
    h["content-length"] = [10]
    assert not http.response_connection_close((1, 1), h)
    # "Connection: close" always wins.
    h["connection"] = ["close"]
    assert http.response_connection_close((1, 1), h)
def test_read_http_body_response():
    # A content-length-delimited response body is read in full.
    h = odict.ODictCaseless()
    h["content-length"] = [7]
    s = cStringIO.StringIO("testing")
    assert http.read_http_body_response(s, h, False, None) == "testing"
def test_read_http_body_request():
    # An "Expect: 100-continue" request triggers an interim 100 Continue
    # response on wfile before the (empty) body is read.
    h = odict.ODictCaseless()
    h["expect"] = ["100-continue"]
    r = cStringIO.StringIO("testing")
    w = cStringIO.StringIO()
    assert http.read_http_body_request(r, w, h, (1, 1), None) == ""
    assert "100 Continue" in w.getvalue()
def test_read_http_body():
    # No framing headers and all=False: nothing is read.
    h = odict.ODictCaseless()
    s = cStringIO.StringIO("testing")
    assert http.read_http_body(500, s, h, False, None) == ""
    # Non-numeric content-length is rejected.
    h["content-length"] = ["foo"]
    s = cStringIO.StringIO("testing")
    tutils.raises(http.HttpError, http.read_http_body, 500, s, h, False, None)
    # content-length bounds the read.
    h["content-length"] = [5]
    s = cStringIO.StringIO("testing")
    assert len(http.read_http_body(500, s, h, False, None)) == 5
    # content-length above the limit is rejected.
    s = cStringIO.StringIO("testing")
    tutils.raises(http.HttpError, http.read_http_body, 500, s, h, False, 4)
    # all=True reads up to the limit...
    h = odict.ODictCaseless()
    s = cStringIO.StringIO("testing")
    assert len(http.read_http_body(500, s, h, True, 4)) == 4
    # ...or to EOF when the limit is generous.
    s = cStringIO.StringIO("testing")
    assert len(http.read_http_body(500, s, h, True, 100)) == 7
    # Chunked transfer-encoding takes precedence over other framing.
    h = odict.ODictCaseless()
    h["transfer-encoding"] = ["chunked"]
    s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")
    assert http.read_http_body(500, s, h, True, 100) == "aaaaa"
def test_parse_http_protocol():
assert http.parse_http_protocol("HTTP/1.1") == (1, 1)