diff --git a/benchmark.py b/benchmark.py index 9c69ff05..5ce54142 100755 --- a/benchmark.py +++ b/benchmark.py @@ -56,27 +56,46 @@ class Benchmark: writer.write(proxy.build_http_request( proxy.httpMethods.GET, b'/' )) - # await asyncio.sleep(0.1) + await asyncio.sleep(0.01) except KeyboardInterrupt: pass + @staticmethod + def parse_pipeline_response(response: proxy.HttpParser, raw: bytes, counter: int = 0) -> \ + Tuple[proxy.HttpParser, int]: + response.parse(raw) + if response.state != proxy.httpParserStates.COMPLETE: + # Need more data + return response, counter + + if response.buffer == b'': + # No more buffer left to parse + return response, counter + 1 + + # For pipelined requests we may have pending buffer, try parse them as responses + pipelined_response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER) + return Benchmark.parse_pipeline_response(pipelined_response, response.buffer, counter + 1) + @staticmethod async def recv(idd: int, reader: asyncio.StreamReader) -> None: - last_status_time = time.time() - num_completed_requests_per_connection: int = 0 + print_every = 1000 + last_print = time.time() + num_completed_requests: int = 0 + response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER) try: while True: - response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER) - while response.state != proxy.httpParserStates.COMPLETE: - raw = await reader.read(proxy.DEFAULT_BUFFER_SIZE) - print(raw) - response.parse(raw) - - num_completed_requests_per_connection += 1 - if num_completed_requests_per_connection % 50 == 0: - now = time.time() - print('[%d] Made 50 requests in last %.2f seconds' % (idd, now - last_status_time)) - last_status_time = now + raw = await reader.read(proxy.DEFAULT_BUFFER_SIZE) + response, total_parsed = Benchmark.parse_pipeline_response(response, raw) + if response.state == proxy.httpParserStates.COMPLETE: + response = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER) + if total_parsed > 0: + num_completed_requests += total_parsed + # print('total parsed %d' % total_parsed) + if num_completed_requests % print_every == 0: + now = time.time() + print('[%d] Completed last %d requests in %.2f secs' % + (idd, print_every, now - last_print)) + last_print = now except KeyboardInterrupt: pass diff --git a/proxy.py b/proxy.py index 43245056..6631abcc 100755 --- a/proxy.py +++ b/proxy.py @@ -257,8 +257,16 @@ def build_http_response(status_code: int, line.append(reason) if headers is None: headers = {} - if body is not None and not any( - k.lower() == b'content-length' for k in headers): + has_content_length = False + has_transfer_encoding = False + for k in headers: + if k.lower() == b'content-length': + has_content_length = True + if k.lower() == b'transfer-encoding': + has_transfer_encoding = True + if body is not None and \ + not has_transfer_encoding and \ + not has_content_length: headers[b'Content-Length'] = bytes_(len(body)) return build_http_pkt(line, headers, body) @@ -501,10 +509,11 @@ class ChunkParser: # Expected size of next following chunk self.size: Optional[int] = None - def parse(self, raw: bytes) -> None: + def parse(self, raw: bytes) -> bytes: more = True if len(raw) > 0 else False - while more: + while more and self.state != chunkParserStates.COMPLETE: more, raw = self.process(raw) + return raw def process(self, raw: bytes) -> Tuple[bool, bytes]: if self.state == chunkParserStates.WAITING_FOR_SIZE: @@ -651,27 +660,29 @@ class HttpParser: self.buffer = b'' more = True if len(raw) > 0 else False - while more: + while more and self.state != httpParserStates.COMPLETE: if self.state in ( httpParserStates.HEADERS_COMPLETE, - httpParserStates.RCVING_BODY, - httpParserStates.COMPLETE): + httpParserStates.RCVING_BODY): if b'content-length' in self.headers: self.state = httpParserStates.RCVING_BODY if self.body is None: self.body = b'' - self.body += raw + total_size = int(self.header(b'content-length')) + received_size = len(self.body) + self.body += raw[:total_size - received_size] if self.body and \ - len(self.body) >= int(self.header(b'content-length')): + len(self.body) == int(self.header(b'content-length')): self.state = httpParserStates.COMPLETE + more, raw = len(raw) > 0, raw[total_size - received_size:] elif self.is_chunked_encoded(): if not self.chunk_parser: self.chunk_parser = ChunkParser() - self.chunk_parser.parse(raw) + raw = self.chunk_parser.parse(raw) if self.chunk_parser.state == chunkParserStates.COMPLETE: self.body = self.chunk_parser.body self.state = httpParserStates.COMPLETE - more, raw = False, b'' + more = False else: more, raw = self.process(raw) self.buffer = raw @@ -713,6 +724,7 @@ class HttpParser: elif self.state == httpParserStates.HEADERS_COMPLETE and \ self.type == httpParserTypes.REQUEST_PARSER and \ self.method == httpMethods.POST and \ + not self.is_chunked_encoded() and \ (b'content-length' not in self.headers or (b'content-length' in self.headers and int(self.headers[b'content-length'][1]) == 0)) and \ diff --git a/tests.py b/tests.py index dddcd2a2..b4d0e864 100644 --- a/tests.py +++ b/tests.py @@ -874,6 +874,41 @@ class TestHttpParser(unittest.TestCase): self.assertEqual(self.parser.body, b'Wikipedia in\r\n\r\nchunks.') self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE) + def test_pipelined_response_parse(self) -> None: + response = proxy.build_http_response( + proxy.httpStatusCodes.OK, reason=b'OK', + headers={ + b'Content-Length': b'15' + }, + body=b'{"key":"value"}', + ) + self.assert_pipeline_response(response) + + def test_pipelined_chunked_response_parse(self) -> None: + response = proxy.build_http_response( + proxy.httpStatusCodes.OK, reason=b'OK', + headers={ + b'Transfer-Encoding': b'chunked', + b'Content-Type': b'application/json', + }, + body=b'f\r\n{"key":"value"}\r\n0\r\n\r\n' + ) + self.assert_pipeline_response(response) + + def assert_pipeline_response(self, response: bytes) -> None: + self.parser = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER) + self.parser.parse(response + response) + self.assertEqual(self.parser.state, proxy.httpParserStates.COMPLETE) + self.assertEqual(self.parser.body, b'{"key":"value"}') + self.assertEqual(self.parser.buffer, response) + + # parse buffer + parser = proxy.HttpParser(proxy.httpParserTypes.RESPONSE_PARSER) + parser.parse(self.parser.buffer) + self.assertEqual(parser.state, proxy.httpParserStates.COMPLETE) + self.assertEqual(parser.body, b'{"key":"value"}') + self.assertEqual(parser.buffer, b'') + def test_chunked_request_parse(self) -> None: self.parser.parse(proxy.build_http_request( proxy.httpMethods.POST, b'http://example.org/',