Increase performance of IOStream.read_until and read_until_regex.

_handle_read and _try_inline_read now only call _read_from_buffer
once, after calling _read_to_buffer as many times as they can.  This
allows the progressive _double_prefix calls in _read_from_buffer to
work as efficiently as possible.

In testing with a 4MB read, performance improved by a factor of 32 (8
seconds to 0.25 seconds)
This commit is contained in:
Ben Darnell 2012-03-25 17:12:00 -07:00
parent c86b8ce005
commit 41463a9851
2 changed files with 66 additions and 29 deletions

View File

@ -217,12 +217,16 @@ class IOStream(object):
self._state = None
self.socket.close()
self.socket = None
if self._close_callback and self._pending_callbacks == 0:
# if there are pending callbacks, don't run the close callback
# until they're done (see _maybe_add_error_handler)
cb = self._close_callback
self._close_callback = None
self._run_callback(cb)
self._maybe_run_close_callback()
def _maybe_run_close_callback(self):
if (self.socket is None and self._close_callback and
self._pending_callbacks == 0):
# if there are pending callbacks, don't run the close callback
# until they're done (see _maybe_add_error_handler)
cb = self._close_callback
self._close_callback = None
self._run_callback(cb)
def reading(self):
"""Returns true if we are currently reading from the stream."""
@ -310,22 +314,38 @@ class IOStream(object):
self.io_loop.add_callback(wrapper)
def _handle_read(self):
while True:
try:
try:
# Read from the socket until we get EWOULDBLOCK or equivalent.
# SSL sockets do some internal buffering, and if the data is
# sitting in the SSL object's buffer select() and friends
# can't see it; the only way to find out if it's there is to
# try to read it.
result = self._read_to_buffer()
except Exception:
self.close()
return
if result == 0:
break
else:
if self._read_from_buffer():
return
# Pretend to have a pending callback so that an EOF in
# _read_to_buffer doesn't trigger an immediate close
# callback. At the end of this method we'll either
# estabilsh a real pending callback via
# _read_from_buffer or run the close callback.
#
# We need two try statements here so that
# pending_callbacks is decremented before the `except`
# clause below (which calls `close` and does need to
# trigger the callback)
self._pending_callbacks += 1
while True:
# Read from the socket until we get EWOULDBLOCK or equivalent.
# SSL sockets do some internal buffering, and if the data is
# sitting in the SSL object's buffer select() and friends
# can't see it; the only way to find out if it's there is to
# try to read it.
if self._read_to_buffer() == 0:
break
finally:
self._pending_callbacks -= 1
except Exception:
logging.warning("error on read", exc_info=True)
self.close()
return
if self._read_from_buffer():
return
else:
self._maybe_run_close_callback()
def _set_read_callback(self, callback):
assert not self._read_callback, "Already reading"
@ -338,13 +358,16 @@ class IOStream(object):
read callback on the next IOLoop iteration; otherwise starts
listening for reads on the socket.
"""
# See if we've already got the data from a previous read
if self._read_from_buffer():
return
self._check_closed()
while True:
# See if we've already got the data from a previous read
if self._read_from_buffer():
return
self._check_closed()
if self._read_to_buffer() == 0:
break
self._check_closed()
if self._read_from_buffer():
return
self._add_io_state(self.io_loop.READ)
def _read_from_socket(self):
@ -520,10 +543,7 @@ class IOStream(object):
def _maybe_add_error_listener(self):
if self._state is None and self._pending_callbacks == 0:
if self.socket is None:
cb = self._close_callback
if cb is not None:
self._close_callback = None
self._run_callback(cb)
self._maybe_run_close_callback()
else:
self._add_io_state(ioloop.IOLoop.READ)

View File

@ -216,3 +216,20 @@ class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
finally:
server.close()
client.close()
def test_large_read_until(self):
# Performance test: read_until used to have a quadratic component
# so a read_until of 4MB would take 8 seconds; now it takes 0.25
# seconds.
server, client = self.make_iostream_pair()
try:
NUM_KB = 4096
for i in xrange(NUM_KB):
client.write(b("A") * 1024)
client.write(b("\r\n"))
server.read_until(b("\r\n"), self.stop)
data = self.wait()
self.assertEqual(len(data), NUM_KB * 1024 + 2)
finally:
server.close()
client.close()