diff --git a/mitogen/parent.py b/mitogen/parent.py index 8d9fbd2a..4731c711 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -505,6 +505,48 @@ def write_all(fd, s, deadline=None): poller.close() +class IteratingRead(object): + def __init__(self, fds, deadline=None): + self.deadline = deadline + self.poller = PREFERRED_POLLER() + for fd in fds: + self.poller.start_receive(fd) + + self.bits = [] + self.timeout = None + + def close(self): + self.poller.close() + + def __iter__(self): + return self + + def next(self): + while self.poller.readers: + if self.deadline is not None: + timeout = max(0, self.deadline - time.time()) + if timeout == 0: + break + + for fd in self.poller.poll(timeout): + s, disconnected = mitogen.core.io_op(os.read, fd, 4096) + if disconnected or not s: + IOLOG.debug('iter_read(%r) -> disconnected', fd) + self.poller.stop_receive(fd) + else: + IOLOG.debug('iter_read(%r) -> %r', fd, s) + self.bits.append(s) + return s + + if not poller.readers: + raise EofError(u'EOF on stream; last 300 bytes received: %r' % + (b('').join(self.bits)[-300:].decode('latin1'),)) + + raise mitogen.core.TimeoutError('read timed out') + + __next__ = next + + def iter_read(fds, deadline=None): """Return a generator that arranges for up to 4096-byte chunks to be read at a time from the file descriptor `fd` until the generator is destroyed. @@ -522,36 +564,7 @@ def iter_read(fds, deadline=None): :raises mitogen.core.StreamError: Attempt to read past end of file. """ - poller = PREFERRED_POLLER() - for fd in fds: - poller.start_receive(fd) - - bits = [] - timeout = None - try: - while poller.readers: - if deadline is not None: - timeout = max(0, deadline - time.time()) - if timeout == 0: - break - - for fd in poller.poll(timeout): - s, disconnected = mitogen.core.io_op(os.read, fd, 4096) - if disconnected or not s: - IOLOG.debug('iter_read(%r) -> disconnected', fd) - poller.stop_receive(fd) - else: - IOLOG.debug('iter_read(%r) -> %r', fd, s) - bits.append(s) - yield s - finally: - poller.close() - - if not poller.readers: - raise EofError(u'EOF on stream; last 300 bytes received: %r' % - (b('').join(bits)[-300:].decode('latin1'),)) - - raise mitogen.core.TimeoutError('read timed out') + return IteratingRead(fds=fds, deadline=deadline) def discard_until(fd, s, deadline):