Add pipeline response parsing tests (#137)
* Add pipeline response parsing tests.
* build_http_response now only adds Content-Length if Transfer-Encoding is not provided.
* ChunkParser.parse now returns pending raw bytes so that pipelined chunked responses can be parsed.
This commit is contained in:
parent
69445a8921
commit
c77f8b5789
47
benchmark.py
47
benchmark.py
|
@ -56,27 +56,46 @@ class Benchmark:
|
|||
writer.write(proxy.build_http_request(
|
||||
proxy.httpMethods.GET, b'/'
|
||||
))
|
||||
# await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0.01)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
@staticmethod
def parse_pipeline_response(response: proxy.HttpParser, raw: bytes, counter: int = 0) -> \
        Tuple[proxy.HttpParser, int]:
    """Parse as many pipelined responses out of ``raw`` as possible.

    Feeds ``raw`` into ``response`` and, each time a response completes
    with leftover bytes in its buffer, starts a fresh parser on that
    leftover.  Returns the parser holding the last (possibly incomplete)
    response together with the number of fully parsed responses.
    """
    while True:
        response.parse(raw)
        if response.state != proxy.httpParserStates.COMPLETE:
            # Need more data from the wire before this response completes.
            return response, counter
        counter += 1
        if response.buffer == b'':
            # No more buffer left to parse.
            return response, counter
        # Pipelined responses may still sit in the buffer; parse them
        # with a fresh parser instead of recursing.
        raw = response.buffer
        response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
|
||||
|
||||
@staticmethod
async def recv(idd: int, reader: asyncio.StreamReader) -> None:
    """Drain pipelined responses from ``reader``, printing throughput.

    NOTE(review): reconstructed from a rendered diff that interleaved the
    removed pre-change lines (inner blocking parse loop, per-connection
    50-request counter, ``print(raw)`` debug output) with the new
    pipelined logic; the stale duplicates and debug print are dropped.
    """
    print_every = 1000
    last_print = time.time()
    num_completed_requests: int = 0
    response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
    try:
        while True:
            raw = await reader.read(proxy.DEFAULT_BUFFER_SIZE)
            response, total_parsed = Benchmark.parse_pipeline_response(response, raw)
            if response.state == proxy.httpParserStates.COMPLETE:
                # Last response finished exactly at a boundary; start fresh.
                response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
            if total_parsed > 0:
                num_completed_requests += total_parsed
                if num_completed_requests % print_every == 0:
                    now = time.time()
                    print('[%d] Completed last %d requests in %.2f secs' %
                          (idd, print_every, now - last_print))
                    last_print = now
    except KeyboardInterrupt:
        pass
|
34
proxy.py
34
proxy.py
|
@ -257,8 +257,16 @@ def build_http_response(status_code: int,
|
|||
line.append(reason)
|
||||
if headers is None:
|
||||
headers = {}
|
||||
if body is not None and not any(
|
||||
k.lower() == b'content-length' for k in headers):
|
||||
has_content_length = False
|
||||
has_transfer_encoding = False
|
||||
for k in headers:
|
||||
if k.lower() == b'content-length':
|
||||
has_content_length = True
|
||||
if k.lower() == b'transfer-encoding':
|
||||
has_transfer_encoding = True
|
||||
if body is not None and \
|
||||
not has_transfer_encoding and \
|
||||
not has_content_length:
|
||||
headers[b'Content-Length'] = bytes_(len(body))
|
||||
return build_http_pkt(line, headers, body)
|
||||
|
||||
|
@ -501,10 +509,11 @@ class ChunkParser:
|
|||
# Expected size of next following chunk
|
||||
self.size: Optional[int] = None
|
||||
|
||||
def parse(self, raw: bytes) -> bytes:
    """Parse chunked-encoded bytes from ``raw``.

    Stops once the terminating zero-length chunk has been seen and
    returns any unconsumed bytes, so that callers can parse pipelined
    responses that follow this chunked body.  (The rendered diff showed
    both the old ``-> None`` and new ``-> bytes`` signatures; this keeps
    the new behavior.)
    """
    more = len(raw) > 0
    while more and self.state != chunkParserStates.COMPLETE:
        more, raw = self.process(raw)
    return raw
|
||||
|
||||
def process(self, raw: bytes) -> Tuple[bool, bytes]:
|
||||
if self.state == chunkParserStates.WAITING_FOR_SIZE:
|
||||
|
@ -651,27 +660,29 @@ class HttpParser:
|
|||
self.buffer = b''
|
||||
|
||||
more = True if len(raw) > 0 else False
|
||||
while more:
|
||||
while more and self.state != httpParserStates.COMPLETE:
|
||||
if self.state in (
|
||||
httpParserStates.HEADERS_COMPLETE,
|
||||
httpParserStates.RCVING_BODY,
|
||||
httpParserStates.COMPLETE):
|
||||
httpParserStates.RCVING_BODY):
|
||||
if b'content-length' in self.headers:
|
||||
self.state = httpParserStates.RCVING_BODY
|
||||
if self.body is None:
|
||||
self.body = b''
|
||||
self.body += raw
|
||||
total_size = int(self.header(b'content-length'))
|
||||
received_size = len(self.body)
|
||||
self.body += raw[:total_size - received_size]
|
||||
if self.body and \
|
||||
len(self.body) >= int(self.header(b'content-length')):
|
||||
len(self.body) == int(self.header(b'content-length')):
|
||||
self.state = httpParserStates.COMPLETE
|
||||
more, raw = len(raw) > 0, raw[total_size - received_size:]
|
||||
elif self.is_chunked_encoded():
|
||||
if not self.chunk_parser:
|
||||
self.chunk_parser = ChunkParser()
|
||||
self.chunk_parser.parse(raw)
|
||||
raw = self.chunk_parser.parse(raw)
|
||||
if self.chunk_parser.state == chunkParserStates.COMPLETE:
|
||||
self.body = self.chunk_parser.body
|
||||
self.state = httpParserStates.COMPLETE
|
||||
more, raw = False, b''
|
||||
more = False
|
||||
else:
|
||||
more, raw = self.process(raw)
|
||||
self.buffer = raw
|
||||
|
@ -713,6 +724,7 @@ class HttpParser:
|
|||
elif self.state == httpParserStates.HEADERS_COMPLETE and \
|
||||
self.type == httpParserTypes.REQUEST_PARSER and \
|
||||
self.method == httpMethods.POST and \
|
||||
not self.is_chunked_encoded() and \
|
||||
(b'content-length' not in self.headers or
|
||||
(b'content-length' in self.headers and
|
||||
int(self.headers[b'content-length'][1]) == 0)) and \
|
||||
|
|
35
tests.py
35
tests.py
|
@ -874,6 +874,41 @@ class TestHttpParser(unittest.TestCase):
|
|||
self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.')
|
||||
self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE)
|
||||
|
||||
def test_pipelined_response_parse(self) -> None:
    """Content-Length delimited responses must parse correctly when pipelined."""
    self.assert_pipeline_response(proxy.build_http_response(
        proxy.httpStatusCodes.OK, reason=b'OK',
        headers={b'Content-Length': b'15'},
        body=b'{"key":"value"}',
    ))
