diff --git a/docs/api.rst b/docs/api.rst index 45241afe..35ed547c 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -740,6 +740,18 @@ Context Class masters, and child contexts who later become parents. Currently when this class is required, the target context's router is upgraded at runtime. + .. method:: shutdown (wait=False) + + Arrange for the context to receive a ``SHUTDOWN`` message, triggering + graceful shutdown. + + Due to a lack of support for timers, no attempt is made yet to force + terminate a hung context using this method. This will be fixed shortly. + + :param bool wait: + If :py:data:`True`, block the calling thread until the context has + completely terminated. + .. method:: call_async (fn, \*args, \*\*kwargs) Arrange for the context's ``CALL_FUNCTION`` handle to receive a diff --git a/mitogen/parent.py b/mitogen/parent.py index 08081031..801c1282 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -308,6 +308,9 @@ class Stream(mitogen.core.Stream): #: True to cause context to write /tmp/mitogen.stats...log. profiling = False + #: Set to the child's PID by connect(). + pid = None + def __init__(self, *args, **kwargs): super(Stream, self).__init__(*args, **kwargs) self.sent_modules = set(['mitogen', 'mitogen.core']) @@ -351,6 +354,16 @@ class Stream(mitogen.core.Stream): ) ) + def on_disconnect(self, broker): + pid, status = os.waitpid(self.pid, os.WNOHANG) + if pid: + LOG.debug('%r: child process exit status was %d', self, status) + else: + LOG.debug('%r: child process still alive, sending SIGTERM', self) + os.kill(self.pid, signal.SIGTERM) + pid, status = os.waitpid(self.pid, 0) + super(Stream, self).on_disconnect(broker) + # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups # file descriptor 0 as 100, creates a pipe, then execs a new interpreter # with a custom argv. @@ -425,8 +438,8 @@ class Stream(mitogen.core.Stream): def connect(self): LOG.debug('%r.connect()', self) - pid, fd = self.create_child(*self.get_boot_command()) - self.name = 'local.%s' % (pid,) + self.pid, fd = self.create_child(*self.get_boot_command()) + self.name = 'local.%s' % (self.pid,) self.receive_side = mitogen.core.Side(self, fd) self.transmit_side = mitogen.core.Side(self, os.dup(fd)) LOG.debug('%r.connect(): child process stdin/stdout=%r', @@ -492,6 +505,20 @@ class Context(mitogen.core.Context): receiver = self.call_async(fn, *args, **kwargs) return receiver.get().unpickle(throw_dead=False) + def shutdown(self, wait=False): + LOG.debug('%r.shutdown() sending SHUTDOWN', self) + latch = mitogen.core.Latch() + mitogen.core.listen(self, 'disconnect', lambda: latch.put(None)) + + self.send( + mitogen.core.Message( + handle=mitogen.core.SHUTDOWN, + ) + ) + + if wait: + latch.get() + class RouteMonitor(object): def __init__(self, router, parent=None): diff --git a/tests/parent_test.py b/tests/parent_test.py index 1a2d0a19..169d237b 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -1,3 +1,4 @@ +import os import subprocess import time @@ -7,6 +8,14 @@ import mitogen.parent import testlib +class ContextTest(testlib.RouterMixin, unittest2.TestCase): + def test_context_shutdown(self): + local = self.router.local() + pid = local.call(os.getpid) + local.shutdown(wait=True) + self.assertRaises(OSError, lambda: os.kill(pid, 0)) + + class IterReadTest(unittest2.TestCase): func = staticmethod(mitogen.parent.iter_read)