diff --git a/tornado/iostream.py b/tornado/iostream.py index d435c1cb..4e8ce050 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -217,12 +217,16 @@ class IOStream(object): self._state = None self.socket.close() self.socket = None - if self._close_callback and self._pending_callbacks == 0: - # if there are pending callbacks, don't run the close callback - # until they're done (see _maybe_add_error_handler) - cb = self._close_callback - self._close_callback = None - self._run_callback(cb) + self._maybe_run_close_callback() + + def _maybe_run_close_callback(self): + if (self.socket is None and self._close_callback and + self._pending_callbacks == 0): + # if there are pending callbacks, don't run the close callback + # until they're done (see _maybe_add_error_handler) + cb = self._close_callback + self._close_callback = None + self._run_callback(cb) def reading(self): """Returns true if we are currently reading from the stream.""" @@ -310,22 +314,38 @@ class IOStream(object): self.io_loop.add_callback(wrapper) def _handle_read(self): - while True: + try: try: - # Read from the socket until we get EWOULDBLOCK or equivalent. - # SSL sockets do some internal buffering, and if the data is - # sitting in the SSL object's buffer select() and friends - # can't see it; the only way to find out if it's there is to - # try to read it. - result = self._read_to_buffer() - except Exception: - self.close() - return - if result == 0: - break - else: - if self._read_from_buffer(): - return + # Pretend to have a pending callback so that an EOF in + # _read_to_buffer doesn't trigger an immediate close + # callback. At the end of this method we'll either + # estabilsh a real pending callback via + # _read_from_buffer or run the close callback. + # + # We need two try statements here so that + # pending_callbacks is decremented before the `except` + # clause below (which calls `close` and does need to + # trigger the callback) + self._pending_callbacks += 1 + while True: + # Read from the socket until we get EWOULDBLOCK or equivalent. + # SSL sockets do some internal buffering, and if the data is + # sitting in the SSL object's buffer select() and friends + # can't see it; the only way to find out if it's there is to + # try to read it. + if self._read_to_buffer() == 0: + break + finally: + self._pending_callbacks -= 1 + except Exception: + logging.warning("error on read", exc_info=True) + self.close() + return + if self._read_from_buffer(): + return + else: + self._maybe_run_close_callback() + def _set_read_callback(self, callback): assert not self._read_callback, "Already reading" @@ -338,13 +358,16 @@ class IOStream(object): read callback on the next IOLoop iteration; otherwise starts listening for reads on the socket. """ + # See if we've already got the data from a previous read + if self._read_from_buffer(): + return + self._check_closed() while True: - # See if we've already got the data from a previous read - if self._read_from_buffer(): - return - self._check_closed() if self._read_to_buffer() == 0: break + self._check_closed() + if self._read_from_buffer(): + return self._add_io_state(self.io_loop.READ) def _read_from_socket(self): @@ -520,10 +543,7 @@ class IOStream(object): def _maybe_add_error_listener(self): if self._state is None and self._pending_callbacks == 0: if self.socket is None: - cb = self._close_callback - if cb is not None: - self._close_callback = None - self._run_callback(cb) + self._maybe_run_close_callback() else: self._add_io_state(ioloop.IOLoop.READ) diff --git a/tornado/test/iostream_test.py b/tornado/test/iostream_test.py index 01f9a984..ac8c207a 100644 --- a/tornado/test/iostream_test.py +++ b/tornado/test/iostream_test.py @@ -216,3 +216,20 @@ class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase): finally: server.close() client.close() + + def test_large_read_until(self): + # Performance test: read_until used to have a quadratic component + # so a read_until of 4MB would take 8 seconds; now it takes 0.25 + # seconds. + server, client = self.make_iostream_pair() + try: + NUM_KB = 4096 + for i in xrange(NUM_KB): + client.write(b("A") * 1024) + client.write(b("\r\n")) + server.read_until(b("\r\n"), self.stop) + data = self.wait() + self.assertEqual(len(data), NUM_KB * 1024 + 2) + finally: + server.close() + client.close()