|
||||
|
||||
def test_pipelined_chunked_response_parse(self) -> None:
    """Chunked transfer-encoded responses must parse correctly when pipelined."""
    self.assert_pipeline_response(proxy.build_http_response(
        proxy.httpStatusCodes.OK, reason=b'OK',
        headers={
            b'Transfer-Encoding': b'chunked',
            b'Content-Type': b'application/json',
        },
        body=b'f\r\n{"key":"value"}\r\n0\r\n\r\n'
    ))
|
||||
|
||||
def assert_pipeline_response(self, response: bytes) -> None:
    """Feed two back-to-back copies of ``response``: the first must parse to
    completion and the second must be left intact in the parser buffer."""
    self.parser = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
    self.parser.parse(response + response)
    self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE)
    self.assertEqual(self.parser.body, b'{"key":"value"}')
    self.assertEqual(self.parser.buffer, response)

    # The leftover buffer must itself be one complete, parseable response.
    leftover = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER)
    leftover.parse(self.parser.buffer)
    self.assertEqual(leftover.state, proxy.httpParserStates.COMPLETE)
    self.assertEqual(leftover.body, b'{"key":"value"}')
    self.assertEqual(leftover.buffer, b'')
|
||||
|
||||
def test_chunked_request_parse(self) -> None:
|
||||
self.parser.parse(proxy.build_http_request(
|
||||
proxy.httpMethods.POST, b'http://example.org/',
|
||||
|
|
Loading…
Reference in New Issue