Avoid merging the entire read buffer for IOStream.read_until.
Among other things, this dramatically speeds up downloads of large chunked files over a fast network with SimpleHTTPClient. Fixes #425.
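
The idea, in brief: the read buffer is a deque of byte chunks, and instead of flattening the whole deque before searching for the delimiter, the search merges chunks into the front gradually, roughly doubling each time. Below is a minimal standalone sketch of that strategy (simplified helper names, not the commit's code; tornado's real _merge_prefix also splits chunks so the front chunk never overshoots the requested size):

    import collections

    def merge_prefix(chunks, size):
        # Simplified stand-in for tornado.iostream._merge_prefix: collapse
        # whole chunks from the left until the first chunk holds at least
        # `size` bytes.  (The real helper also splits chunks so the front
        # one never grows past `size`.)
        prefix = []
        remaining = size
        while chunks and remaining > 0:
            chunk = chunks.popleft()
            prefix.append(chunk)
            remaining -= len(chunk)
        if prefix:
            chunks.appendleft(''.join(prefix))

    def find_delimiter(chunks, delimiter):
        # Search only the first chunk; if the delimiter is absent (it may
        # straddle a chunk boundary), merge roughly twice as much data into
        # the front and retry, instead of flattening the entire buffer.
        loc = chunks[0].find(delimiter) if chunks else -1
        while loc == -1 and len(chunks) > 1:
            new_len = max(len(chunks[0]) * 2,
                          len(chunks[0]) + len(chunks[1]))
            merge_prefix(chunks, new_len)
            loc = chunks[0].find(delimiter)
        return loc

    buf = collections.deque(['foo\r', '\nbar', 'lots more buffered data'])
    print(find_delimiter(buf, '\r\n'))  # -> 3; the third chunk is never touched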
This commit is contained in:
parent 0b31d8de4c
commit 7f5d4de759
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Downloads a large file in chunked encoding with both curl and simple clients
+
+import logging
+from tornado.curl_httpclient import CurlAsyncHTTPClient
+from tornado.simple_httpclient import SimpleAsyncHTTPClient
+from tornado.ioloop import IOLoop
+from tornado.options import define, options, parse_command_line
+from tornado.web import RequestHandler, Application
+
+define('port', default=8888)
+define('num_chunks', default=1000)
+define('chunk_size', default=2048)
+
+class ChunkHandler(RequestHandler):
+    def get(self):
+        for i in xrange(options.num_chunks):
+            self.write('A' * options.chunk_size)
+            self.flush()
+        self.finish()
+
+def main():
+    parse_command_line()
+    app = Application([('/', ChunkHandler)])
+    app.listen(options.port, address='127.0.0.1')
+    def callback(response):
+        response.rethrow()
+        assert len(response.body) == (options.num_chunks * options.chunk_size)
+        logging.warning("fetch completed in %s seconds", response.request_time)
+        IOLoop.instance().stop()
+
+    logging.warning("Starting fetch with curl client")
+    curl_client = CurlAsyncHTTPClient()
+    curl_client.fetch('http://localhost:%d/' % options.port,
+                      callback=callback)
+    IOLoop.instance().start()
+
+    logging.warning("Starting fetch with simple client")
+    simple_client = SimpleAsyncHTTPClient()
+    simple_client.fetch('http://localhost:%d/' % options.port,
+                        callback=callback)
+    IOLoop.instance().start()
+
+
+if __name__ == '__main__':
+    main()
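
The define() calls above register command-line flags via tornado.options, so the benchmark parameters can be overridden per run. Assuming the script is saved as chunk_benchmark.py (its actual path is not shown in this view), an invocation might look like:

    python chunk_benchmark.py --port=8888 --num_chunks=5000 --chunk_size=4096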
@@ -412,8 +412,25 @@ class IOStream(object):
                 self._run_callback(callback, self._consume(num_bytes))
                 return True
         elif self._read_delimiter is not None:
-            _merge_prefix(self._read_buffer, sys.maxint)
-            loc = self._read_buffer[0].find(self._read_delimiter)
+            # Multi-byte delimiters (e.g. '\r\n') may straddle two
+            # chunks in the read buffer, so we can't easily find them
+            # without collapsing the buffer.  However, since protocols
+            # using delimited reads (as opposed to reads of a known
+            # length) tend to be "line" oriented, the delimiter is likely
+            # to be in the first few chunks.  Merge the buffer gradually
+            # since large merges are relatively expensive and get undone in
+            # consume().
+            loc = -1
+            if self._read_buffer:
+                loc = self._read_buffer[0].find(self._read_delimiter)
+            while loc == -1 and len(self._read_buffer) > 1:
+                # Grow by doubling, but don't split the second chunk just
+                # because the first one is small.
+                new_len = max(len(self._read_buffer[0]) * 2,
+                              (len(self._read_buffer[0]) +
+                               len(self._read_buffer[1])))
+                _merge_prefix(self._read_buffer, new_len)
+                loc = self._read_buffer[0].find(self._read_delimiter)
             if loc != -1:
                 callback = self._read_callback
                 delimiter_len = len(self._read_delimiter)
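
Back-of-the-envelope, using the benchmark's defaults (1000 chunks of 2048 bytes) and assuming the old code re-collapsed the whole pending buffer each time a chunk arrived: the sys.maxint merge copies roughly 2048 * (1 + 2 + ... + 1000), about 1 GB over the course of the download, i.e. quadratic in the number of chunks, while the doubling search normally finds a line-oriented delimiter in the first chunk or two and leaves the rest of the buffer alone.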
@@ -424,6 +441,17 @@ class IOStream(object):
                                    self._consume(loc + delimiter_len))
                 return True
         elif self._read_regex is not None:
-            _merge_prefix(self._read_buffer, sys.maxint)
-            m = self._read_regex.search(self._read_buffer[0])
+            m = None
+            if self._read_buffer:
+                m = self._read_regex.search(self._read_buffer[0])
+            while m is None and len(self._read_buffer) > 1:
+                # Grow by doubling, but don't split the second chunk just
+                # because the first one is small.
+                new_len = max(len(self._read_buffer[0]) * 2,
+                              (len(self._read_buffer[0]) +
+                               len(self._read_buffer[1])))
+                _merge_prefix(self._read_buffer, new_len)
+                m = self._read_regex.search(self._read_buffer[0])
             if m: