From 70c2032bb21fab36a6a4b4f06f0cea39354a64ce Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 11 Aug 2016 17:29:41 +0100 Subject: [PATCH] Another hackish graceful shutdown method. --- econtext/core.py | 13 ++++++++----- econtext/master.py | 5 ++++- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/econtext/core.py b/econtext/core.py index 6aca378e..9c2a4b3c 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -231,6 +231,7 @@ class Side(object): def close(self): if self.fd is not None: + IOLOG.debug('%r.close()', self) try: os.close(self.fd) except OSError, e: @@ -339,6 +340,8 @@ class Stream(BasicStream): IOLOG.debug('%r.Transmit()', self) written = os.write(self.write_side.fd, self._output_buf[:4096]) self._output_buf = self._output_buf[written:] + if (not self._output_buf) and not self._context.broker.graceful_count: + self.Disconnect() def WriteMore(self): return bool(self._output_buf) @@ -368,9 +371,7 @@ class Stream(BasicStream): fn(_DEAD) def Shutdown(self): - # This works for entirely the wrong reason. Depends on partial - # _output_buf always being maintained by accident. - LOG.debug('%r.Shutdown()', self) + """Override BasicStream behaviour of immediately disconnecting.""" def Accept(self, rfd, wfd): self.read_side = Side(self, os.dup(rfd)) @@ -416,8 +417,8 @@ class Context(object): """Slave does nothing, _BrokerMain() will .Shutdown its streams.""" def _Shutdown(self, data): - LOG.debug('Received SHUTDOWN') if data != _DEAD and self.stream: + LOG.debug('Received SHUTDOWN') self.broker.Shutdown() def Disconnect(self): @@ -554,6 +555,7 @@ class IoLogger(BasicStream): LOG.debug('%r.Receive()', self) buf = os.read(self.read_side.fd, 4096) if not buf: + LOG.debug('%r decrement graceful_count', self) self._broker.graceful_count -= 1 return self.Disconnect() @@ -644,7 +646,8 @@ class Broker(object): self._CallAndUpdate(side.stream, side.stream.Shutdown) deadline = time.time() + self.graceful_timeout - while self.graceful_count and time.time() < deadline: + while ((self._readers or self._writers) and + (self.graceful_count or time.time() < deadline)): self._LoopOnce(1.0) for context in self._contexts.itervalues(): diff --git a/econtext/master.py b/econtext/master.py index 0486e9a9..a510f1e0 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -139,7 +139,7 @@ class LocalStream(econtext.core.Stream): def Shutdown(self): """Requesting the slave gracefully shut itself down.""" - LOG.debug('%r enqueing SHUTDOWN') + LOG.debug('%r enqueuing SHUTDOWN', self) self.Enqueue(econtext.core.SHUTDOWN, None) def _FindGlobal(self, module_name, class_name): @@ -228,6 +228,9 @@ class SSHStream(LocalStream): class Broker(econtext.core.Broker): + #: Always allow time for slaves to drain. + graceful_count = 1 + def CreateListener(self, address=None, backlog=30): """Listen on `address `for connections from newly spawned contexts.""" self._listener = Listener(self, address, backlog)