Decouple read_from_buffer's search for the endpoint from consuming the data.

This lets us call find_read_pos from read_to_buffer_loop, avoiding some
unnecessary reads (e.g. it previously took a minimum of two recv calls
to serve an http request, but now we can do it in one).
This commit is contained in:
Ben Darnell 2014-05-11 23:43:55 -04:00
parent 4ee968c1d5
commit f16ef3c1da
1 changed files with 52 additions and 29 deletions

View File

@ -493,6 +493,7 @@ class BaseIOStream(object):
target_bytes = None target_bytes = None
else: else:
target_bytes = 0 target_bytes = 0
next_find_pos = 0
# Pretend to have a pending callback so that an EOF in # Pretend to have a pending callback so that an EOF in
# _read_to_buffer doesn't trigger an immediate close # _read_to_buffer doesn't trigger an immediate close
# callback. At the end of this method we'll either # callback. At the end of this method we'll either
@ -513,31 +514,42 @@ class BaseIOStream(object):
if self._read_to_buffer() == 0: if self._read_to_buffer() == 0:
break break
self._run_streaming_callback()
# If we've read all the bytes we can use, break out of # If we've read all the bytes we can use, break out of
# this loop. It would be better to call # this loop. We can't just call read_from_buffer here
# read_from_buffer here, but # because of subtle interactions with the
# A) this has subtle interactions with the # pending_callback and error_listener mechanisms.
# pending_callback and error_listener mechanisms #
# B) Simply calling read_from_buffer on every iteration # If we've reached target_bytes, we know we're done.
# is inefficient for delimited reads.
# TODO: restructure this so we can call read_from_buffer
# for unbounded delimited reads.
if (target_bytes is not None and if (target_bytes is not None and
self._read_buffer_size >= target_bytes): self._read_buffer_size >= target_bytes):
break break
# Otherwise, we need to call the more expensive find_read_pos.
# It's inefficient to do this on every read, so instead
# do it on the first read and whenever the read buffer
# size has doubled.
if self._read_buffer_size >= next_find_pos:
pos = self._find_read_pos()
if pos is not None:
return pos
next_find_pos = self._read_buffer_size * 2
return self._find_read_pos()
finally: finally:
self._pending_callbacks -= 1 self._pending_callbacks -= 1
def _handle_read(self): def _handle_read(self):
try: try:
self._read_to_buffer_loop() pos = self._read_to_buffer_loop()
except UnsatisfiableReadError: except UnsatisfiableReadError:
raise raise
except Exception: except Exception:
gen_log.warning("error on read", exc_info=True) gen_log.warning("error on read", exc_info=True)
self.close(exc_info=True) self.close(exc_info=True)
return return
if self._read_from_buffer(): if pos is not None:
self._read_from_buffer(pos)
return return
else: else:
self._maybe_run_close_callback() self._maybe_run_close_callback()
@ -578,11 +590,14 @@ class BaseIOStream(object):
listening for reads on the socket. listening for reads on the socket.
""" """
# See if we've already got the data from a previous read # See if we've already got the data from a previous read
if self._read_from_buffer(): self._run_streaming_callback()
pos = self._find_read_pos()
if pos is not None:
self._read_from_buffer(pos)
return return
self._check_closed() self._check_closed()
try: try:
self._read_to_buffer_loop() pos = self._read_to_buffer_loop()
except Exception: except Exception:
# If there was an in _read_to_buffer, we called close() already, # If there was an in _read_to_buffer, we called close() already,
# but couldn't run the close callback because of _pending_callbacks. # but couldn't run the close callback because of _pending_callbacks.
@ -590,7 +605,8 @@ class BaseIOStream(object):
# applicable. # applicable.
self._maybe_run_close_callback() self._maybe_run_close_callback()
raise raise
if self._read_from_buffer(): if pos is not None:
self._read_from_buffer(pos)
return return
# We couldn't satisfy the read inline, so either close the stream # We couldn't satisfy the read inline, so either close the stream
# or listen for new data. # or listen for new data.
@ -628,25 +644,36 @@ class BaseIOStream(object):
raise IOError("Reached maximum read buffer size") raise IOError("Reached maximum read buffer size")
return len(chunk) return len(chunk)
def _read_from_buffer(self): def _run_streaming_callback(self):
"""Attempts to complete the currently-pending read from the buffer.
Returns True if the read was completed.
"""
if self._streaming_callback is not None and self._read_buffer_size: if self._streaming_callback is not None and self._read_buffer_size:
bytes_to_consume = self._read_buffer_size bytes_to_consume = self._read_buffer_size
if self._read_bytes is not None: if self._read_bytes is not None:
bytes_to_consume = min(self._read_bytes, bytes_to_consume) bytes_to_consume = min(self._read_bytes, bytes_to_consume)
self._read_bytes -= bytes_to_consume self._read_bytes -= bytes_to_consume
self._run_read_callback(bytes_to_consume, True) self._run_read_callback(bytes_to_consume, True)
def _read_from_buffer(self, pos):
"""Attempts to complete the currently-pending read from the buffer.
The argument is either a position in the read buffer or None,
as returned by _find_read_pos.
"""
self._read_bytes = self._read_delimiter = self._read_regex = None
self._read_partial = False
self._run_read_callback(pos, False)
def _find_read_pos(self):
"""Attempts to find a position in the read buffer that satisfies
the currently-pending read.
Returns a position in the buffer if the current read can be satisfied,
or None if it cannot.
"""
if (self._read_bytes is not None and if (self._read_bytes is not None and
(self._read_buffer_size >= self._read_bytes or (self._read_buffer_size >= self._read_bytes or
(self._read_partial and self._read_buffer_size > 0))): (self._read_partial and self._read_buffer_size > 0))):
num_bytes = min(self._read_bytes, self._read_buffer_size) num_bytes = min(self._read_bytes, self._read_buffer_size)
self._read_bytes = None return num_bytes
self._read_partial = False
self._run_read_callback(num_bytes, False)
return True
elif self._read_delimiter is not None: elif self._read_delimiter is not None:
# Multi-byte delimiters (e.g. '\r\n') may straddle two # Multi-byte delimiters (e.g. '\r\n') may straddle two
# chunks in the read buffer, so we can't easily find them # chunks in the read buffer, so we can't easily find them
@ -663,9 +690,7 @@ class BaseIOStream(object):
delimiter_len = len(self._read_delimiter) delimiter_len = len(self._read_delimiter)
self._check_max_bytes(self._read_delimiter, self._check_max_bytes(self._read_delimiter,
loc + delimiter_len) loc + delimiter_len)
self._read_delimiter = None return loc + delimiter_len
self._run_read_callback(loc + delimiter_len, False)
return True
if len(self._read_buffer) == 1: if len(self._read_buffer) == 1:
break break
_double_prefix(self._read_buffer) _double_prefix(self._read_buffer)
@ -677,15 +702,13 @@ class BaseIOStream(object):
m = self._read_regex.search(self._read_buffer[0]) m = self._read_regex.search(self._read_buffer[0])
if m is not None: if m is not None:
self._check_max_bytes(self._read_regex, m.end()) self._check_max_bytes(self._read_regex, m.end())
self._read_regex = None return m.end()
self._run_read_callback(m.end(), False)
return True
if len(self._read_buffer) == 1: if len(self._read_buffer) == 1:
break break
_double_prefix(self._read_buffer) _double_prefix(self._read_buffer)
self._check_max_bytes(self._read_regex, self._check_max_bytes(self._read_regex,
len(self._read_buffer[0])) len(self._read_buffer[0]))
return False return None
def _check_max_bytes(self, delimiter, size): def _check_max_bytes(self, delimiter, size):
if (self._read_max_bytes is not None and if (self._read_max_bytes is not None and