From e88f02efe024734e982c967341b81af75c150ccc Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sun, 16 Sep 2012 16:58:48 -0700 Subject: [PATCH] Split IOStream into BaseIOStream and IOStream. IOStream contains the socket-specific code; BaseIOStream will be the basis for non-socket-based streams (i.e. pipes) --- tornado/iostream.py | 324 ++++++++++++++++++++++-------------- website/sphinx/iostream.rst | 38 ++++- 2 files changed, 232 insertions(+), 130 deletions(-) diff --git a/tornado/iostream.py b/tornado/iostream.py index 8713d8c9..94f00f96 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -14,7 +14,14 @@ # License for the specific language governing permissions and limitations # under the License. -"""A utility class to write to and read from a non-blocking socket.""" +"""Utility classes to write to and read from non-blocking files and sockets. + +Contents: + +* `BaseIOStream`: Generic interface for reading and writing. +* `IOStream`: Implementation of BaseIOStream using non-blocking sockets. +* `SSLIOStream`: SSL-aware version of IOStream. +""" from __future__ import absolute_import, division, with_statement @@ -36,55 +43,21 @@ except ImportError: ssl = None -class IOStream(object): - r"""A utility class to write to and read from a non-blocking socket. +class BaseIOStream(object): + """A utility class to write to and read from a non-blocking file or socket. We support a non-blocking ``write()`` and a family of ``read_*()`` methods. All of the methods take callbacks (since writing and reading are non-blocking and asynchronous). - The socket parameter may either be connected or unconnected. For - server operations the socket is the result of calling socket.accept(). - For client operations the socket is created with socket.socket(), - and may either be connected before passing it to the IOStream or - connected with IOStream.connect. - When a stream is closed due to an error, the IOStream's `error` attribute contains the exception object.
- A very simple (and broken) HTTP client using this class:: - - from tornado import ioloop - from tornado import iostream - import socket - - def send_request(): - stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n") - stream.read_until("\r\n\r\n", on_headers) - - def on_headers(data): - headers = {} - for line in data.split("\r\n"): - parts = line.split(":") - if len(parts) == 2: - headers[parts[0].strip()] = parts[1].strip() - stream.read_bytes(int(headers["Content-Length"]), on_body) - - def on_body(data): - print data - stream.close() - ioloop.IOLoop.instance().stop() - - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - stream = iostream.IOStream(s) - stream.connect(("friendfeed.com", 80), send_request) - ioloop.IOLoop.instance().start() - + Subclasses must implement `fileno`, `close_fd`, `write_to_fd`, + `read_from_fd`, and optionally `get_fd_error`. """ - def __init__(self, socket, io_loop=None, max_buffer_size=104857600, + def __init__(self, io_loop=None, max_buffer_size=104857600, read_chunk_size=4096): - self.socket = socket - self.socket.setblocking(False) self.io_loop = io_loop or ioloop.IOLoop.instance() self.max_buffer_size = max_buffer_size self.read_chunk_size = read_chunk_size @@ -105,40 +78,45 @@ class IOStream(object): self._connecting = False self._state = None self._pending_callbacks = 0 + self._closed = False - def connect(self, address, callback=None): - """Connects the socket to a remote address without blocking. + def fileno(self): + """Returns the file descriptor for this stream.""" + raise NotImplementedError() - May only be called if the socket passed to the constructor was - not previously connected. The address parameter is in the - same format as for socket.connect, i.e. a (host, port) tuple. - If callback is specified, it will be called when the - connection is completed. + def close_fd(self): + """Closes the file underlying this stream. 
- Note that it is safe to call IOStream.write while the - connection is pending, in which case the data will be written - as soon as the connection is ready. Calling IOStream read - methods before the socket is connected works on some platforms - but is non-portable. + ``close_fd`` is called by `BaseIOStream` and should not be called + elsewhere; other users should call `close` instead. """ - self._connecting = True - try: - self.socket.connect(address) - except socket.error, e: - # In non-blocking mode we expect connect() to raise an - # exception with EINPROGRESS or EWOULDBLOCK. - # - # On freebsd, other errors such as ECONNREFUSED may be - # returned immediately when attempting to connect to - # localhost, so handle them the same way as an error - # reported later in _handle_connect. - if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK): - gen_log.warning("Connect error on fd %d: %s", - self.socket.fileno(), e) - self.close() - return - self._connect_callback = stack_context.wrap(callback) - self._add_io_state(self.io_loop.WRITE) + raise NotImplementedError() + + def write_to_fd(self, data): + """Attempts to write ``data`` to the underlying file. + + Returns the number of bytes written. + """ + raise NotImplementedError() + + def read_from_fd(self): + """Attempts to read from the underlying file. + + Returns ``None`` if there was nothing to read (the socket returned + EWOULDBLOCK or equivalent), otherwise returns the data. When possible, + should return no more than ``self.read_chunk_size`` bytes at a time. + """ + raise NotImplementedError() + + def get_fd_error(self): + """Returns information about any error on the underlying file. + + This method is called after the IOLoop has signaled an error on the + file descriptor, and should return an Exception (such as `socket.error` + with additional information), or None if no such information is + available.
+ """ + return None def read_until_regex(self, regex, callback): """Call callback when we read the given regex pattern.""" @@ -219,7 +197,7 @@ class IOStream(object): def close(self): """Close this stream.""" - if self.socket is not None: + if not self.closed(): if any(sys.exc_info()): self.error = sys.exc_info()[1] if self._read_until_close: @@ -229,14 +207,14 @@ class IOStream(object): self._run_callback(callback, self._consume(self._read_buffer_size)) if self._state is not None: - self.io_loop.remove_handler(self.socket.fileno()) + self.io_loop.remove_handler(self.fileno()) self._state = None - self.socket.close() - self.socket = None + self.close_fd() + self._closed = True self._maybe_run_close_callback() def _maybe_run_close_callback(self): - if (self.socket is None and self._close_callback and + if (self.closed() and self._close_callback and self._pending_callbacks == 0): # if there are pending callbacks, don't run the close callback # until they're done (see _maybe_add_error_handler) @@ -254,27 +232,25 @@ class IOStream(object): def closed(self): """Returns true if the stream has been closed.""" - return self.socket is None + return self._closed def _handle_events(self, fd, events): - if not self.socket: + if self.closed(): gen_log.warning("Got events for closed stream %d", fd) return try: if events & self.io_loop.READ: self._handle_read() - if not self.socket: + if self.closed(): return if events & self.io_loop.WRITE: if self._connecting: self._handle_connect() self._handle_write() - if not self.socket: + if self.closed(): return if events & self.io_loop.ERROR: - errno = self.socket.getsockopt(socket.SOL_SOCKET, - socket.SO_ERROR) - self.error = socket.error(errno, os.strerror(errno)) + self.error = self.get_fd_error() # We may have queued up a user callback in _handle_read or # _handle_write, so don't close the IOStream until those # callbacks have had a chance to run. 
@@ -291,7 +267,7 @@ class IOStream(object): assert self._state is not None, \ "shouldn't happen: _handle_events without self._state" self._state = state - self.io_loop.update_handler(self.socket.fileno(), self._state) + self.io_loop.update_handler(self.fileno(), self._state) except Exception: gen_log.error("Uncaught exception, closing connection.", exc_info=True) @@ -393,24 +369,6 @@ class IOStream(object): return self._maybe_add_error_listener() - def _read_from_socket(self): - """Attempts to read from the socket. - - Returns the data read or None if there is nothing to read. - May be overridden in subclasses. - """ - try: - chunk = self.socket.recv(self.read_chunk_size) - except socket.error, e: - if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): - return None - else: - raise - if not chunk: - self.close() - return None - return chunk - def _read_to_buffer(self): """Reads from the socket and appends the result to the read buffer. @@ -419,11 +377,11 @@ class IOStream(object): error closes the socket and raises an exception. """ try: - chunk = self._read_from_socket() + chunk = self.read_from_fd() except socket.error, e: # ssl.SSLError is a subclass of socket.error gen_log.warning("Read error on %d: %s", - self.socket.fileno(), e) + self.fileno(), e) self.close() raise if chunk is None: @@ -496,24 +454,6 @@ class IOStream(object): _double_prefix(self._read_buffer) return False - def _handle_connect(self): - err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if err != 0: - self.error = socket.error(err, os.strerror(err)) - # IOLoop implementations may vary: some of them return - # an error state before the socket becomes writable, so - # in that case a connection failure would be handled by the - # error path in _handle_events instead of here. 
- gen_log.warning("Connect error on fd %d: %s", - self.socket.fileno(), errno.errorcode[err]) - self.close() - return - if self._connect_callback is not None: - callback = self._connect_callback - self._connect_callback = None - self._run_callback(callback) - self._connecting = False - def _handle_write(self): while self._write_buffer: try: @@ -524,7 +464,7 @@ class IOStream(object): # process. Therefore we must not call socket.send # with more than 128KB at a time. _merge_prefix(self._write_buffer, 128 * 1024) - num_bytes = self.socket.send(self._write_buffer[0]) + num_bytes = self.write_to_fd(self._write_buffer[0]) if num_bytes == 0: # With OpenSSL, if we couldn't write the entire buffer, # the very same string object must be used on the @@ -545,7 +485,7 @@ class IOStream(object): break else: gen_log.warning("Write error on %d: %s", - self.socket.fileno(), e) + self.fileno(), e) self.close() return if not self._write_buffer and self._write_callback: @@ -561,12 +501,12 @@ class IOStream(object): return self._read_buffer.popleft() def _check_closed(self): - if not self.socket: + if self.closed(): raise IOError("Stream is closed") def _maybe_add_error_listener(self): if self._state is None and self._pending_callbacks == 0: - if self.socket is None: + if self.closed(): self._maybe_run_close_callback() else: self._add_io_state(ioloop.IOLoop.READ) @@ -592,17 +532,143 @@ class IOStream(object): (since the write callback is optional so we can have a fast-path write with no `_run_callback`) """ - if self.socket is None: + if self.closed(): # connection has been closed, so there can be no future events return if self._state is None: self._state = ioloop.IOLoop.ERROR | state with stack_context.NullContext(): self.io_loop.add_handler( - self.socket.fileno(), self._handle_events, self._state) + self.fileno(), self._handle_events, self._state) elif not self._state & state: self._state = self._state | state - self.io_loop.update_handler(self.socket.fileno(), self._state) + 
self.io_loop.update_handler(self.fileno(), self._state) + + +class IOStream(BaseIOStream): + r"""Socket-based IOStream implementation. + + This class supports the read and write methods from `BaseIOStream` + plus a `connect` method. + + The socket parameter may either be connected or unconnected. For + server operations the socket is the result of calling socket.accept(). + For client operations the socket is created with socket.socket(), + and may either be connected before passing it to the IOStream or + connected with IOStream.connect. + + A very simple (and broken) HTTP client using this class:: + + from tornado import ioloop + from tornado import iostream + import socket + + def send_request(): + stream.write("GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n") + stream.read_until("\r\n\r\n", on_headers) + + def on_headers(data): + headers = {} + for line in data.split("\r\n"): + parts = line.split(":") + if len(parts) == 2: + headers[parts[0].strip()] = parts[1].strip() + stream.read_bytes(int(headers["Content-Length"]), on_body) + + def on_body(data): + print data + stream.close() + ioloop.IOLoop.instance().stop() + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + stream = iostream.IOStream(s) + stream.connect(("friendfeed.com", 80), send_request) + ioloop.IOLoop.instance().start() + """ + def __init__(self, socket, *args, **kwargs): + self.socket = socket + self.socket.setblocking(False) + super(IOStream, self).__init__(*args, **kwargs) + + def fileno(self): + return self.socket.fileno() + + def close_fd(self): + self.socket.close() + self.socket = None + + def get_fd_error(self): + errno = self.socket.getsockopt(socket.SOL_SOCKET, + socket.SO_ERROR) + return socket.error(errno, os.strerror(errno)) + + def read_from_fd(self): + try: + chunk = self.socket.recv(self.read_chunk_size) + except socket.error, e: + if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): + return None + else: + raise + if not chunk: + self.close() + return None + return chunk + 
+ def write_to_fd(self, data): + return self.socket.send(data) + + def connect(self, address, callback=None): + """Connects the socket to a remote address without blocking. + + May only be called if the socket passed to the constructor was + not previously connected. The address parameter is in the + same format as for socket.connect, i.e. a (host, port) tuple. + If callback is specified, it will be called when the + connection is completed. + + Note that it is safe to call IOStream.write while the + connection is pending, in which case the data will be written + as soon as the connection is ready. Calling IOStream read + methods before the socket is connected works on some platforms + but is non-portable. + """ + self._connecting = True + try: + self.socket.connect(address) + except socket.error, e: + # In non-blocking mode we expect connect() to raise an + # exception with EINPROGRESS or EWOULDBLOCK. + # + # On freebsd, other errors such as ECONNREFUSED may be + # returned immediately when attempting to connect to + # localhost, so handle them the same way as an error + # reported later in _handle_connect. + if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK): + gen_log.warning("Connect error on fd %d: %s", + self.socket.fileno(), e) + self.close() + return + self._connect_callback = stack_context.wrap(callback) + self._add_io_state(self.io_loop.WRITE) + + def _handle_connect(self): + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err != 0: + self.error = socket.error(err, os.strerror(err)) + # IOLoop implementations may vary: some of them return + # an error state before the socket becomes writable, so + # in that case a connection failure would be handled by the + # error path in _handle_events instead of here. 
+ gen_log.warning("Connect error on fd %d: %s", + self.socket.fileno(), errno.errorcode[err]) + self.close() + return + if self._connect_callback is not None: + callback = self._connect_callback + self._connect_callback = None + self._run_callback(callback) + self._connecting = False class SSLIOStream(IOStream): @@ -700,7 +766,7 @@ class SSLIOStream(IOStream): **self._ssl_options) super(SSLIOStream, self)._handle_connect() - def _read_from_socket(self): + def read_from_fd(self): if self._ssl_accepting: # If the handshake hasn't finished yet, there can't be anything # to read (attempting to read may or may not raise an exception diff --git a/website/sphinx/iostream.rst b/website/sphinx/iostream.rst index 28de480b..d37a3793 100644 --- a/website/sphinx/iostream.rst +++ b/website/sphinx/iostream.rst @@ -2,4 +2,40 @@ ===================================================================== .. automodule:: tornado.iostream - :members: + + Base class + ---------- + + .. autoclass:: BaseIOStream + + Main interface + ^^^^^^^^^^^^^^ + + .. automethod:: BaseIOStream.write + .. automethod:: BaseIOStream.read_bytes + .. automethod:: BaseIOStream.read_until + .. automethod:: BaseIOStream.read_until_regex + .. automethod:: BaseIOStream.read_until_close + .. automethod:: BaseIOStream.close + .. automethod:: BaseIOStream.set_close_callback + .. automethod:: BaseIOStream.closed + .. automethod:: BaseIOStream.reading + .. automethod:: BaseIOStream.writing + + Methods for subclasses + ^^^^^^^^^^^^^^^^^^^^^^ + + .. automethod:: BaseIOStream.fileno + .. automethod:: BaseIOStream.close_fd + .. automethod:: BaseIOStream.write_to_fd + .. automethod:: BaseIOStream.read_from_fd + .. automethod:: BaseIOStream.get_fd_error + + Implementations + --------------- + + .. autoclass:: IOStream + :members: + + .. autoclass:: SSLIOStream + :members: