From 86ede622411d9cde140fa551b89436e23cb8ae78 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Mar 2018 05:24:30 +0545 Subject: [PATCH] issue #150: introduce separate connection multiplexer process This is a work in progress. --- ansible_mitogen/connection.py | 5 ++- ansible_mitogen/process.py | 78 ++++++++++++++++++++++++++++------- ansible_mitogen/strategy.py | 2 +- mitogen/unix.py | 6 ++- 4 files changed, 73 insertions(+), 18 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index f9cb89c0..bfcb13fc 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -40,6 +40,7 @@ import mitogen.unix from mitogen.utils import cast import ansible_mitogen.helpers +import ansible_mitogen.process from ansible_mitogen.services import ContextService @@ -79,7 +80,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): ansible_ssh_timeout = None def __init__(self, play_context, new_stdin, original_transport): - assert 'MITOGEN_LISTENER_PATH' in os.environ, ( + assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( 'The "mitogen" connection plug-in may only be instantiated ' 'by the "mitogen" strategy plug-in.' ) @@ -215,7 +216,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): self.broker = mitogen.master.Broker() self.router, self.parent = mitogen.unix.connect( - path=os.environ['MITOGEN_LISTENER_PATH'], + path=ansible_mitogen.process.MuxProcess.unix_listener_path, broker=self.broker, ) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 2d6b2739..7cb08892 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -27,12 +27,15 @@ # POSSIBILITY OF SUCH DAMAGE. from __future__ import absolute_import -import threading import os +import socket +import sys +import threading import mitogen import mitogen.core import mitogen.master +import mitogen.parent import mitogen.service import mitogen.unix import mitogen.utils @@ -41,26 +44,71 @@ import ansible_mitogen.logging import ansible_mitogen.services -class State(object): +class MuxProcess(object): """ - Process-global state that should persist across playbook runs. + This implements a process forked from the Ansible top-level process as a + safe place to contain the Mitogen IO multiplexer thread, keeping its use of + the logging package (and the logging package's heavy use of locks) far away + from the clutches of os.fork(), which is used continuously in the top-level + process. + + The problem with running the multiplexer in that process is that should the + multiplexer thread be in the process of emitting a log entry (and holding + its lock) at the point of fork, in the child, the first attempt to log any + log entry using the same handler will deadlock the child, as in the memory + image the child received, the lock will always be marked held. """ - #: ProcessState singleton. + + #: In the top-level process, this references one end of a socketpair(), + #: which the MuxProcess blocks reading from in order to determine when + #: the master process deies. Once the read returns, the MuxProcess will + #: begin shutting itself down. + worker_sock = None + + #: In the worker process, this references the other end of the + #: :py:attr:`worker_sock`. + child_sock = None + + #: In the top-level process, this is the PID of the single MuxProcess + #: that was spawned. + worker_pid = None + + #: In both processes, this is the temporary UNIX socket used for + #: forked WorkerProcesses to contact the MuxProcess + unix_listener_path = None + + #: Singleton. _instance = None @classmethod - def instance(cls): - """ - Fetch the ProcessState singleton, constructing it as necessary. - """ - if cls._instance is None: - cls._instance = cls() - return cls._instance + def start(cls): + if cls.worker_sock is not None: + return - def __init__(self): + cls.unix_listener_path = mitogen.unix.make_socket_path() + cls.worker_sock, cls.child_sock = socket.socketpair() + cls.child_pid = os.fork() ansible_mitogen.logging.setup() + if cls.child_pid: + cls.child_sock.close() + cls.child_sock = None + cls.worker_sock.recv(1) + else: + cls.worker_sock.close() + cls.worker_sock = None + self = cls() + self.run() + sys.exit() + + def run(self): self._setup_master() self._setup_services() + self.child_sock.send('1') + try: + self.child_sock.recv(1) + except Exception, e: + print 'do e', e + pass def _setup_master(self): """ @@ -70,8 +118,10 @@ class State(object): self.router.responder.whitelist_prefix('ansible') self.router.responder.whitelist_prefix('ansible_mitogen') mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown) - self.listener = mitogen.unix.Listener(self.router) - os.environ['MITOGEN_LISTENER_PATH'] = self.listener.path + self.listener = mitogen.unix.Listener( + router=self.router, + path=self.unix_listener_path, + ) if 'MITOGEN_ROUTER_DEBUG' in os.environ: self.router.enable_debug() diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index 655c5fff..7cf6d3ba 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -179,7 +179,7 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule): Arrange for a mitogen.master.Router to be available for the duration of the strategy's real run() method. """ - self.state = ansible_mitogen.process.State.instance() + ansible_mitogen.process.MuxProcess.start() self._add_connection_plugin_path() self._install_wrappers() try: diff --git a/mitogen/unix.py b/mitogen/unix.py index c9136e60..d62a96a8 100644 --- a/mitogen/unix.py +++ b/mitogen/unix.py @@ -55,12 +55,16 @@ def is_path_dead(path): return False +def make_socket_path(): + return tempfile.mktemp(prefix='mitogen_unix_') + + class Listener(mitogen.core.BasicStream): keep_alive = True def __init__(self, router, path=None, backlog=30): self._router = router - self.path = path or tempfile.mktemp(prefix='mitogen_unix_') + self.path = path or make_socket_path() self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) if os.path.exists(self.path) and is_path_dead(self.path):