From 1215cd2631fff0a7d93ad6e4c8f3f82f8f7bb3fa Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sat, 20 Jan 2018 19:00:30 -0500 Subject: [PATCH] iostream: Use recv_into and friends in read_from_fd This has the same memory-allocation behavior as before, but it moves the buffer out of the recv() call to python code. --- tornado/iostream.py | 135 ++++++++++++++++++++++---------------------- 1 file changed, 68 insertions(+), 67 deletions(-) diff --git a/tornado/iostream.py b/tornado/iostream.py index 3bf05874..56b4002c 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -300,13 +300,18 @@ class BaseIOStream(object): """ raise NotImplementedError() - def read_from_fd(self): + def read_from_fd(self, buf): """Attempts to read from the underlying file. - Returns ``None`` if there was nothing to read (the socket - returned `~errno.EWOULDBLOCK` or equivalent), otherwise - returns the data. When possible, should return no more than - ``self.read_chunk_size`` bytes at a time. + Reads up to ``len(buf)`` bytes, storing them in the buffer. + Returns the number of bytes read. Returns None if there was + nothing to read (the socket returned `~errno.EWOULDBLOCK` or + equivalent), and zero on EOF. + + .. versionchanged:: 5.0 + + Interface redesigned to take a buffer and return a number + of bytes instead of a freshly-allocated object. """ raise NotImplementedError() @@ -820,31 +825,40 @@ class BaseIOStream(object): to read (i.e. the read returns EWOULDBLOCK or equivalent). On error closes the socket and raises an exception. """ - while True: - try: - chunk = self.read_from_fd() - except (socket.error, IOError, OSError) as e: - if errno_from_exception(e) == errno.EINTR: - continue - # ssl.SSLError is a subclass of socket.error - if self._is_connreset(e): - # Treat ECONNRESET as a connection close rather than - # an error to minimize log spam (the exception will - # be available on self.error for apps that care). + try: + while True: + try: + buf = bytearray(self.read_chunk_size) + bytes_read = self.read_from_fd(buf) + except (socket.error, IOError, OSError) as e: + if errno_from_exception(e) == errno.EINTR: + continue + # ssl.SSLError is a subclass of socket.error + if self._is_connreset(e): + # Treat ECONNRESET as a connection close rather than + # an error to minimize log spam (the exception will + # be available on self.error for apps that care). + self.close(exc_info=e) + return self.close(exc_info=e) - return - self.close(exc_info=e) - raise - break - if chunk is None: - return 0 - self._read_buffer += chunk - self._read_buffer_size += len(chunk) + raise + break + if bytes_read is None: + return 0 + elif bytes_read == 0: + self.close() + return 0 + self._read_buffer += memoryview(buf)[:bytes_read] + self._read_buffer_size += bytes_read + finally: + # Break the reference to buf so we don't waste a chunk's worth of + # memory in case an exception hangs on to our stack frame. + buf = None if self._read_buffer_size > self.max_buffer_size: gen_log.error("Reached maximum read buffer size") self.close() raise StreamBufferFullError("Reached maximum read buffer size") - return len(chunk) + return bytes_read def _run_streaming_callback(self): if self._streaming_callback is not None and self._read_buffer_size: @@ -1106,18 +1120,16 @@ class IOStream(BaseIOStream): socket.SO_ERROR) return socket.error(errno, os.strerror(errno)) - def read_from_fd(self): + def read_from_fd(self, buf): try: - chunk = self.socket.recv(self.read_chunk_size) + return self.socket.recv_into(buf) except socket.error as e: if e.args[0] in _ERRNO_WOULDBLOCK: return None else: raise - if not chunk: - self.close() - return None - return chunk + finally: + buf = None def write_to_fd(self, data): try: @@ -1528,35 +1540,29 @@ class SSLIOStream(IOStream): # See https://github.com/tornadoweb/tornado/pull/2008 del data - def read_from_fd(self): - if self._ssl_accepting: - # If the handshake hasn't finished yet, there can't be anything - # to read (attempting to read may or may not raise an exception - # depending on the SSL version) - return None + def read_from_fd(self, buf): try: - # SSLSocket objects have both a read() and recv() method, - # while regular sockets only have recv(). - # The recv() method blocks (at least in python 2.6) if it is - # called when there is nothing to read, so we have to use - # read() instead. - chunk = self.socket.read(self.read_chunk_size) - except ssl.SSLError as e: - # SSLError is a subclass of socket.error, so this except - # block must come first. - if e.args[0] == ssl.SSL_ERROR_WANT_READ: + if self._ssl_accepting: + # If the handshake hasn't finished yet, there can't be anything + # to read (attempting to read may or may not raise an exception + # depending on the SSL version) return None - else: - raise - except socket.error as e: - if e.args[0] in _ERRNO_WOULDBLOCK: - return None - else: - raise - if not chunk: - self.close() - return None - return chunk + try: + return self.socket.recv_into(buf) + except ssl.SSLError as e: + # SSLError is a subclass of socket.error, so this except + # block must come first. + if e.args[0] == ssl.SSL_ERROR_WANT_READ: + return None + else: + raise + except socket.error as e: + if e.args[0] in _ERRNO_WOULDBLOCK: + return None + else: + raise + finally: + buf = None def _is_connreset(self, e): if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF: @@ -1592,9 +1598,9 @@ class PipeIOStream(BaseIOStream): # See https://github.com/tornadoweb/tornado/pull/2008 del data - def read_from_fd(self): + def read_from_fd(self, buf): try: - chunk = self._fio.read(self.read_chunk_size) + return self._fio.readinto(buf) except (IOError, OSError) as e: if errno_from_exception(e) == errno.EBADF: # If the writing half of a pipe is closed, select will @@ -1603,13 +1609,8 @@ class PipeIOStream(BaseIOStream): return None else: raise - if chunk is None: - # Read would block - return None - if not chunk: - self.close() - return None - return chunk + finally: + buf = None def doctests():