diff --git a/tornado/httpserver.py b/tornado/httpserver.py index ecae243c..9de647e3 100644 --- a/tornado/httpserver.py +++ b/tornado/httpserver.py @@ -226,7 +226,13 @@ class HTTPConnection(object): if disconnect: self.close() return - self.stream.read_until(b("\r\n\r\n"), self._header_callback) + try: + # Use a try/except instead of checking stream.closed() + # directly, because in some cases the stream doesn't discover + # that it's closed until you try to read from it. + self.stream.read_until(b("\r\n\r\n"), self._header_callback) + except iostream.StreamClosedError: + self.close() def _on_headers(self, data): try: diff --git a/tornado/iostream.py b/tornado/iostream.py index 18b4460a..23457d9f 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -48,6 +48,8 @@ try: except ImportError: _set_nonblocking = None +class StreamClosedError(IOError): + pass class BaseIOStream(object): """A utility class to write to and read from a non-blocking file or socket. @@ -386,6 +388,12 @@ class BaseIOStream(object): chunk = self.read_from_fd() except (socket.error, IOError, OSError), e: # ssl.SSLError is a subclass of socket.error + if e.args[0] == errno.ECONNRESET: + # Treat ECONNRESET as a connection close rather than + # an error to minimize log spam (the exception will + # be available on self.error for apps that care). + self.close() + return gen_log.warning("Read error on %d: %s", self.fileno(), e) self.close() @@ -508,7 +516,7 @@ class BaseIOStream(object): def _check_closed(self): if self.closed(): - raise IOError("Stream is closed") + raise StreamClosedError("Stream is closed") def _maybe_add_error_listener(self): if self._state is None and self._pending_callbacks == 0: diff --git a/tornado/test/httpserver_test.py b/tornado/test/httpserver_test.py index 3a3b7c95..cb42b713 100644 --- a/tornado/test/httpserver_test.py +++ b/tornado/test/httpserver_test.py @@ -12,7 +12,8 @@ from tornado.simple_httpclient import SimpleAsyncHTTPClient from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase, ExpectLog from tornado.test.util import unittest from tornado.util import b, bytes_type -from tornado.web import Application, RequestHandler +from tornado.web import Application, RequestHandler, asynchronous +import datetime import os import shutil import socket @@ -399,3 +400,141 @@ class UnixSocketTest(AsyncTestCase): UnixSocketTest = unittest.skipIf( not hasattr(socket, 'AF_UNIX') or sys.platform == 'cygwin', "unix sockets not supported on this platform") + +class KeepAliveTest(AsyncHTTPTestCase): + """Tests various scenarios for HTTP 1.1 keep-alive support. + + These tests don't use AsyncHTTPClient because we want to control + connection reuse and closing. + """ + def get_app(self): + test = self + + class HelloHandler(RequestHandler): + def get(self): + self.finish('Hello world') + + class LargeHandler(RequestHandler): + def get(self): + # 512KB should be bigger than the socket buffers so it will + # be written out in chunks. + self.write(''.join(chr(i % 256) * 1024 for i in xrange(512))) + + class FinishOnCloseHandler(RequestHandler): + @asynchronous + def get(self): + self.flush() + + def on_connection_close(self): + # This is not very realistic, but finishing the request + # from the close callback has the right timing to mimic + # some errors seen in the wild. + self.finish('closed') + + return Application([('/', HelloHandler), + ('/large', LargeHandler), + ('/finish_on_close', FinishOnCloseHandler)]) + + def setUp(self): + super(KeepAliveTest, self).setUp() + self.http_version = b('HTTP/1.1') + + def tearDown(self): + # We just closed the client side of the socket; let the IOLoop run + # once to make sure the server side got the message. + self.io_loop.add_timeout(datetime.timedelta(seconds=0.001), self.stop) + self.wait() + + if hasattr(self, 'stream'): + self.stream.close() + super(KeepAliveTest, self).tearDown() + + # The next few methods are a crude manual http client + def connect(self): + self.stream = IOStream(socket.socket(), io_loop=self.io_loop) + self.stream.connect(('localhost', self.get_http_port()), self.stop) + self.wait() + + def read_headers(self): + self.stream.read_until(b('\r\n'), self.stop) + first_line = self.wait() + self.assertTrue(first_line.startswith(self.http_version + b(' 200')), first_line) + self.stream.read_until(b('\r\n\r\n'), self.stop) + header_bytes = self.wait() + headers = HTTPHeaders.parse(header_bytes.decode('latin1')) + return headers + + def read_response(self): + headers = self.read_headers() + self.stream.read_bytes(int(headers['Content-Length']), self.stop) + body = self.wait() + self.assertEqual(b('Hello world'), body) + + def close(self): + self.stream.close() + del self.stream + + def test_two_requests(self): + self.connect() + self.stream.write(b('GET / HTTP/1.1\r\n\r\n')) + self.read_response() + self.stream.write(b('GET / HTTP/1.1\r\n\r\n')) + self.read_response() + self.close() + + def test_request_close(self): + self.connect() + self.stream.write(b('GET / HTTP/1.1\r\nConnection: close\r\n\r\n')) + self.read_response() + self.stream.read_until_close(callback=self.stop) + data = self.wait() + self.assertTrue(not data) + self.close() + + # keepalive is supported for http 1.0 too, but it's opt-in + def test_http10(self): + self.http_version = b('HTTP/1.0') + self.connect() + self.stream.write(b('GET / HTTP/1.0\r\n\r\n')) + self.read_response() + self.stream.read_until_close(callback=self.stop) + data = self.wait() + self.assertTrue(not data) + self.close() + + def test_http10_keepalive(self): + self.http_version = b('HTTP/1.0') + self.connect() + self.stream.write(b('GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')) + self.read_response() + self.stream.write(b('GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n')) + self.read_response() + self.close() + + def test_pipelined_requests(self): + self.connect() + self.stream.write(b('GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')) + self.read_response() + self.read_response() + self.close() + + def test_pipelined_cancel(self): + self.connect() + self.stream.write(b('GET / HTTP/1.1\r\n\r\nGET / HTTP/1.1\r\n\r\n')) + # only read once + self.read_response() + self.close() + + def test_cancel_during_download(self): + self.connect() + self.stream.write(b('GET /large HTTP/1.1\r\n\r\n')) + self.read_headers() + self.stream.read_bytes(1024, self.stop) + self.wait() + self.close() + + def test_finish_while_closed(self): + self.connect() + self.stream.write(b('GET /finish_on_close HTTP/1.1\r\n\r\n')) + self.read_headers() + self.close() diff --git a/website/sphinx/releases/next.rst b/website/sphinx/releases/next.rst index 3e8b0f70..7a8880e2 100644 --- a/website/sphinx/releases/next.rst +++ b/website/sphinx/releases/next.rst @@ -131,3 +131,10 @@ In progress a time relative to `IOLoop.time`, not `time.time`. (`time.time` will continue to work only as long as the IOLoop's ``time_func`` argument is not used). +* `IOStream` now raises a new exception + `tornado.iostream.StreamClosedException` when you attempt to read or + write after the stream has been closed (by either side). +* `IOStream` now simply closes the connection when it gets an + ``ECONNRESET`` error, rather than logging it as an error. +* `HTTPServer` no longer logs an error when it is unable to read a second + request from an HTTP 1.1 keep-alive connection.