iostream: Use recv_into and friends in read_from_fd

This has the same memory-allocation behavior as before, but it moves
the buffer out of the recv() call to python code.
This commit is contained in:
Ben Darnell 2018-01-20 19:00:30 -05:00
parent 67abb73db1
commit 1215cd2631
1 changed files with 68 additions and 67 deletions

View File

@ -300,13 +300,18 @@ class BaseIOStream(object):
"""
raise NotImplementedError()
def read_from_fd(self):
def read_from_fd(self, buf):
"""Attempts to read from the underlying file.
Returns ``None`` if there was nothing to read (the socket
returned `~errno.EWOULDBLOCK` or equivalent), otherwise
returns the data. When possible, should return no more than
``self.read_chunk_size`` bytes at a time.
Reads up to ``len(buf)`` bytes, storing them in the buffer.
Returns the number of bytes read. Returns None if there was
nothing to read (the socket returned `~errno.EWOULDBLOCK` or
equivalent), and zero on EOF.
.. versionchanged:: 5.0
Interface redesigned to take a buffer and return a number
of bytes instead of a freshly-allocated object.
"""
raise NotImplementedError()
@ -820,31 +825,40 @@ class BaseIOStream(object):
to read (i.e. the read returns EWOULDBLOCK or equivalent). On
error closes the socket and raises an exception.
"""
while True:
try:
chunk = self.read_from_fd()
except (socket.error, IOError, OSError) as e:
if errno_from_exception(e) == errno.EINTR:
continue
# ssl.SSLError is a subclass of socket.error
if self._is_connreset(e):
# Treat ECONNRESET as a connection close rather than
# an error to minimize log spam (the exception will
# be available on self.error for apps that care).
try:
while True:
try:
buf = bytearray(self.read_chunk_size)
bytes_read = self.read_from_fd(buf)
except (socket.error, IOError, OSError) as e:
if errno_from_exception(e) == errno.EINTR:
continue
# ssl.SSLError is a subclass of socket.error
if self._is_connreset(e):
# Treat ECONNRESET as a connection close rather than
# an error to minimize log spam (the exception will
# be available on self.error for apps that care).
self.close(exc_info=e)
return
self.close(exc_info=e)
return
self.close(exc_info=e)
raise
break
if chunk is None:
return 0
self._read_buffer += chunk
self._read_buffer_size += len(chunk)
raise
break
if bytes_read is None:
return 0
elif bytes_read == 0:
self.close()
return 0
self._read_buffer += memoryview(buf)[:bytes_read]
self._read_buffer_size += bytes_read
finally:
# Break the reference to buf so we don't waste a chunk's worth of
# memory in case an exception hangs on to our stack frame.
buf = None
if self._read_buffer_size > self.max_buffer_size:
gen_log.error("Reached maximum read buffer size")
self.close()
raise StreamBufferFullError("Reached maximum read buffer size")
return len(chunk)
return bytes_read
def _run_streaming_callback(self):
if self._streaming_callback is not None and self._read_buffer_size:
@ -1106,18 +1120,16 @@ class IOStream(BaseIOStream):
socket.SO_ERROR)
return socket.error(errno, os.strerror(errno))
def read_from_fd(self):
def read_from_fd(self, buf):
try:
chunk = self.socket.recv(self.read_chunk_size)
return self.socket.recv_into(buf)
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
return None
else:
raise
if not chunk:
self.close()
return None
return chunk
finally:
buf = None
def write_to_fd(self, data):
try:
@ -1528,35 +1540,29 @@ class SSLIOStream(IOStream):
# See https://github.com/tornadoweb/tornado/pull/2008
del data
def read_from_fd(self):
if self._ssl_accepting:
# If the handshake hasn't finished yet, there can't be anything
# to read (attempting to read may or may not raise an exception
# depending on the SSL version)
return None
def read_from_fd(self, buf):
try:
# SSLSocket objects have both a read() and recv() method,
# while regular sockets only have recv().
# The recv() method blocks (at least in python 2.6) if it is
# called when there is nothing to read, so we have to use
# read() instead.
chunk = self.socket.read(self.read_chunk_size)
except ssl.SSLError as e:
# SSLError is a subclass of socket.error, so this except
# block must come first.
if e.args[0] == ssl.SSL_ERROR_WANT_READ:
if self._ssl_accepting:
# If the handshake hasn't finished yet, there can't be anything
# to read (attempting to read may or may not raise an exception
# depending on the SSL version)
return None
else:
raise
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
return None
else:
raise
if not chunk:
self.close()
return None
return chunk
try:
return self.socket.recv_into(buf)
except ssl.SSLError as e:
# SSLError is a subclass of socket.error, so this except
# block must come first.
if e.args[0] == ssl.SSL_ERROR_WANT_READ:
return None
else:
raise
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
return None
else:
raise
finally:
buf = None
def _is_connreset(self, e):
if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
@ -1592,9 +1598,9 @@ class PipeIOStream(BaseIOStream):
# See https://github.com/tornadoweb/tornado/pull/2008
del data
def read_from_fd(self):
def read_from_fd(self, buf):
try:
chunk = self._fio.read(self.read_chunk_size)
return self._fio.readinto(buf)
except (IOError, OSError) as e:
if errno_from_exception(e) == errno.EBADF:
# If the writing half of a pipe is closed, select will
@ -1603,13 +1609,8 @@ class PipeIOStream(BaseIOStream):
return None
else:
raise
if chunk is None:
# Read would block
return None
if not chunk:
self.close()
return None
return chunk
finally:
buf = None
def doctests():