From 9035884c77617ce0fd59519198adb2f786f80ecf Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 28 Jul 2018 18:57:30 -0700 Subject: [PATCH] ansible: abstract worker process model. Move all details of broker/router setup out of connection.py, instead deferring it to a WorkerModel class exported by process.py via get_worker_model(). The running strategy can override the configured worker model via _get_worker_model(). ClassicWorkerModel is installed by default, which implements the extension's existing process model. Add optional support for the third party setproctitle module, so children have pretty names in ps output. Add optional support for per-CPU multiplexers to classic runs. --- ansible_mitogen/connection.py | 102 ++-- ansible_mitogen/planner.py | 13 +- .../plugins/connection/mitogen_local.py | 2 +- ansible_mitogen/process.py | 575 +++++++++++++----- ansible_mitogen/services.py | 6 +- ansible_mitogen/strategy.py | 139 +++-- mitogen/core.py | 6 +- mitogen/parent.py | 17 + mitogen/service.py | 35 +- tests/ansible/run_ansible_playbook.py | 6 +- 10 files changed, 621 insertions(+), 280 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 42fa2ef8..4d310d75 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -38,6 +38,7 @@ import sys import time import jinja2.runtime +from ansible.module_utils import six import ansible.constants as C import ansible.errors import ansible.plugins.connection @@ -459,15 +460,10 @@ class CallChain(mitogen.parent.CallChain): class Connection(ansible.plugins.connection.ConnectionBase): - #: mitogen.master.Broker for this worker. - broker = None - - #: mitogen.master.Router for this worker. - router = None - - #: mitogen.parent.Context representing the parent Context, which is - #: presently always the connection multiplexer process. - parent = None + #: The :class:`ansible_mitogen.process.Binding` representing the connection + #: multiplexer this connection's target is assigned to. :data:`None` when + #: disconnected. + binding = None #: mitogen.parent.Context for the target account on the target, possibly #: reached via become. @@ -518,13 +514,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): #: matching vanilla Ansible behaviour. loader_basedir = None - def __init__(self, play_context, new_stdin, **kwargs): - assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( - 'Mitogen connection types may only be instantiated ' - 'while the "mitogen" strategy is active.' - ) - super(Connection, self).__init__(play_context, new_stdin) - def __del__(self): """ Ansible cannot be trusted to always call close() e.g. the synchronize @@ -585,6 +574,15 @@ class Connection(ansible.plugins.connection.ConnectionBase): self._connect() return self.init_child_result['home_dir'] + def get_binding(self): + """ + Return the :class:`ansible_mitogen.process.Binding` representing the + process that hosts the physical connection and services (context + establishment, file transfer, ..) for our desired target. + """ + assert self.binding is not None + return self.binding + @property def connected(self): return self.context is not None @@ -672,18 +670,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): return stack - def _connect_broker(self): - """ - Establish a reference to the Broker, Router and parent context used for - connections. - """ - if not self.broker: - self.broker = mitogen.master.Broker() - self.router, self.parent = mitogen.unix.connect( - path=ansible_mitogen.process.MuxProcess.unix_listener_path, - broker=self.broker, - ) - def _build_stack(self): """ Construct a list of dictionaries representing the connection @@ -691,14 +677,14 @@ class Connection(ansible.plugins.connection.ConnectionBase): additionally used by the integration tests "mitogen_get_stack" action to fetch the would-be connection configuration. """ - return self._stack_from_spec( - ansible_mitogen.transport_config.PlayContextSpec( - connection=self, - play_context=self._play_context, - transport=self.transport, - inventory_name=self.inventory_hostname, - ) + spec = ansible_mitogen.transport_config.PlayContextSpec( + connection=self, + play_context=self._play_context, + transport=self.transport, + inventory_name=self.inventory_hostname, ) + stack = self._stack_from_spec(spec) + return spec.inventory_name(), stack def _connect_stack(self, stack): """ @@ -711,7 +697,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): description of the returned dictionary. """ try: - dct = self.parent.call_service( + dct = mitogen.service.call( + call_context=self.binding.get_service_context(), service_name='ansible_mitogen.services.ContextService', method_name='get', stack=mitogen.utils.cast(list(stack)), @@ -758,8 +745,9 @@ class Connection(ansible.plugins.connection.ConnectionBase): if self.connected: return - self._connect_broker() - stack = self._build_stack() + inventory_name, stack = self._build_stack() + worker_model = ansible_mitogen.process.get_worker_model() + self.binding = worker_model.get_binding(inventory_name) self._connect_stack(stack) def _mitogen_reset(self, mode): @@ -776,9 +764,10 @@ class Connection(ansible.plugins.connection.ConnectionBase): return self.chain.reset() - self.parent.call_service( + mitogen.service.call( + call_context=self.binding.get_service_context(), service_name='ansible_mitogen.services.ContextService', - method_name=mode, + method_name='put', context=self.context ) @@ -787,27 +776,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): self.init_child_result = None self.chain = None - def _shutdown_broker(self): - """ - Shutdown the broker thread during :meth:`close` or :meth:`reset`. - """ - if self.broker: - self.broker.shutdown() - self.broker.join() - self.broker = None - self.router = None - - # #420: Ansible executes "meta" actions in the top-level process, - # meaning "reset_connection" will cause :class:`mitogen.core.Latch` - # FDs to be cached and erroneously shared by children on subsequent - # WorkerProcess forks. To handle that, call on_fork() to ensure any - # shared state is discarded. - # #490: only attempt to clean up when it's known that some - # resources exist to cleanup, otherwise later __del__ double-call - # to close() due to GC at random moment may obliterate an unrelated - # Connection's resources. - mitogen.fork.on_fork() - def close(self): """ Arrange for the mitogen.master.Router running in the worker to @@ -815,7 +783,9 @@ class Connection(ansible.plugins.connection.ConnectionBase): multiple times. """ self._mitogen_reset(mode='put') - self._shutdown_broker() + if self.binding: + self.binding.close() + self.binding = None def _reset_find_task_vars(self): """ @@ -853,7 +823,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): self._connect() self._mitogen_reset(mode='reset') - self._shutdown_broker() + self.binding.close() + self.binding = None # Compatibility with Ansible 2.4 wait_for_connection plug-in. _reset = reset @@ -1024,7 +995,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): utimes=(st.st_atime, st.st_mtime)) self._connect() - self.parent.call_service( + mitogen.service.call( + call_context=self.binding.get_service_context(), service_name='mitogen.service.FileService', method_name='register', path=mitogen.utils.cast(in_path) @@ -1036,7 +1008,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): # file alive, but that requires more work. self.get_chain().call( ansible_mitogen.target.transfer_file, - context=self.parent, + context=self.binding.get_child_service_context(), in_path=in_path, out_path=out_path ) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 2eebd36d..96b06995 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -148,6 +148,8 @@ class Planner(object): # named by `runner_name`. } """ + binding = self._inv.connection.get_binding() + new = dict((mitogen.core.UnicodeType(k), kwargs[k]) for k in kwargs) new.setdefault('good_temp_dir', @@ -155,7 +157,7 @@ class Planner(object): new.setdefault('cwd', self._inv.connection.get_default_cwd()) new.setdefault('extra_env', self._inv.connection.get_default_env()) new.setdefault('emulate_tty', True) - new.setdefault('service_context', self._inv.connection.parent) + new.setdefault('service_context', binding.get_child_service_context()) return new def __repr__(self): @@ -328,7 +330,9 @@ class NewStylePlanner(ScriptPlanner): def get_module_map(self): if self._module_map is None: - self._module_map = self._inv.connection.parent.call_service( + binding = self._inv.connection.get_binding() + self._module_map = mitogen.service.call( + call_context=binding.get_service_context(), service_name='ansible_mitogen.services.ModuleDepService', method_name='scan', @@ -405,9 +409,12 @@ def get_module_data(name): def _propagate_deps(invocation, planner, context): - invocation.connection.parent.call_service( + binding = invocation.connection.get_binding() + mitogen.service.call( + call_context=binding.get_service_context(), service_name='mitogen.service.PushFileService', method_name='propagate_paths_and_modules', + context=context, paths=planner.get_push_files(), modules=planner.get_module_deps(), diff --git a/ansible_mitogen/plugins/connection/mitogen_local.py b/ansible_mitogen/plugins/connection/mitogen_local.py index 24b84a03..a98c834c 100644 --- a/ansible_mitogen/plugins/connection/mitogen_local.py +++ b/ansible_mitogen/plugins/connection/mitogen_local.py @@ -81,6 +81,6 @@ class Connection(ansible_mitogen.connection.Connection): from WorkerProcess, we must emulate that. """ return dict_diff( - old=ansible_mitogen.process.MuxProcess.original_env, + old=ansible_mitogen.process.MuxProcess.cls_original_env, new=os.environ, ) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index a8827cb1..0eaf25a7 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -30,6 +30,7 @@ from __future__ import absolute_import import atexit import errno import logging +import multiprocessing import os import signal import socket @@ -41,9 +42,15 @@ try: except ImportError: faulthandler = None +try: + import setproctitle +except ImportError: + setproctitle = None + import mitogen import mitogen.core import mitogen.debug +import mitogen.fork import mitogen.master import mitogen.parent import mitogen.service @@ -52,6 +59,7 @@ import mitogen.utils import ansible import ansible.constants as C +import ansible.errors import ansible_mitogen.logging import ansible_mitogen.services @@ -66,28 +74,55 @@ ANSIBLE_PKG_OVERRIDE = ( u"__author__ = %r\n" ) +worker_model_msg = ( + 'Mitogen connection types may only be instantiated when one of the ' + '"mitogen_*" or "operon_*" strategies are active.' +) -def clean_shutdown(sock): +#: The worker model as configured by the currently running strategy. This is +#: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by +#: :class:`StrategyMixin`. +_worker_model = None + + +#: A copy of the sole :class:`ClassicWorkerModel` that ever exists during a +#: classic run, as return by :func:`get_classic_worker_model`. +_classic_worker_model = None + + +def set_worker_model(model): """ - Shut the write end of `sock`, causing `recv` in the worker process to wake - up with a 0-byte read and initiate mux process exit, then wait for a 0-byte - read from the read end, which will occur after the the child closes the - descriptor on exit. + To remove process model-wiring from + :class:`ansible_mitogen.connection.Connection`, it is necessary to track + some idea of the configured execution environment outside the connection + plug-in. - This is done using :mod:`atexit` since Ansible lacks any more sensible hook - to run code during exit, and unless some synchronization exists with - MuxProcess, debug logs may appear on the user's terminal *after* the prompt - has been printed. + That is what :func:`set_worker_model` and :func:`get_worker_model` are for. """ - try: - sock.shutdown(socket.SHUT_WR) - except socket.error: - # Already closed. This is possible when tests are running. - LOG.debug('clean_shutdown: ignoring duplicate call') - return + global _worker_model + assert model is None or _worker_model is None + _worker_model = model - sock.recv(1) - sock.close() + +def get_worker_model(): + """ + Return the :class:`WorkerModel` currently configured by the running + strategy. + """ + if _worker_model is None: + raise ansible.errors.AnsibleConnectionFailure(worker_model_msg) + return _worker_model + + +def get_classic_worker_model(): + """ + Return the single :class:`ClassicWorkerModel` instance, constructing it if + necessary. + """ + global _classic_worker_model + if _classic_worker_model is None: + _classic_worker_model = ClassicWorkerModel() + return _classic_worker_model def getenv_int(key, default=0): @@ -119,6 +154,330 @@ def save_pid(name): fp.write(str(os.getpid())) +def setup_pool(pool): + """ + Configure a connection multiplexer's :class:`mitogen.service.Pool` with + services accessed by clients and WorkerProcesses. + """ + pool.add(mitogen.service.FileService(router=pool.router)) + pool.add(mitogen.service.PushFileService(router=pool.router)) + pool.add(ansible_mitogen.services.ContextService(router=pool.router)) + pool.add(ansible_mitogen.services.ModuleDepService(pool.router)) + LOG.debug('Service pool configured: size=%d', pool.size) + + +def _setup_simplejson(responder): + """ + We support serving simplejson for Python 2.4 targets on Ansible 2.3, at + least so the package's own CI Docker scripts can run without external + help, however newer versions of simplejson no longer support Python + 2.4. Therefore override any installed/loaded version with a + 2.4-compatible version we ship in the compat/ directory. + """ + responder.whitelist_prefix('simplejson') + + # issue #536: must be at end of sys.path, in case existing newer + # version is already loaded. + compat_path = os.path.join(os.path.dirname(__file__), 'compat') + sys.path.append(compat_path) + + for fullname, is_pkg, suffix in ( + (u'simplejson', True, '__init__.py'), + (u'simplejson.decoder', False, 'decoder.py'), + (u'simplejson.encoder', False, 'encoder.py'), + (u'simplejson.scanner', False, 'scanner.py'), + ): + path = os.path.join(compat_path, 'simplejson', suffix) + fp = open(path, 'rb') + try: + source = fp.read() + finally: + fp.close() + + responder.add_source_override( + fullname=fullname, + path=path, + source=source, + is_pkg=is_pkg, + ) + + +def _setup_responder(responder): + """ + Configure :class:`mitogen.master.ModuleResponder` to only permit + certain packages, and to generate custom responses for certain modules. + """ + responder.whitelist_prefix('ansible') + responder.whitelist_prefix('ansible_mitogen') + _setup_simplejson(responder) + + # Ansible 2.3 is compatible with Python 2.4 targets, however + # ansible/__init__.py is not. Instead, executor/module_common.py writes + # out a 2.4-compatible namespace package for unknown reasons. So we + # copy it here. + responder.add_source_override( + fullname='ansible', + path=ansible.__file__, + source=(ANSIBLE_PKG_OVERRIDE % ( + ansible.__version__, + ansible.__author__, + )).encode(), + is_pkg=True, + ) + + +def common_setup(_init_logging=True): + save_pid('controller') + ansible_mitogen.logging.set_process_name('top') + ansible_mitogen.affinity.policy.assign_controller() + + mitogen.utils.setup_gil() + if _init_logging: + ansible_mitogen.logging.setup() + + if faulthandler is not None: + faulthandler.enable() + + MuxProcess.profiling = getenv_int('MITOGEN_PROFILING') > 0 + if MuxProcess.profiling: + mitogen.core.enable_profiling() + + MuxProcess.cls_original_env = dict(os.environ) + + +def get_cpu_count(default=None): + """ + Get the multiplexer CPU count from the MITOGEN_CPU_COUNT environment + variable, returning `default` if one isn't set, or is out of range. + + :param int default: + Default CPU, or :data:`None` to use all available CPUs. + """ + max_cpus = multiprocessing.cpu_count() + if default is None: + default = max_cpus + + cpu_count = getenv_int('MITOGEN_CPU_COUNT', default=default) + if cpu_count < 1 or cpu_count > max_cpus: + cpu_count = default + + return cpu_count + + +class Binding(object): + def get_child_service_context(self): + """ + Return the :class:`mitogen.core.Context` to which children should + direct ContextService requests, or :data:`None` for the local process. + """ + raise NotImplementedError() + + def get_service_context(self): + """ + Return the :class:`mitogen.core.Context` to which this process should + direct ContextService requests, or :data:`None` for the local process. + """ + raise NotImplementedError() + + def close(self): + """ + Finalize any associated resources. + """ + raise NotImplementedError() + + +class WorkerModel(object): + def on_strategy_start(self): + """ + Called prior to strategy start in the top-level process. Responsible + for preparing any worker/connection multiplexer state. + """ + raise NotImplementedError() + + def on_strategy_complete(self): + """ + Called after strategy completion in the top-level process. Must place + Ansible back in a "compatible" state where any other strategy plug-in + may execute. + """ + raise NotImplementedError() + + def get_binding(self, inventory_name): + raise NotImplementedError() + + +class ClassicBinding(Binding): + """ + Only one connection may be active at a time in a classic worker, so its + binding just provides forwarders back to :class:`ClassicWorkerModel`. + """ + def __init__(self, model): + self.model = model + + def get_service_context(self): + """ + See Binding.get_service_context(). + """ + return self.model.parent + + def get_child_service_context(self): + """ + See Binding.get_child_service_context(). + """ + return self.model.parent + + def close(self): + """ + See Binding.close(). + """ + self.model.on_binding_close() + + +class ClassicWorkerModel(WorkerModel): + #: mitogen.master.Router for this worker. + router = None + + #: mitogen.master.Broker for this worker. + broker = None + + #: Name of multiplexer process socket we are currently connected to. + listener_path = None + + #: mitogen.parent.Context representing the parent Context, which is the + #: connection multiplexer process when running in classic mode, or the + #: top-level process when running a new-style mode. + parent = None + + def __init__(self, _init_logging=True): + self._init_logging = _init_logging + self.initialized = False + + def _listener_for_name(self, name): + """ + Given a connection stack, return the UNIX listener that should be used + to communicate with it. This is a simple hash of the inventory name. + """ + if len(self._muxes) == 1: + return self._muxes[0].path + + idx = abs(hash(name)) % len(self._muxes) + LOG.debug('Picked worker %d: %s', idx, self._muxes[idx].path) + return self._muxes[idx].path + + def _reconnect(self, path): + if self.router is not None: + # Router can just be overwritten, but the previous parent + # connection must explicitly be removed from the broker first. + self.router.disconnect(self.parent) + self.parent = None + self.router = None + + self.router, self.parent = mitogen.unix.connect( + path=path, + broker=self.broker, + ) + self.listener_path = path + + def on_process_exit(self, sock): + """ + This is an :mod:`atexit` handler installed in the top-level process. + + Shut the write end of `sock`, causing the receive side of the socket in + every worker process to wake up with a 0-byte reads, and causing their + main threads to wake up and initiate shutdown. After shutting the + socket down, wait for a 0-byte read from the read end, which will occur + after the last child closes the descriptor on exit. + + This is done using :mod:`atexit` since Ansible lacks any better hook to + run code during exit, and unless some synchronization exists with + MuxProcess, debug logs may appear on the user's terminal *after* the + prompt has been printed. + """ + try: + sock.shutdown(socket.SHUT_WR) + except socket.error: + # Already closed. This is possible when tests are running. + LOG.debug('on_process_exit: ignoring duplicate call') + return + + mitogen.core.io_op(sock.recv, 1) + sock.close() + + def _initialize(self): + """ + Arrange for classic process model connection multiplexer child + processes to be started, if they are not already running. + + The parent process picks a UNIX socket path the child will use prior to + fork, creates a socketpair used essentially as a semaphore, then blocks + waiting for the child to indicate the UNIX socket is ready for use. + + :param bool _init_logging: + For testing, if :data:`False`, don't initialize logging. + """ + common_setup(_init_logging=self._init_logging) + MuxProcess.cls_parent_sock, \ + MuxProcess.cls_child_sock = socket.socketpair() + + mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno()) + mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno()) + + self._muxes = [ + MuxProcess(index) + for index in range(get_cpu_count(default=1)) + ] + for mux in self._muxes: + mux.start() + + atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock) + MuxProcess.cls_child_sock.close() + MuxProcess.cls_child_sock = None + + def on_strategy_start(self): + """ + See WorkerModel.on_strategy_start(). + """ + if not self.initialized: + self._initialize() + self.initialized = True + + def on_strategy_complete(self): + """ + See WorkerModel.on_strategy_complete(). + """ + + def get_binding(self, inventory_name): + """ + See WorkerModel.get_binding(). + """ + if self.broker is None: + self.broker = mitogen.master.Broker() + + path = self._listener_for_name(inventory_name) + if path != self.listener_path: + self._reconnect(path) + + return ClassicBinding(self) + + def on_binding_close(self): + if self.broker: + self.broker.shutdown() + self.broker.join() + self.router = None + self.broker = None + + # #420: Ansible executes "meta" actions in the top-level process, + # meaning "reset_connection" will cause :class:`mitogen.core.Latch` + # FDs to be cached and erroneously shared by children on subsequent + # WorkerProcess forks. To handle that, call on_fork() to ensure any + # shared state is discarded. + # #490: only attempt to clean up when it's known that some + # resources exist to cleanup, otherwise later __del__ double-call + # to close() due to GC at random moment may obliterate an unrelated + # Connection's related resources. + mitogen.fork.on_fork() + + class MuxProcess(object): """ Implement a subprocess forked from the Ansible top-level, as a safe place @@ -136,30 +495,27 @@ class MuxProcess(object): See https://bugs.python.org/issue6721 for a thorough description of the class of problems this worker is intended to avoid. """ - #: In the top-level process, this references one end of a socketpair(), - #: which the MuxProcess blocks reading from in order to determine when - #: the master process dies. Once the read returns, the MuxProcess will - #: begin shutting itself down. - worker_sock = None + #: whose other end child MuxProcesses block reading from to determine when + #: the master process dies. When the top-level exits abnormally, or + #: normally but where :func:`on_process_exit` has been called, this socket + #: will be closed, causing all the children to wake. + cls_parent_sock = None - #: In the worker process, this references the other end of - #: :py:attr:`worker_sock`. - child_sock = None - - #: In the top-level process, this is the PID of the single MuxProcess - #: that was spawned. - worker_pid = None + #: In the mux process, this is the other end of :attr:`cls_parent_sock`. + #: The main thread blocks on a read from it until :attr:`cls_parent_sock` + #: is closed. + cls_child_sock = None #: A copy of :data:`os.environ` at the time the multiplexer process was #: started. It's used by mitogen_local.py to find changes made to the #: top-level environment (e.g. vars plugins -- issue #297) that must be #: applied to locally executed commands and modules. - original_env = None + cls_original_env = None - #: In both processes, this is the temporary UNIX socket used for - #: forked WorkerProcesses to contact the MuxProcess - unix_listener_path = None + #: In both processes, this a list of the temporary UNIX sockets used for + #: forked WorkerProcesses to contact the forked mux processes. + cls_listener_paths = None @classmethod def _reset(cls): @@ -171,69 +527,54 @@ class MuxProcess(object): cls.worker_sock = None os.waitpid(cls.worker_pid, 0) - @classmethod - def start(cls, _init_logging=True): - """ - Arrange for the subprocess to be started, if it is not already running. + def __init__(self, index): + self.index = index + #: Individual path of this process. + self.path = mitogen.unix.make_socket_path() - The parent process picks a UNIX socket path the child will use prior to - fork, creates a socketpair used essentially as a semaphore, then blocks - waiting for the child to indicate the UNIX socket is ready for use. - - :param bool _init_logging: - For testing, if :data:`False`, don't initialize logging. - """ - if cls.worker_sock is not None: + def start(self): + pid = os.fork() + if pid: + # Wait for child to boot before continuing. + mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1) return - if faulthandler is not None: - faulthandler.enable() + save_pid('mux') + ansible_mitogen.logging.set_process_name('mux:' + str(self.index)) + if setproctitle: + setproctitle.setproctitle('mitogen mux:%s (%s)' % ( + self.index, + os.path.basename(self.path), + )) - mitogen.utils.setup_gil() - cls.unix_listener_path = mitogen.unix.make_socket_path() - cls.worker_sock, cls.child_sock = socket.socketpair() - atexit.register(clean_shutdown, cls.worker_sock) - mitogen.core.set_cloexec(cls.worker_sock.fileno()) - mitogen.core.set_cloexec(cls.child_sock.fileno()) - - cls.profiling = os.environ.get('MITOGEN_PROFILING') is not None - if cls.profiling: - mitogen.core.enable_profiling() - if _init_logging: - ansible_mitogen.logging.setup() - - cls.original_env = dict(os.environ) - cls.worker_pid = os.fork() - if cls.worker_pid: - save_pid('controller') - ansible_mitogen.logging.set_process_name('top') - ansible_mitogen.affinity.policy.assign_controller() - cls.child_sock.close() - cls.child_sock = None - mitogen.core.io_op(cls.worker_sock.recv, 1) - else: - save_pid('mux') - ansible_mitogen.logging.set_process_name('mux') - ansible_mitogen.affinity.policy.assign_muxprocess() - cls.worker_sock.close() - cls.worker_sock = None - self = cls() - self.worker_main() + MuxProcess.cls_parent_sock.close() + MuxProcess.cls_parent_sock = None + try: + try: + self.worker_main() + except Exception: + LOG.exception('worker_main() crashed') + finally: + sys.exit() def worker_main(self): """ - The main function of for the mux process: setup the Mitogen broker - thread and ansible_mitogen services, then sleep waiting for the socket + The main function of the mux process: setup the Mitogen broker thread + and ansible_mitogen services, then sleep waiting for the socket connected to the parent to be closed (indicating the parent has died). """ + save_pid('mux') + ansible_mitogen.logging.set_process_name('mux') + ansible_mitogen.affinity.policy.assign_muxprocess() + self._setup_master() self._setup_services() try: # Let the parent know our listening socket is ready. - mitogen.core.io_op(self.child_sock.send, b('1')) + mitogen.core.io_op(self.cls_child_sock.send, b('1')) # Block until the socket is closed, which happens on parent exit. - mitogen.core.io_op(self.child_sock.recv, 1) + mitogen.core.io_op(self.cls_child_sock.recv, 1) finally: self.broker.shutdown() self.broker.join() @@ -252,64 +593,6 @@ class MuxProcess(object): if secs: mitogen.debug.dump_to_logger(secs=secs) - def _setup_simplejson(self, responder): - """ - We support serving simplejson for Python 2.4 targets on Ansible 2.3, at - least so the package's own CI Docker scripts can run without external - help, however newer versions of simplejson no longer support Python - 2.4. Therefore override any installed/loaded version with a - 2.4-compatible version we ship in the compat/ directory. - """ - responder.whitelist_prefix('simplejson') - - # issue #536: must be at end of sys.path, in case existing newer - # version is already loaded. - compat_path = os.path.join(os.path.dirname(__file__), 'compat') - sys.path.append(compat_path) - - for fullname, is_pkg, suffix in ( - (u'simplejson', True, '__init__.py'), - (u'simplejson.decoder', False, 'decoder.py'), - (u'simplejson.encoder', False, 'encoder.py'), - (u'simplejson.scanner', False, 'scanner.py'), - ): - path = os.path.join(compat_path, 'simplejson', suffix) - fp = open(path, 'rb') - try: - source = fp.read() - finally: - fp.close() - - responder.add_source_override( - fullname=fullname, - path=path, - source=source, - is_pkg=is_pkg, - ) - - def _setup_responder(self, responder): - """ - Configure :class:`mitogen.master.ModuleResponder` to only permit - certain packages, and to generate custom responses for certain modules. - """ - responder.whitelist_prefix('ansible') - responder.whitelist_prefix('ansible_mitogen') - self._setup_simplejson(responder) - - # Ansible 2.3 is compatible with Python 2.4 targets, however - # ansible/__init__.py is not. Instead, executor/module_common.py writes - # out a 2.4-compatible namespace package for unknown reasons. So we - # copy it here. - responder.add_source_override( - fullname='ansible', - path=ansible.__file__, - source=(ANSIBLE_PKG_OVERRIDE % ( - ansible.__version__, - ansible.__author__, - )).encode(), - is_pkg=True, - ) - def _setup_master(self): """ Construct a Router, Broker, and mitogen.unix listener @@ -319,12 +602,12 @@ class MuxProcess(object): broker=self.broker, max_message_size=4096 * 1048576, ) - self._setup_responder(self.router.responder) + _setup_responder(self.router.responder) mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown) mitogen.core.listen(self.broker, 'exit', self.on_broker_exit) self.listener = mitogen.unix.Listener.build_stream( router=self.router, - path=self.unix_listener_path, + path=self.path, backlog=C.DEFAULT_FORKS, ) self._enable_router_debug() @@ -337,15 +620,9 @@ class MuxProcess(object): """ self.pool = mitogen.service.Pool( router=self.router, - services=[ - mitogen.service.FileService(router=self.router), - mitogen.service.PushFileService(router=self.router), - ansible_mitogen.services.ContextService(self.router), - ansible_mitogen.services.ModuleDepService(self.router), - ], size=getenv_int('MITOGEN_POOL_SIZE', default=32), ) - LOG.debug('Service pool configured: size=%d', self.pool.size) + setup_pool(self.pool) def on_broker_shutdown(self): """ @@ -364,7 +641,7 @@ class MuxProcess(object): ourself. In future this should gracefully join the pool, but TERM is fine for now. """ - if not self.profiling: + if not os.environ.get('MITOGEN_PROFILING'): # In normal operation we presently kill the process because there is # not yet any way to cancel connect(). When profiling, threads # including the broker must shut down gracefully, otherwise pstats diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index a7c0e46f..a8fde265 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -326,6 +326,7 @@ class ContextService(mitogen.service.Service): ) def _send_module_forwards(self, context): + return self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD) _candidate_temp_dirs = None @@ -372,7 +373,7 @@ class ContextService(mitogen.service.Service): try: method = getattr(self.router, spec['method']) except AttributeError: - raise Error('unsupported method: %(transport)s' % spec) + raise Error('unsupported method: %(method)s' % spec) context = method(via=via, unidirectional=True, **spec['kwargs']) if via and spec.get('enable_lru'): @@ -382,6 +383,7 @@ class ContextService(mitogen.service.Service): mitogen.core.listen(context, 'disconnect', lambda: self._on_context_disconnect(context)) + #self._send_module_forwards(context) TODO self._send_module_forwards(context) init_child_result = context.call( ansible_mitogen.target.init_child, @@ -443,7 +445,7 @@ class ContextService(mitogen.service.Service): @mitogen.service.arg_spec({ 'stack': list }) - def get(self, msg, stack): + def get(self, stack): """ Return a Context referring to an established connection with the given configuration, establishing new connections as necessary. diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index 01dff285..a1315cd9 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -31,6 +31,11 @@ import os import signal import threading +try: + import setproctitle +except ImportError: + setproctitle = None + import mitogen.core import ansible_mitogen.affinity import ansible_mitogen.loaders @@ -145,11 +150,17 @@ def wrap_connection_loader__get(name, *args, **kwargs): return connection_loader__get(name, *args, **kwargs) -def wrap_worker__run(*args, **kwargs): +def wrap_worker__run(self): """ While the strategy is active, rewrite connection_loader.get() calls for some transports into requests for a compatible Mitogen transport. """ + if setproctitle: + setproctitle.setproctitle('worker:%s task:%s' % ( + self._host.name, + self._task.action, + )) + # Ignore parent's attempts to murder us when we still need to write # profiling output. if mitogen.core._profile_hook.__name__ != '_profile_hook': @@ -158,10 +169,60 @@ def wrap_worker__run(*args, **kwargs): ansible_mitogen.logging.set_process_name('task') ansible_mitogen.affinity.policy.assign_worker() return mitogen.core._profile_hook('WorkerProcess', - lambda: worker__run(*args, **kwargs) + lambda: worker__run(self) ) +class AnsibleWrappers(object): + """ + Manage add/removal of various Ansible runtime hooks. + """ + def _add_plugin_paths(self): + """ + Add the Mitogen plug-in directories to the ModuleLoader path, avoiding + the need for manual configuration. + """ + base_dir = os.path.join(os.path.dirname(__file__), 'plugins') + ansible_mitogen.loaders.connection_loader.add_directory( + os.path.join(base_dir, 'connection') + ) + ansible_mitogen.loaders.action_loader.add_directory( + os.path.join(base_dir, 'action') + ) + + def _install_wrappers(self): + """ + Install our PluginLoader monkey patches and update global variables + with references to the real functions. + """ + global action_loader__get + action_loader__get = ansible_mitogen.loaders.action_loader.get + ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get + + global connection_loader__get + connection_loader__get = ansible_mitogen.loaders.connection_loader.get + ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get + + global worker__run + worker__run = ansible.executor.process.worker.WorkerProcess.run + ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run + + def _remove_wrappers(self): + """ + Uninstall the PluginLoader monkey patches. + """ + ansible_mitogen.loaders.action_loader.get = action_loader__get + ansible_mitogen.loaders.connection_loader.get = connection_loader__get + ansible.executor.process.worker.WorkerProcess.run = worker__run + + def install(self): + self._add_plugin_paths() + self._install_wrappers() + + def remove(self): + self._remove_wrappers() + + class StrategyMixin(object): """ This mix-in enhances any built-in strategy by arranging for various Mitogen @@ -223,43 +284,6 @@ class StrategyMixin(object): remote process, all the heavy lifting of transferring the action module and its dependencies are automatically handled by Mitogen. """ - def _install_wrappers(self): - """ - Install our PluginLoader monkey patches and update global variables - with references to the real functions. - """ - global action_loader__get - action_loader__get = ansible_mitogen.loaders.action_loader.get - ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get - - global connection_loader__get - connection_loader__get = ansible_mitogen.loaders.connection_loader.get - ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get - - global worker__run - worker__run = ansible.executor.process.worker.WorkerProcess.run - ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run - - def _remove_wrappers(self): - """ - Uninstall the PluginLoader monkey patches. - """ - ansible_mitogen.loaders.action_loader.get = action_loader__get - ansible_mitogen.loaders.connection_loader.get = connection_loader__get - ansible.executor.process.worker.WorkerProcess.run = worker__run - - def _add_plugin_paths(self): - """ - Add the Mitogen plug-in directories to the ModuleLoader path, avoiding - the need for manual configuration. - """ - base_dir = os.path.join(os.path.dirname(__file__), 'plugins') - ansible_mitogen.loaders.connection_loader.add_directory( - os.path.join(base_dir, 'connection') - ) - ansible_mitogen.loaders.action_loader.add_directory( - os.path.join(base_dir, 'action') - ) def _queue_task(self, host, task, task_vars, play_context): """ @@ -290,20 +314,35 @@ class StrategyMixin(object): play_context=play_context, ) + def _get_worker_model(self): + """ + In classic mode a single :class:`WorkerModel` exists, which manages + references and configuration of the associated connection multiplexer + process. + """ + return ansible_mitogen.process.get_classic_worker_model() + def run(self, iterator, play_context, result=0): """ - Arrange for a mitogen.master.Router to be available for the duration of - the strategy's real run() method. + Wrap :meth:`run` to ensure requisite infrastructure and modifications + are configured for the duration of the call. """ _assert_supported_release() - - ansible_mitogen.process.MuxProcess.start() - run = super(StrategyMixin, self).run - self._add_plugin_paths() - self._install_wrappers() + wrappers = AnsibleWrappers() + self._worker_model = self._get_worker_model() + ansible_mitogen.process.set_worker_model(self._worker_model) try: - return mitogen.core._profile_hook('Strategy', - lambda: run(iterator, play_context) - ) + self._worker_model.on_strategy_start() + try: + wrappers.install() + try: + run = super(StrategyMixin, self).run + return mitogen.core._profile_hook('Strategy', + lambda: run(iterator, play_context) + ) + finally: + wrappers.remove() + finally: + self._worker_model.on_strategy_complete() finally: - self._remove_wrappers() + ansible_mitogen.process.set_worker_model(None) diff --git a/mitogen/core.py b/mitogen/core.py index 626d5297..10cd4385 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -901,7 +901,11 @@ class Message(object): unpickler.find_global = self._find_global try: # Must occur off the broker thread. - obj = unpickler.load() + try: + obj = unpickler.load() + except: + LOG.error('raw pickle was: %r', self.data) + raise self._unpickled = obj except (TypeError, ValueError): e = sys.exc_info()[1] diff --git a/mitogen/parent.py b/mitogen/parent.py index 97ec4949..4b2ac388 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -2159,6 +2159,23 @@ class Router(mitogen.core.Router): finally: self._write_lock.release() + def disconnect(self, context): + """ + Disconnect a context and forget its stream, assuming the context is + directly connected. + """ + stream = self.stream_by_id(context) + if stream.remote_id != context.context_id: + return + + l = mitogen.core.Latch() + mitogen.core.listen(stream, 'disconnect', l.put) + def disconnect(): + LOG.debug('Starting disconnect of %r', stream) + stream.on_disconnect(self.broker) + self.broker.defer(disconnect) + l.get() + def add_route(self, target_id, stream): """ Arrange for messages whose `dst_id` is `target_id` to be forwarded on diff --git a/mitogen/service.py b/mitogen/service.py index 886012e8..9e17482c 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -92,6 +92,24 @@ def get_or_create_pool(size=None, router=None): _pool_lock.release() +def call(service_name, method_name, call_context=None, **kwargs): + """ + Call a service registered with this pool, using the calling thread as a + host. + """ + if isinstance(service_name, mitogen.core.BytesType): + service_name = service_name.encode('utf-8') + elif not isinstance(service_name, mitogen.core.UnicodeType): + service_name = service_name.name() # Service.name() + + if call_context: + return call_context.call_service(service_name, method_name, **kwargs) + else: + pool = get_or_create_pool() + invoker = pool.get_invoker(service_name, msg=None) + return getattr(invoker.service, method_name)(**kwargs) + + def validate_arg_spec(spec, args): for name in spec: try: @@ -239,12 +257,13 @@ class Invoker(object): if not policies: raise mitogen.core.CallError('Method has no policies set.') - if not all(p.is_authorized(self.service, msg) for p in policies): - raise mitogen.core.CallError( - self.unauthorized_msg, - method_name, - self.service.name() - ) + if msg is not None: + if not all(p.is_authorized(self.service, msg) for p in policies): + raise mitogen.core.CallError( + self.unauthorized_msg, + method_name, + self.service.name() + ) required = getattr(method, 'mitogen_service__arg_spec', {}) validate_arg_spec(required, kwargs) @@ -264,7 +283,7 @@ class Invoker(object): except Exception: if no_reply: LOG.exception('While calling no-reply method %s.%s', - type(self.service).__name__, + self.service.name(), func_name(method)) else: raise @@ -690,7 +709,7 @@ class PushFileService(Service): """ for path in paths: self.propagate_to(context, mitogen.core.to_text(path)) - self.router.responder.forward_modules(context, modules) + #self.router.responder.forward_modules(context, modules) TODO @expose(policy=AllowParents()) @arg_spec({ diff --git a/tests/ansible/run_ansible_playbook.py b/tests/ansible/run_ansible_playbook.py index b5b459a1..467eaffc 100755 --- a/tests/ansible/run_ansible_playbook.py +++ b/tests/ansible/run_ansible_playbook.py @@ -51,7 +51,11 @@ else: os.path.join(GIT_BASEDIR, 'tests/ansible/hosts') ) -args = ['ansible-playbook'] +if 'ANSIBLE_ARGV' in os.environ: + args = eval(os.environ['ANSIBLE_ARGV']) +else: + args = ['ansible-playbook'] + args += ['-e', json.dumps(extra)] args += sys.argv[1:] os.execvp(args[0], args)