From 48351a18898fc1827abf14b534842b0f4b39bfe4 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 24 Mar 2018 19:13:36 +0545 Subject: [PATCH] issue #155: parent: support Context.shutdown(), reap children on exit. This permits graceful shutdown of individual contexts, without tearing down everything. Update mitogen.parent.Stream to also wait for the child to exit, to prevent the buildup of zombie processes. This introduces a blocking wait for process exit on the Broker thread, let's see if we can get away with it. Chances are reasonable that it'll cause needless hangs on heavily loaded machines. --- docs/api.rst | 12 ++++++++++++ mitogen/parent.py | 31 +++++++++++++++++++++++++++++-- tests/parent_test.py | 9 +++++++++ 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 45241afe..35ed547c 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -740,6 +740,18 @@ Context Class masters, and child contexts who later become parents. Currently when this class is required, the target context's router is upgraded at runtime. + .. method:: shutdown (wait=False) + + Arrange for the context to receive a ``SHUTDOWN`` message, triggering + graceful shutdown. + + Due to a lack of support for timers, no attempt is made yet to force + terminate a hung context using this method. This will be fixed shortly. + + :param bool wait: + If :py:data:`True`, block the calling thread until the context has + completely terminated. + .. method:: call_async (fn, \*args, \*\*kwargs) Arrange for the context's ``CALL_FUNCTION`` handle to receive a diff --git a/mitogen/parent.py b/mitogen/parent.py index 08081031..801c1282 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -308,6 +308,9 @@ class Stream(mitogen.core.Stream): #: True to cause context to write /tmp/mitogen.stats...log. profiling = False + #: Set to the child's PID by connect(). + pid = None + def __init__(self, *args, **kwargs): super(Stream, self).__init__(*args, **kwargs) self.sent_modules = set(['mitogen', 'mitogen.core']) @@ -351,6 +354,16 @@ class Stream(mitogen.core.Stream): ) ) + def on_disconnect(self, broker): + pid, status = os.waitpid(self.pid, os.WNOHANG) + if pid: + LOG.debug('%r: child process exit status was %d', self, status) + else: + LOG.debug('%r: child process still alive, sending SIGTERM', self) + os.kill(self.pid, signal.SIGTERM) + pid, status = os.waitpid(self.pid, 0) + super(Stream, self).on_disconnect(broker) + # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups # file descriptor 0 as 100, creates a pipe, then execs a new interpreter # with a custom argv. @@ -425,8 +438,8 @@ class Stream(mitogen.core.Stream): def connect(self): LOG.debug('%r.connect()', self) - pid, fd = self.create_child(*self.get_boot_command()) - self.name = 'local.%s' % (pid,) + self.pid, fd = self.create_child(*self.get_boot_command()) + self.name = 'local.%s' % (self.pid,) self.receive_side = mitogen.core.Side(self, fd) self.transmit_side = mitogen.core.Side(self, os.dup(fd)) LOG.debug('%r.connect(): child process stdin/stdout=%r', @@ -492,6 +505,20 @@ class Context(mitogen.core.Context): receiver = self.call_async(fn, *args, **kwargs) return receiver.get().unpickle(throw_dead=False) + def shutdown(self, wait=False): + LOG.debug('%r.shutdown() sending SHUTDOWN', self) + latch = mitogen.core.Latch() + mitogen.core.listen(self, 'disconnect', lambda: latch.put(None)) + + self.send( + mitogen.core.Message( + handle=mitogen.core.SHUTDOWN, + ) + ) + + if wait: + latch.get() + class RouteMonitor(object): def __init__(self, router, parent=None): diff --git a/tests/parent_test.py b/tests/parent_test.py index 1a2d0a19..169d237b 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -1,3 +1,4 @@ +import os import subprocess import time @@ -7,6 +8,14 @@ import mitogen.parent import testlib +class ContextTest(testlib.RouterMixin, unittest2.TestCase): + def test_context_shutdown(self): + local = self.router.local() + pid = local.call(os.getpid) + local.shutdown(wait=True) + self.assertRaises(OSError, lambda: os.kill(pid, 0)) + + class IterReadTest(unittest2.TestCase): func = staticmethod(mitogen.parent.iter_read)