diff --git a/econtext/core.py b/econtext/core.py index 8b169dd8..6aca378e 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -339,8 +339,6 @@ class Stream(BasicStream): IOLOG.debug('%r.Transmit()', self) written = os.write(self.write_side.fd, self._output_buf[:4096]) self._output_buf = self._output_buf[written:] - if (not self._alive) and not self._output_buf: - self.Disconnect() def WriteMore(self): return bool(self._output_buf) @@ -369,13 +367,10 @@ class Stream(BasicStream): LOG.debug('%r.Disconnect(): killing %r: %r', self, handle, fn) fn(_DEAD) - _alive = True - def Shutdown(self): # This works for entirely the wrong reason. Depends on partial # _output_buf always being maintained by accident. LOG.debug('%r.Shutdown()', self) - self._alive = False def Accept(self, rfd, wfd): self.read_side = Side(self, os.dup(rfd)) @@ -539,6 +534,7 @@ class IoLogger(BasicStream): self.read_side = Side(self, self._rsock.fileno()) self.write_side = Side(self, dest_fd) + broker.graceful_count += 1 self._broker.UpdateStream(self) def __repr__(self): @@ -558,6 +554,7 @@ class IoLogger(BasicStream): LOG.debug('%r.Receive()', self) buf = os.read(self.read_side.fd, 4096) if not buf: + self._broker.graceful_count -= 1 return self.Disconnect() self._buf += buf @@ -570,6 +567,8 @@ class Broker(object): stream that is associated with them, and for I/O multiplexing. """ _waker = None + graceful_count = 0 + graceful_timeout = 3.0 def __init__(self): self._alive = True @@ -644,12 +643,18 @@ class Broker(object): for side in self._readers | self._writers: self._CallAndUpdate(side.stream, side.stream.Shutdown) - deadline = time.time() + 1.0 - while (self._readers or self._writers) and time.time() < deadline: + deadline = time.time() + self.graceful_timeout + while self.graceful_count and time.time() < deadline: self._LoopOnce(1.0) + for context in self._contexts.itervalues(): + stream = context.stream + if stream: + stream.Disconnect() + self._UpdateStream(stream) + for side in self._readers | self._writers: - LOG.error('_BrokerMain() force disconnecting %r', side.stream) + LOG.error('_BrokerMain() force disconnecting %r', side) side.stream.Disconnect() except Exception: LOG.exception('_BrokerMain() crashed')