diff --git a/tornado/iostream.py b/tornado/iostream.py index a0d8399a..51410c49 100644 --- a/tornado/iostream.py +++ b/tornado/iostream.py @@ -137,15 +137,15 @@ class IOStream(object): def read_until_regex(self, regex, callback): """Call callback when we read the given regex pattern.""" - assert not self._read_callback, "Already reading" + self._set_read_callback(callback) self._read_regex = re.compile(regex) - self._read_until(callback) + self._try_inline_read() def read_until(self, delimiter, callback): """Call callback when we read the given delimiter.""" - assert not self._read_callback, "Already reading" + self._set_read_callback(callback) self._read_delimiter = delimiter - self._read_until(callback) + self._try_inline_read() def read_bytes(self, num_bytes, callback, streaming_callback=None): """Call callback when we read the given number of bytes. @@ -154,11 +154,11 @@ class IOStream(object): of data as they become available, and the argument to the final ``callback`` will be empty. """ - assert not self._read_callback, "Already reading" + self._set_read_callback(callback) assert isinstance(num_bytes, (int, long)) self._read_bytes = num_bytes self._streaming_callback = stack_context.wrap(streaming_callback) - self._read_until(callback) + self._try_inline_read() def read_until_close(self, callback, streaming_callback=None): """Reads all data from the socket until it is closed. @@ -170,12 +170,12 @@ class IOStream(object): Subject to ``max_buffer_size`` limit from `IOStream` constructor if a ``streaming_callback`` is not used. """ - assert not self._read_callback, "Already reading" + self._set_read_callback(callback) if self.closed(): self._run_callback(callback, self._consume(self._read_buffer_size)) + self._read_callback = None return self._read_until_close = True - self._read_callback = stack_context.wrap(callback) self._streaming_callback = stack_context.wrap(streaming_callback) self._add_io_state(self.io_loop.READ) @@ -327,11 +327,17 @@ class IOStream(object): if self._read_from_buffer(): return - def _read_until(self, callback): - """Assign given read callback and initiate read to buffer - unless stream has already been read or closed. + def _set_read_callback(self, callback): + assert not self._read_callback, "Already reading" + self._read_callback = callback + + def _try_inline_read(self): + """Attempt to complete the current read operation from buffered data. + + If the read can be completed without blocking, schedules the + read callback on the next IOLoop iteration; otherwise starts + listening for reads on the socket. """ - self._read_callback = stack_context.wrap(callback) while True: # See if we've already got the data from a previous read if self._read_from_buffer():