From f16ef3c1da730a8235c5305b503037c4aa5602bc Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 11 May 2014 23:43:55 -0400 Subject: [PATCH] Decouple read_from_buffer's search for the endpoint from consuming the data. This lets us call find_read_pos from read_to_buffer_loop, avoiding some unnecessary reads (e.g. it previously took a minimum of two recv calls to serve an http request, but now we can do it in one). --- tornado/iostream.py | 81 +++++++++++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/tornado/iostream.py b/tornado/iostream.py index e5423fe6..7f2e3649 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -493,6 +493,7 @@ class BaseIOStream(object): target_bytes = None else: target_bytes = 0 + next_find_pos = 0 # Pretend to have a pending callback so that an EOF in # _read_to_buffer doesn't trigger an immediate close # callback. At the end of this method we'll either @@ -513,31 +514,42 @@ class BaseIOStream(object): if self._read_to_buffer() == 0: break + self._run_streaming_callback() + # If we've read all the bytes we can use, break out of - # this loop. It would be better to call - # read_from_buffer here, but - # A) this has subtle interactions with the - # pending_callback and error_listener mechanisms - # B) Simply calling read_from_buffer on every iteration - # is inefficient for delimited reads. - # TODO: restructure this so we can call read_from_buffer - # for unbounded delimited reads. + # this loop. We can't just call read_from_buffer here + # because of subtle interactions with the + # pending_callback and error_listener mechanisms. + # + # If we've reached target_bytes, we know we're done. if (target_bytes is not None and self._read_buffer_size >= target_bytes): break + + # Otherwise, we need to call the more expensive find_read_pos. + # It's inefficient to do this on every read, so instead + # do it on the first read and whenever the read buffer + # size has doubled. + if self._read_buffer_size >= next_find_pos: + pos = self._find_read_pos() + if pos is not None: + return pos + next_find_pos = self._read_buffer_size * 2 + return self._find_read_pos() finally: self._pending_callbacks -= 1 def _handle_read(self): try: - self._read_to_buffer_loop() + pos = self._read_to_buffer_loop() except UnsatisfiableReadError: raise except Exception: gen_log.warning("error on read", exc_info=True) self.close(exc_info=True) return - if self._read_from_buffer(): + if pos is not None: + self._read_from_buffer(pos) return else: self._maybe_run_close_callback() @@ -578,11 +590,14 @@ class BaseIOStream(object): listening for reads on the socket. """ # See if we've already got the data from a previous read - if self._read_from_buffer(): + self._run_streaming_callback() + pos = self._find_read_pos() + if pos is not None: + self._read_from_buffer(pos) return self._check_closed() try: - self._read_to_buffer_loop() + pos = self._read_to_buffer_loop() except Exception: # If there was an in _read_to_buffer, we called close() already, # but couldn't run the close callback because of _pending_callbacks. @@ -590,7 +605,8 @@ class BaseIOStream(object): # applicable. self._maybe_run_close_callback() raise - if self._read_from_buffer(): + if pos is not None: + self._read_from_buffer(pos) return # We couldn't satisfy the read inline, so either close the stream # or listen for new data. @@ -628,25 +644,36 @@ class BaseIOStream(object): raise IOError("Reached maximum read buffer size") return len(chunk) - def _read_from_buffer(self): - """Attempts to complete the currently-pending read from the buffer. - - Returns True if the read was completed. - """ + def _run_streaming_callback(self): if self._streaming_callback is not None and self._read_buffer_size: bytes_to_consume = self._read_buffer_size if self._read_bytes is not None: bytes_to_consume = min(self._read_bytes, bytes_to_consume) self._read_bytes -= bytes_to_consume self._run_read_callback(bytes_to_consume, True) + + def _read_from_buffer(self, pos): + """Attempts to complete the currently-pending read from the buffer. + + The argument is either a position in the read buffer or None, + as returned by _find_read_pos. + """ + self._read_bytes = self._read_delimiter = self._read_regex = None + self._read_partial = False + self._run_read_callback(pos, False) + + def _find_read_pos(self): + """Attempts to find a position in the read buffer that satisfies + the currently-pending read. + + Returns a position in the buffer if the current read can be satisfied, + or None if it cannot. + """ if (self._read_bytes is not None and (self._read_buffer_size >= self._read_bytes or (self._read_partial and self._read_buffer_size > 0))): num_bytes = min(self._read_bytes, self._read_buffer_size) - self._read_bytes = None - self._read_partial = False - self._run_read_callback(num_bytes, False) - return True + return num_bytes elif self._read_delimiter is not None: # Multi-byte delimiters (e.g. '\r\n') may straddle two # chunks in the read buffer, so we can't easily find them @@ -663,9 +690,7 @@ class BaseIOStream(object): delimiter_len = len(self._read_delimiter) self._check_max_bytes(self._read_delimiter, loc + delimiter_len) - self._read_delimiter = None - self._run_read_callback(loc + delimiter_len, False) - return True + return loc + delimiter_len if len(self._read_buffer) == 1: break _double_prefix(self._read_buffer) @@ -677,15 +702,13 @@ class BaseIOStream(object): m = self._read_regex.search(self._read_buffer[0]) if m is not None: self._check_max_bytes(self._read_regex, m.end()) - self._read_regex = None - self._run_read_callback(m.end(), False) - return True + return m.end() if len(self._read_buffer) == 1: break _double_prefix(self._read_buffer) self._check_max_bytes(self._read_regex, len(self._read_buffer[0])) - return False + return None def _check_max_bytes(self, delimiter, size): if (self._read_max_bytes is not None and