Another hackish graceful shutdown method.
This commit is contained in:
parent
4e5add97e5
commit
70c2032bb2
|
@ -231,6 +231,7 @@ class Side(object):
|
|||
|
||||
def close(self):
|
||||
if self.fd is not None:
|
||||
IOLOG.debug('%r.close()', self)
|
||||
try:
|
||||
os.close(self.fd)
|
||||
except OSError, e:
|
||||
|
@ -339,6 +340,8 @@ class Stream(BasicStream):
|
|||
IOLOG.debug('%r.Transmit()', self)
|
||||
written = os.write(self.write_side.fd, self._output_buf[:4096])
|
||||
self._output_buf = self._output_buf[written:]
|
||||
if (not self._output_buf) and not self._context.broker.graceful_count:
|
||||
self.Disconnect()
|
||||
|
||||
def WriteMore(self):
|
||||
return bool(self._output_buf)
|
||||
|
@ -368,9 +371,7 @@ class Stream(BasicStream):
|
|||
fn(_DEAD)
|
||||
|
||||
def Shutdown(self):
|
||||
# This works for entirely the wrong reason. Depends on partial
|
||||
# _output_buf always being maintained by accident.
|
||||
LOG.debug('%r.Shutdown()', self)
|
||||
"""Override BasicStream behaviour of immediately disconnecting."""
|
||||
|
||||
def Accept(self, rfd, wfd):
|
||||
self.read_side = Side(self, os.dup(rfd))
|
||||
|
@ -416,8 +417,8 @@ class Context(object):
|
|||
"""Slave does nothing, _BrokerMain() will .Shutdown its streams."""
|
||||
|
||||
def _Shutdown(self, data):
|
||||
LOG.debug('Received SHUTDOWN')
|
||||
if data != _DEAD and self.stream:
|
||||
LOG.debug('Received SHUTDOWN')
|
||||
self.broker.Shutdown()
|
||||
|
||||
def Disconnect(self):
|
||||
|
@ -554,6 +555,7 @@ class IoLogger(BasicStream):
|
|||
LOG.debug('%r.Receive()', self)
|
||||
buf = os.read(self.read_side.fd, 4096)
|
||||
if not buf:
|
||||
LOG.debug('%r decrement graceful_count', self)
|
||||
self._broker.graceful_count -= 1
|
||||
return self.Disconnect()
|
||||
|
||||
|
@ -644,7 +646,8 @@ class Broker(object):
|
|||
self._CallAndUpdate(side.stream, side.stream.Shutdown)
|
||||
|
||||
deadline = time.time() + self.graceful_timeout
|
||||
while self.graceful_count and time.time() < deadline:
|
||||
while ((self._readers or self._writers) and
|
||||
(self.graceful_count or time.time() < deadline)):
|
||||
self._LoopOnce(1.0)
|
||||
|
||||
for context in self._contexts.itervalues():
|
||||
|
|
|
@ -139,7 +139,7 @@ class LocalStream(econtext.core.Stream):
|
|||
|
||||
def Shutdown(self):
|
||||
"""Requesting the slave gracefully shut itself down."""
|
||||
LOG.debug('%r enqueing SHUTDOWN')
|
||||
LOG.debug('%r enqueuing SHUTDOWN', self)
|
||||
self.Enqueue(econtext.core.SHUTDOWN, None)
|
||||
|
||||
def _FindGlobal(self, module_name, class_name):
|
||||
|
@ -228,6 +228,9 @@ class SSHStream(LocalStream):
|
|||
|
||||
|
||||
class Broker(econtext.core.Broker):
|
||||
#: Always allow time for slaves to drain.
|
||||
graceful_count = 1
|
||||
|
||||
def CreateListener(self, address=None, backlog=30):
|
||||
"""Listen on `address `for connections from newly spawned contexts."""
|
||||
self._listener = Listener(self, address, backlog)
|
||||
|
|
Loading…
Reference in New Issue