Split IOStream into BaseIOStream and IOStream.
IOStream the socket-specific code; BaseIOStream will be the basis for non-socket-based streams (i.e. pipes)
This commit is contained in:
parent
186e947112
commit
e88f02efe0
|
@ -14,7 +14,14 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""A utility class to write to and read from a non-blocking socket."""
|
||||
"""Utility classes to write to and read from non-blocking files and sockets.
|
||||
|
||||
Contents:
|
||||
|
||||
* `BaseIOStream`: Generic interface for reading and writing.
|
||||
* `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
|
||||
* `SSLIOStream`: SSL-aware version of IOStream.
|
||||
"""
|
||||
|
||||
from __future__ import absolute_import, division, with_statement
|
||||
|
||||
|
@ -36,55 +43,21 @@ except ImportError:
|
|||
ssl = None
|
||||
|
||||
|
||||
class IOStream(object):
|
||||
r"""A utility class to write to and read from a non-blocking socket.
|
||||
class BaseIOStream(object):
|
||||
"""A utility class to write to and read from a non-blocking file or socket.
|
||||
|
||||
We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
|
||||
All of the methods take callbacks (since writing and reading are
|
||||
non-blocking and asynchronous).
|
||||
|
||||
The socket parameter may either be connected or unconnected. For
|
||||
server operations the socket is the result of calling socket.accept().
|
||||
For client operations the socket is created with socket.socket(),
|
||||
and may either be connected before passing it to the IOStream or
|
||||
connected with IOStream.connect.
|
||||
|
||||
When a stream is closed due to an error, the IOStream's `error`
|
||||
attribute contains the exception object.
|
||||
|
||||
A very simple (and broken) HTTP client using this class::
|
||||
|
||||
from tornado import ioloop
|
||||
from tornado import iostream
|
||||
import socket
|
||||
|
||||
def send_request():
|
||||
stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
|
||||
stream.read_until("\r\n\r\n", on_headers)
|
||||
|
||||
def on_headers(data):
|
||||
headers = {}
|
||||
for line in data.split("\r\n"):
|
||||
parts = line.split(":")
|
||||
if len(parts) == 2:
|
||||
headers[parts[0].strip()] = parts[1].strip()
|
||||
stream.read_bytes(int(headers["Content-Length"]), on_body)
|
||||
|
||||
def on_body(data):
|
||||
print data
|
||||
stream.close()
|
||||
ioloop.IOLoop.instance().stop()
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
|
||||
stream = iostream.IOStream(s)
|
||||
stream.connect(("friendfeed.com", 80), send_request)
|
||||
ioloop.IOLoop.instance().start()
|
||||
|
||||
Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
|
||||
`read_from_fd`, and optionally `get_fd_error`.
|
||||
"""
|
||||
def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
|
||||
def __init__(self, io_loop=None, max_buffer_size=104857600,
|
||||
read_chunk_size=4096):
|
||||
self.socket = socket
|
||||
self.socket.setblocking(False)
|
||||
self.io_loop = io_loop or ioloop.IOLoop.instance()
|
||||
self.max_buffer_size = max_buffer_size
|
||||
self.read_chunk_size = read_chunk_size
|
||||
|
@ -105,40 +78,45 @@ class IOStream(object):
|
|||
self._connecting = False
|
||||
self._state = None
|
||||
self._pending_callbacks = 0
|
||||
self._closed = False
|
||||
|
||||
def connect(self, address, callback=None):
|
||||
"""Connects the socket to a remote address without blocking.
|
||||
def fileno(self):
|
||||
"""Returns the file descriptor for this stream."""
|
||||
raise NotImplementedError()
|
||||
|
||||
May only be called if the socket passed to the constructor was
|
||||
not previously connected. The address parameter is in the
|
||||
same format as for socket.connect, i.e. a (host, port) tuple.
|
||||
If callback is specified, it will be called when the
|
||||
connection is completed.
|
||||
def close_fd(self):
|
||||
"""Closes the file underlying this stream.
|
||||
|
||||
Note that it is safe to call IOStream.write while the
|
||||
connection is pending, in which case the data will be written
|
||||
as soon as the connection is ready. Calling IOStream read
|
||||
methods before the socket is connected works on some platforms
|
||||
but is non-portable.
|
||||
``close_fd`` is called by `BaseIOStream` and should not be called
|
||||
elsewhere; other users should call `close` instead.
|
||||
"""
|
||||
self._connecting = True
|
||||
try:
|
||||
self.socket.connect(address)
|
||||
except socket.error, e:
|
||||
# In non-blocking mode we expect connect() to raise an
|
||||
# exception with EINPROGRESS or EWOULDBLOCK.
|
||||
#
|
||||
# On freebsd, other errors such as ECONNREFUSED may be
|
||||
# returned immediately when attempting to connect to
|
||||
# localhost, so handle them the same way as an error
|
||||
# reported later in _handle_connect.
|
||||
if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
|
||||
gen_log.warning("Connect error on fd %d: %s",
|
||||
self.socket.fileno(), e)
|
||||
self.close()
|
||||
return
|
||||
self._connect_callback = stack_context.wrap(callback)
|
||||
self._add_io_state(self.io_loop.WRITE)
|
||||
raise NotImplementedError()
|
||||
|
||||
def write_to_fd(self, data):
|
||||
"""Attempts to write ``data`` to the underlying file.
|
||||
|
||||
Returns the number of bytes written.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def read_from_fd(self):
|
||||
"""Attempts to read from the underlying file.
|
||||
|
||||
Returns ``None`` if there was nothing to read (the socket returned
|
||||
EWOULDBLOCK or equivalent), otherwise returns the data. When possible,
|
||||
should return no more than ``self.read_chunk_size`` bytes at a time.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_fd_error(self):
|
||||
"""Returns information about any error on the underlying file.
|
||||
|
||||
This method is called after the IOLoop has signaled an error on the
|
||||
file descriptor, and should return an Exception (such as `socket.error`
|
||||
with additional information, or None if no such information is
|
||||
available.
|
||||
"""
|
||||
return None
|
||||
|
||||
def read_until_regex(self, regex, callback):
|
||||
"""Call callback when we read the given regex pattern."""
|
||||
|
@ -219,7 +197,7 @@ class IOStream(object):
|
|||
|
||||
def close(self):
|
||||
"""Close this stream."""
|
||||
if self.socket is not None:
|
||||
if not self.closed():
|
||||
if any(sys.exc_info()):
|
||||
self.error = sys.exc_info()[1]
|
||||
if self._read_until_close:
|
||||
|
@ -229,14 +207,14 @@ class IOStream(object):
|
|||
self._run_callback(callback,
|
||||
self._consume(self._read_buffer_size))
|
||||
if self._state is not None:
|
||||
self.io_loop.remove_handler(self.socket.fileno())
|
||||
self.io_loop.remove_handler(self.fileno())
|
||||
self._state = None
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
self.close_fd()
|
||||
self._closed = True
|
||||
self._maybe_run_close_callback()
|
||||
|
||||
def _maybe_run_close_callback(self):
|
||||
if (self.socket is None and self._close_callback and
|
||||
if (self.closed() and self._close_callback and
|
||||
self._pending_callbacks == 0):
|
||||
# if there are pending callbacks, don't run the close callback
|
||||
# until they're done (see _maybe_add_error_handler)
|
||||
|
@ -254,27 +232,25 @@ class IOStream(object):
|
|||
|
||||
def closed(self):
|
||||
"""Returns true if the stream has been closed."""
|
||||
return self.socket is None
|
||||
return self._closed
|
||||
|
||||
def _handle_events(self, fd, events):
|
||||
if not self.socket:
|
||||
if self.closed():
|
||||
gen_log.warning("Got events for closed stream %d", fd)
|
||||
return
|
||||
try:
|
||||
if events & self.io_loop.READ:
|
||||
self._handle_read()
|
||||
if not self.socket:
|
||||
if self.closed():
|
||||
return
|
||||
if events & self.io_loop.WRITE:
|
||||
if self._connecting:
|
||||
self._handle_connect()
|
||||
self._handle_write()
|
||||
if not self.socket:
|
||||
if self.closed():
|
||||
return
|
||||
if events & self.io_loop.ERROR:
|
||||
errno = self.socket.getsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_ERROR)
|
||||
self.error = socket.error(errno, os.strerror(errno))
|
||||
self.error = self.get_fd_error()
|
||||
# We may have queued up a user callback in _handle_read or
|
||||
# _handle_write, so don't close the IOStream until those
|
||||
# callbacks have had a chance to run.
|
||||
|
@ -291,7 +267,7 @@ class IOStream(object):
|
|||
assert self._state is not None, \
|
||||
"shouldn't happen: _handle_events without self._state"
|
||||
self._state = state
|
||||
self.io_loop.update_handler(self.socket.fileno(), self._state)
|
||||
self.io_loop.update_handler(self.fileno(), self._state)
|
||||
except Exception:
|
||||
gen_log.error("Uncaught exception, closing connection.",
|
||||
exc_info=True)
|
||||
|
@ -393,24 +369,6 @@ class IOStream(object):
|
|||
return
|
||||
self._maybe_add_error_listener()
|
||||
|
||||
def _read_from_socket(self):
|
||||
"""Attempts to read from the socket.
|
||||
|
||||
Returns the data read or None if there is nothing to read.
|
||||
May be overridden in subclasses.
|
||||
"""
|
||||
try:
|
||||
chunk = self.socket.recv(self.read_chunk_size)
|
||||
except socket.error, e:
|
||||
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
|
||||
return None
|
||||
else:
|
||||
raise
|
||||
if not chunk:
|
||||
self.close()
|
||||
return None
|
||||
return chunk
|
||||
|
||||
def _read_to_buffer(self):
|
||||
"""Reads from the socket and appends the result to the read buffer.
|
||||
|
||||
|
@ -419,11 +377,11 @@ class IOStream(object):
|
|||
error closes the socket and raises an exception.
|
||||
"""
|
||||
try:
|
||||
chunk = self._read_from_socket()
|
||||
chunk = self.read_from_fd()
|
||||
except socket.error, e:
|
||||
# ssl.SSLError is a subclass of socket.error
|
||||
gen_log.warning("Read error on %d: %s",
|
||||
self.socket.fileno(), e)
|
||||
self.fileno(), e)
|
||||
self.close()
|
||||
raise
|
||||
if chunk is None:
|
||||
|
@ -496,24 +454,6 @@ class IOStream(object):
|
|||
_double_prefix(self._read_buffer)
|
||||
return False
|
||||
|
||||
def _handle_connect(self):
|
||||
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
|
||||
if err != 0:
|
||||
self.error = socket.error(err, os.strerror(err))
|
||||
# IOLoop implementations may vary: some of them return
|
||||
# an error state before the socket becomes writable, so
|
||||
# in that case a connection failure would be handled by the
|
||||
# error path in _handle_events instead of here.
|
||||
gen_log.warning("Connect error on fd %d: %s",
|
||||
self.socket.fileno(), errno.errorcode[err])
|
||||
self.close()
|
||||
return
|
||||
if self._connect_callback is not None:
|
||||
callback = self._connect_callback
|
||||
self._connect_callback = None
|
||||
self._run_callback(callback)
|
||||
self._connecting = False
|
||||
|
||||
def _handle_write(self):
|
||||
while self._write_buffer:
|
||||
try:
|
||||
|
@ -524,7 +464,7 @@ class IOStream(object):
|
|||
# process. Therefore we must not call socket.send
|
||||
# with more than 128KB at a time.
|
||||
_merge_prefix(self._write_buffer, 128 * 1024)
|
||||
num_bytes = self.socket.send(self._write_buffer[0])
|
||||
num_bytes = self.write_to_fd(self._write_buffer[0])
|
||||
if num_bytes == 0:
|
||||
# With OpenSSL, if we couldn't write the entire buffer,
|
||||
# the very same string object must be used on the
|
||||
|
@ -545,7 +485,7 @@ class IOStream(object):
|
|||
break
|
||||
else:
|
||||
gen_log.warning("Write error on %d: %s",
|
||||
self.socket.fileno(), e)
|
||||
self.fileno(), e)
|
||||
self.close()
|
||||
return
|
||||
if not self._write_buffer and self._write_callback:
|
||||
|
@ -561,12 +501,12 @@ class IOStream(object):
|
|||
return self._read_buffer.popleft()
|
||||
|
||||
def _check_closed(self):
|
||||
if not self.socket:
|
||||
if self.closed():
|
||||
raise IOError("Stream is closed")
|
||||
|
||||
def _maybe_add_error_listener(self):
|
||||
if self._state is None and self._pending_callbacks == 0:
|
||||
if self.socket is None:
|
||||
if self.closed():
|
||||
self._maybe_run_close_callback()
|
||||
else:
|
||||
self._add_io_state(ioloop.IOLoop.READ)
|
||||
|
@ -592,17 +532,143 @@ class IOStream(object):
|
|||
(since the write callback is optional so we can have a
|
||||
fast-path write with no `_run_callback`)
|
||||
"""
|
||||
if self.socket is None:
|
||||
if self.closed():
|
||||
# connection has been closed, so there can be no future events
|
||||
return
|
||||
if self._state is None:
|
||||
self._state = ioloop.IOLoop.ERROR | state
|
||||
with stack_context.NullContext():
|
||||
self.io_loop.add_handler(
|
||||
self.socket.fileno(), self._handle_events, self._state)
|
||||
self.fileno(), self._handle_events, self._state)
|
||||
elif not self._state & state:
|
||||
self._state = self._state | state
|
||||
self.io_loop.update_handler(self.socket.fileno(), self._state)
|
||||
self.io_loop.update_handler(self.fileno(), self._state)
|
||||
|
||||
|
||||
class IOStream(BaseIOStream):
|
||||
r"""Socket-based IOStream implementation.
|
||||
|
||||
This class supports the read and write methods from `BaseIOStream`
|
||||
plus a `connect` method.
|
||||
|
||||
The socket parameter may either be connected or unconnected. For
|
||||
server operations the socket is the result of calling socket.accept().
|
||||
For client operations the socket is created with socket.socket(),
|
||||
and may either be connected before passing it to the IOStream or
|
||||
connected with IOStream.connect.
|
||||
|
||||
A very simple (and broken) HTTP client using this class::
|
||||
|
||||
from tornado import ioloop
|
||||
from tornado import iostream
|
||||
import socket
|
||||
|
||||
def send_request():
|
||||
stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
|
||||
stream.read_until("\r\n\r\n", on_headers)
|
||||
|
||||
def on_headers(data):
|
||||
headers = {}
|
||||
for line in data.split("\r\n"):
|
||||
parts = line.split(":")
|
||||
if len(parts) == 2:
|
||||
headers[parts[0].strip()] = parts[1].strip()
|
||||
stream.read_bytes(int(headers["Content-Length"]), on_body)
|
||||
|
||||
def on_body(data):
|
||||
print data
|
||||
stream.close()
|
||||
ioloop.IOLoop.instance().stop()
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
|
||||
stream = iostream.IOStream(s)
|
||||
stream.connect(("friendfeed.com", 80), send_request)
|
||||
ioloop.IOLoop.instance().start()
|
||||
"""
|
||||
def __init__(self, socket, *args, **kwargs):
|
||||
self.socket = socket
|
||||
self.socket.setblocking(False)
|
||||
super(IOStream, self).__init__(*args, **kwargs)
|
||||
|
||||
def fileno(self):
|
||||
return self.socket.fileno()
|
||||
|
||||
def close_fd(self):
|
||||
self.socket.close()
|
||||
self.socket = None
|
||||
|
||||
def get_fd_error(self):
|
||||
errno = self.socket.getsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_ERROR)
|
||||
return socket.error(errno, os.strerror(errno))
|
||||
|
||||
def read_from_fd(self):
|
||||
try:
|
||||
chunk = self.socket.recv(self.read_chunk_size)
|
||||
except socket.error, e:
|
||||
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
|
||||
return None
|
||||
else:
|
||||
raise
|
||||
if not chunk:
|
||||
self.close()
|
||||
return None
|
||||
return chunk
|
||||
|
||||
def write_to_fd(self, data):
|
||||
return self.socket.send(data)
|
||||
|
||||
def connect(self, address, callback=None):
|
||||
"""Connects the socket to a remote address without blocking.
|
||||
|
||||
May only be called if the socket passed to the constructor was
|
||||
not previously connected. The address parameter is in the
|
||||
same format as for socket.connect, i.e. a (host, port) tuple.
|
||||
If callback is specified, it will be called when the
|
||||
connection is completed.
|
||||
|
||||
Note that it is safe to call IOStream.write while the
|
||||
connection is pending, in which case the data will be written
|
||||
as soon as the connection is ready. Calling IOStream read
|
||||
methods before the socket is connected works on some platforms
|
||||
but is non-portable.
|
||||
"""
|
||||
self._connecting = True
|
||||
try:
|
||||
self.socket.connect(address)
|
||||
except socket.error, e:
|
||||
# In non-blocking mode we expect connect() to raise an
|
||||
# exception with EINPROGRESS or EWOULDBLOCK.
|
||||
#
|
||||
# On freebsd, other errors such as ECONNREFUSED may be
|
||||
# returned immediately when attempting to connect to
|
||||
# localhost, so handle them the same way as an error
|
||||
# reported later in _handle_connect.
|
||||
if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
|
||||
gen_log.warning("Connect error on fd %d: %s",
|
||||
self.socket.fileno(), e)
|
||||
self.close()
|
||||
return
|
||||
self._connect_callback = stack_context.wrap(callback)
|
||||
self._add_io_state(self.io_loop.WRITE)
|
||||
|
||||
def _handle_connect(self):
|
||||
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
|
||||
if err != 0:
|
||||
self.error = socket.error(err, os.strerror(err))
|
||||
# IOLoop implementations may vary: some of them return
|
||||
# an error state before the socket becomes writable, so
|
||||
# in that case a connection failure would be handled by the
|
||||
# error path in _handle_events instead of here.
|
||||
gen_log.warning("Connect error on fd %d: %s",
|
||||
self.socket.fileno(), errno.errorcode[err])
|
||||
self.close()
|
||||
return
|
||||
if self._connect_callback is not None:
|
||||
callback = self._connect_callback
|
||||
self._connect_callback = None
|
||||
self._run_callback(callback)
|
||||
self._connecting = False
|
||||
|
||||
|
||||
class SSLIOStream(IOStream):
|
||||
|
@ -700,7 +766,7 @@ class SSLIOStream(IOStream):
|
|||
**self._ssl_options)
|
||||
super(SSLIOStream, self)._handle_connect()
|
||||
|
||||
def _read_from_socket(self):
|
||||
def read_from_fd(self):
|
||||
if self._ssl_accepting:
|
||||
# If the handshake hasn't finished yet, there can't be anything
|
||||
# to read (attempting to read may or may not raise an exception
|
||||
|
|
|
@ -2,4 +2,40 @@
|
|||
=====================================================================
|
||||
|
||||
.. automodule:: tornado.iostream
|
||||
|
||||
Base class
|
||||
----------
|
||||
|
||||
.. autoclass:: BaseIOStream
|
||||
|
||||
Main interface
|
||||
^^^^^^^^^^^^^^
|
||||
|
||||
.. automethod:: BaseIOStream.write
|
||||
.. automethod:: BaseIOStream.read_bytes
|
||||
.. automethod:: BaseIOStream.read_until
|
||||
.. automethod:: BaseIOStream.read_until_regex
|
||||
.. automethod:: BaseIOStream.read_until_close
|
||||
.. automethod:: BaseIOStream.close
|
||||
.. automethod:: BaseIOStream.set_close_callback
|
||||
.. automethod:: BaseIOStream.closed
|
||||
.. automethod:: BaseIOStream.reading
|
||||
.. automethod:: BaseIOStream.writing
|
||||
|
||||
Methods for subclasses
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
.. automethod:: BaseIOStream.fileno
|
||||
.. automethod:: BaseIOStream.close_fd
|
||||
.. automethod:: BaseIOStream.write_to_fd
|
||||
.. automethod:: BaseIOStream.read_from_fd
|
||||
.. automethod:: BaseIOStream.get_fd_error
|
||||
|
||||
Implementations
|
||||
---------------
|
||||
|
||||
.. autoclass:: IOStream
|
||||
:members:
|
||||
|
||||
.. autoclass:: SSLIOStream
|
||||
:members:
|
||||
|
|
Loading…
Reference in New Issue