From 7f5d4de759e0c5050e506b4c3f55a46e5df1a102 Mon Sep 17 00:00:00 2001
From: Ben Darnell
Date: Fri, 30 Dec 2011 16:02:40 -0800
Subject: [PATCH] Avoid merging the entire read buffer for IOStream.read_until.

Among other things, this dramatically speeds up downloads of large
chunked files over a fast network with SimpleHTTPClient.

Fixes #425.
---
 demos/benchmark/chunk_benchmark.py | 47 ++++++++++++++++++++++++++++++
 tornado/iostream.py                | 34 ++++++++++++++++++----
 2 files changed, 77 insertions(+), 4 deletions(-)
 create mode 100755 demos/benchmark/chunk_benchmark.py

diff --git a/demos/benchmark/chunk_benchmark.py b/demos/benchmark/chunk_benchmark.py
new file mode 100755
index 00000000..1502838a
--- /dev/null
+++ b/demos/benchmark/chunk_benchmark.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Downloads a large file in chunked encoding with both curl and simple clients
+
+import logging
+from tornado.curl_httpclient import CurlAsyncHTTPClient
+from tornado.simple_httpclient import SimpleAsyncHTTPClient
+from tornado.ioloop import IOLoop
+from tornado.options import define, options, parse_command_line
+from tornado.web import RequestHandler, Application
+
+define('port', default=8888)
+define('num_chunks', default=1000)
+define('chunk_size', default=2048)
+
+class ChunkHandler(RequestHandler):
+    def get(self):
+        for i in xrange(options.num_chunks):
+            self.write('A' * options.chunk_size)
+            self.flush()
+        self.finish()
+
+def main():
+    parse_command_line()
+    app = Application([('/', ChunkHandler)])
+    app.listen(options.port, address='127.0.0.1')
+    def callback(response):
+        response.rethrow()
+        assert len(response.body) == (options.num_chunks * options.chunk_size)
+        logging.warning("fetch completed in %s seconds", response.request_time)
+        IOLoop.instance().stop()
+
+    logging.warning("Starting fetch with curl client")
+    curl_client = CurlAsyncHTTPClient()
+    curl_client.fetch('http://localhost:%d/' % options.port,
+                      callback=callback)
+    IOLoop.instance().start()
+
+    logging.warning("Starting fetch with simple client")
+    simple_client = SimpleAsyncHTTPClient()
+    simple_client.fetch('http://localhost:%d/' % options.port,
+                        callback=callback)
+    IOLoop.instance().start()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tornado/iostream.py b/tornado/iostream.py
index 6b02b19a..460f7f34 100644
--- a/tornado/iostream.py
+++ b/tornado/iostream.py
@@ -412,8 +412,25 @@ class IOStream(object):
             self._run_callback(callback, self._consume(num_bytes))
             return True
         elif self._read_delimiter is not None:
-            _merge_prefix(self._read_buffer, sys.maxint)
-            loc = self._read_buffer[0].find(self._read_delimiter)
+            # Multi-byte delimiters (e.g. '\r\n') may straddle two
+            # chunks in the read buffer, so we can't easily find them
+            # without collapsing the buffer. However, since protocols
+            # using delimited reads (as opposed to reads of a known
+            # length) tend to be "line" oriented, the delimiter is likely
+            # to be in the first few chunks. Merge the buffer gradually
+            # since large merges are relatively expensive and get undone in
+            # consume().
+            loc = -1
+            if self._read_buffer:
+                loc = self._read_buffer[0].find(self._read_delimiter)
+            while loc == -1 and len(self._read_buffer) > 1:
+                # Grow by doubling, but don't split the second chunk just
+                # because the first one is small.
+                new_len = max(len(self._read_buffer[0]) * 2,
+                              (len(self._read_buffer[0]) +
+                               len(self._read_buffer[1])))
+                _merge_prefix(self._read_buffer, new_len)
+                loc = self._read_buffer[0].find(self._read_delimiter)
             if loc != -1:
                 callback = self._read_callback
                 delimiter_len = len(self._read_delimiter)
@@ -424,6 +441,15 @@ class IOStream(object):
                                    self._consume(loc + delimiter_len))
             return True
         elif self._read_regex is not None:
-            _merge_prefix(self._read_buffer, sys.maxint)
-            m = self._read_regex.search(self._read_buffer[0])
+            m = None
+            if self._read_buffer:
+                m = self._read_regex.search(self._read_buffer[0])
+            while m is None and len(self._read_buffer) > 1:
+                # Grow by doubling, but don't split the second chunk just
+                # because the first one is small.
+                new_len = max(len(self._read_buffer[0]) * 2,
+                              (len(self._read_buffer[0]) +
+                               len(self._read_buffer[1])))
+                _merge_prefix(self._read_buffer, new_len)
+                m = self._read_regex.search(self._read_buffer[0])
             if m:
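
Note: the loops added in both branches above lean on tornado's _merge_prefix
helper. The following is a minimal standalone sketch of the technique for
illustration only, not part of the patch; the names merge_prefix and
find_delimiter are hypothetical, and this merge_prefix is a simplified
stand-in for tornado's _merge_prefix (which likewise collapses the first
entries of the read-buffer deque into a single string of up to size bytes,
pushing any excess back onto the deque).

import collections

def merge_prefix(chunks, size):
    # Replace the first entries in `chunks` (a deque of byte strings)
    # with a single string of up to `size` bytes.
    prefix = []
    remaining = size
    while chunks and remaining > 0:
        chunk = chunks.popleft()
        if len(chunk) > remaining:
            # Re-split: push the excess back as the new second chunk.
            chunks.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    if prefix:
        chunks.appendleft(b''.join(prefix))

def find_delimiter(chunks, delimiter):
    # Search for `delimiter`, merging the buffer gradually: double the
    # searched prefix each iteration instead of collapsing the entire
    # buffer up front.
    if not chunks:
        return -1
    loc = chunks[0].find(delimiter)
    while loc == -1 and len(chunks) > 1:
        # Grow by doubling, but don't split the second chunk just
        # because the first one is small.
        new_len = max(len(chunks[0]) * 2,
                      len(chunks[0]) + len(chunks[1]))
        merge_prefix(chunks, new_len)
        loc = chunks[0].find(delimiter)
    return loc

# Example: the delimiter '\r\n' straddles the first two chunks.
buf = collections.deque([b'HTTP/1.1 200 OK\r', b'\n', b'Content-Type: text/html'])
assert find_delimiter(buf, b'\r\n') == 15

Since each merge roughly doubles the searched prefix, a delimiter N bytes
into the stream is found after about log N merges and O(N) total bytes
copied, independent of how much additional data is buffered; the old
sys.maxint merge copied the whole buffer every time.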