diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 42fa2ef8..4d310d75 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -38,6 +38,7 @@ import sys import time import jinja2.runtime +from ansible.module_utils import six import ansible.constants as C import ansible.errors import ansible.plugins.connection @@ -459,15 +460,10 @@ class CallChain(mitogen.parent.CallChain): class Connection(ansible.plugins.connection.ConnectionBase): - #: mitogen.master.Broker for this worker. - broker = None - - #: mitogen.master.Router for this worker. - router = None - - #: mitogen.parent.Context representing the parent Context, which is - #: presently always the connection multiplexer process. - parent = None + #: The :class:`ansible_mitogen.process.Binding` representing the connection + #: multiplexer this connection's target is assigned to. :data:`None` when + #: disconnected. + binding = None #: mitogen.parent.Context for the target account on the target, possibly #: reached via become. @@ -518,13 +514,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): #: matching vanilla Ansible behaviour. loader_basedir = None - def __init__(self, play_context, new_stdin, **kwargs): - assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( - 'Mitogen connection types may only be instantiated ' - 'while the "mitogen" strategy is active.' - ) - super(Connection, self).__init__(play_context, new_stdin) - def __del__(self): """ Ansible cannot be trusted to always call close() e.g. the synchronize @@ -585,6 +574,15 @@ class Connection(ansible.plugins.connection.ConnectionBase): self._connect() return self.init_child_result['home_dir'] + def get_binding(self): + """ + Return the :class:`ansible_mitogen.process.Binding` representing the + process that hosts the physical connection and services (context + establishment, file transfer, ..) for our desired target. + """ + assert self.binding is not None + return self.binding + @property def connected(self): return self.context is not None @@ -672,18 +670,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): return stack - def _connect_broker(self): - """ - Establish a reference to the Broker, Router and parent context used for - connections. - """ - if not self.broker: - self.broker = mitogen.master.Broker() - self.router, self.parent = mitogen.unix.connect( - path=ansible_mitogen.process.MuxProcess.unix_listener_path, - broker=self.broker, - ) - def _build_stack(self): """ Construct a list of dictionaries representing the connection @@ -691,14 +677,14 @@ class Connection(ansible.plugins.connection.ConnectionBase): additionally used by the integration tests "mitogen_get_stack" action to fetch the would-be connection configuration. """ - return self._stack_from_spec( - ansible_mitogen.transport_config.PlayContextSpec( - connection=self, - play_context=self._play_context, - transport=self.transport, - inventory_name=self.inventory_hostname, - ) + spec = ansible_mitogen.transport_config.PlayContextSpec( + connection=self, + play_context=self._play_context, + transport=self.transport, + inventory_name=self.inventory_hostname, ) + stack = self._stack_from_spec(spec) + return spec.inventory_name(), stack def _connect_stack(self, stack): """ @@ -711,7 +697,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): description of the returned dictionary. """ try: - dct = self.parent.call_service( + dct = mitogen.service.call( + call_context=self.binding.get_service_context(), service_name='ansible_mitogen.services.ContextService', method_name='get', stack=mitogen.utils.cast(list(stack)), @@ -758,8 +745,9 @@ class Connection(ansible.plugins.connection.ConnectionBase): if self.connected: return - self._connect_broker() - stack = self._build_stack() + inventory_name, stack = self._build_stack() + worker_model = ansible_mitogen.process.get_worker_model() + self.binding = worker_model.get_binding(inventory_name) self._connect_stack(stack) def _mitogen_reset(self, mode): @@ -776,9 +764,10 @@ class Connection(ansible.plugins.connection.ConnectionBase): return self.chain.reset() - self.parent.call_service( + mitogen.service.call( + call_context=self.binding.get_service_context(), service_name='ansible_mitogen.services.ContextService', - method_name=mode, + method_name='put', context=self.context ) @@ -787,27 +776,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): self.init_child_result = None self.chain = None - def _shutdown_broker(self): - """ - Shutdown the broker thread during :meth:`close` or :meth:`reset`. - """ - if self.broker: - self.broker.shutdown() - self.broker.join() - self.broker = None - self.router = None - - # #420: Ansible executes "meta" actions in the top-level process, - # meaning "reset_connection" will cause :class:`mitogen.core.Latch` - # FDs to be cached and erroneously shared by children on subsequent - # WorkerProcess forks. To handle that, call on_fork() to ensure any - # shared state is discarded. - # #490: only attempt to clean up when it's known that some - # resources exist to cleanup, otherwise later __del__ double-call - # to close() due to GC at random moment may obliterate an unrelated - # Connection's resources. - mitogen.fork.on_fork() - def close(self): """ Arrange for the mitogen.master.Router running in the worker to @@ -815,7 +783,9 @@ class Connection(ansible.plugins.connection.ConnectionBase): multiple times. """ self._mitogen_reset(mode='put') - self._shutdown_broker() + if self.binding: + self.binding.close() + self.binding = None def _reset_find_task_vars(self): """ @@ -853,7 +823,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): self._connect() self._mitogen_reset(mode='reset') - self._shutdown_broker() + self.binding.close() + self.binding = None # Compatibility with Ansible 2.4 wait_for_connection plug-in. _reset = reset @@ -1024,7 +995,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): utimes=(st.st_atime, st.st_mtime)) self._connect() - self.parent.call_service( + mitogen.service.call( + call_context=self.binding.get_service_context(), service_name='mitogen.service.FileService', method_name='register', path=mitogen.utils.cast(in_path) @@ -1036,7 +1008,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): # file alive, but that requires more work. self.get_chain().call( ansible_mitogen.target.transfer_file, - context=self.parent, + context=self.binding.get_child_service_context(), in_path=in_path, out_path=out_path ) diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index 2eebd36d..96b06995 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -148,6 +148,8 @@ class Planner(object): # named by `runner_name`. } """ + binding = self._inv.connection.get_binding() + new = dict((mitogen.core.UnicodeType(k), kwargs[k]) for k in kwargs) new.setdefault('good_temp_dir', @@ -155,7 +157,7 @@ class Planner(object): new.setdefault('cwd', self._inv.connection.get_default_cwd()) new.setdefault('extra_env', self._inv.connection.get_default_env()) new.setdefault('emulate_tty', True) - new.setdefault('service_context', self._inv.connection.parent) + new.setdefault('service_context', binding.get_child_service_context()) return new def __repr__(self): @@ -328,7 +330,9 @@ class NewStylePlanner(ScriptPlanner): def get_module_map(self): if self._module_map is None: - self._module_map = self._inv.connection.parent.call_service( + binding = self._inv.connection.get_binding() + self._module_map = mitogen.service.call( + call_context=binding.get_service_context(), service_name='ansible_mitogen.services.ModuleDepService', method_name='scan', @@ -405,9 +409,12 @@ def get_module_data(name): def _propagate_deps(invocation, planner, context): - invocation.connection.parent.call_service( + binding = invocation.connection.get_binding() + mitogen.service.call( + call_context=binding.get_service_context(), service_name='mitogen.service.PushFileService', method_name='propagate_paths_and_modules', + context=context, paths=planner.get_push_files(), modules=planner.get_module_deps(), diff --git a/ansible_mitogen/plugins/connection/mitogen_local.py b/ansible_mitogen/plugins/connection/mitogen_local.py index 24b84a03..a98c834c 100644 --- a/ansible_mitogen/plugins/connection/mitogen_local.py +++ b/ansible_mitogen/plugins/connection/mitogen_local.py @@ -81,6 +81,6 @@ class Connection(ansible_mitogen.connection.Connection): from WorkerProcess, we must emulate that. """ return dict_diff( - old=ansible_mitogen.process.MuxProcess.original_env, + old=ansible_mitogen.process.MuxProcess.cls_original_env, new=os.environ, ) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index a8827cb1..0eaf25a7 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -30,6 +30,7 @@ from __future__ import absolute_import import atexit import errno import logging +import multiprocessing import os import signal import socket @@ -41,9 +42,15 @@ try: except ImportError: faulthandler = None +try: + import setproctitle +except ImportError: + setproctitle = None + import mitogen import mitogen.core import mitogen.debug +import mitogen.fork import mitogen.master import mitogen.parent import mitogen.service @@ -52,6 +59,7 @@ import mitogen.utils import ansible import ansible.constants as C +import ansible.errors import ansible_mitogen.logging import ansible_mitogen.services @@ -66,28 +74,55 @@ ANSIBLE_PKG_OVERRIDE = ( u"__author__ = %r\n" ) +worker_model_msg = ( + 'Mitogen connection types may only be instantiated when one of the ' + '"mitogen_*" or "operon_*" strategies are active.' +) -def clean_shutdown(sock): +#: The worker model as configured by the currently running strategy. This is +#: managed via :func:`get_worker_model` / :func:`set_worker_model` functions by +#: :class:`StrategyMixin`. +_worker_model = None + + +#: A copy of the sole :class:`ClassicWorkerModel` that ever exists during a +#: classic run, as return by :func:`get_classic_worker_model`. +_classic_worker_model = None + + +def set_worker_model(model): """ - Shut the write end of `sock`, causing `recv` in the worker process to wake - up with a 0-byte read and initiate mux process exit, then wait for a 0-byte - read from the read end, which will occur after the the child closes the - descriptor on exit. + To remove process model-wiring from + :class:`ansible_mitogen.connection.Connection`, it is necessary to track + some idea of the configured execution environment outside the connection + plug-in. - This is done using :mod:`atexit` since Ansible lacks any more sensible hook - to run code during exit, and unless some synchronization exists with - MuxProcess, debug logs may appear on the user's terminal *after* the prompt - has been printed. + That is what :func:`set_worker_model` and :func:`get_worker_model` are for. """ - try: - sock.shutdown(socket.SHUT_WR) - except socket.error: - # Already closed. This is possible when tests are running. - LOG.debug('clean_shutdown: ignoring duplicate call') - return + global _worker_model + assert model is None or _worker_model is None + _worker_model = model - sock.recv(1) - sock.close() + +def get_worker_model(): + """ + Return the :class:`WorkerModel` currently configured by the running + strategy. + """ + if _worker_model is None: + raise ansible.errors.AnsibleConnectionFailure(worker_model_msg) + return _worker_model + + +def get_classic_worker_model(): + """ + Return the single :class:`ClassicWorkerModel` instance, constructing it if + necessary. + """ + global _classic_worker_model + if _classic_worker_model is None: + _classic_worker_model = ClassicWorkerModel() + return _classic_worker_model def getenv_int(key, default=0): @@ -119,6 +154,330 @@ def save_pid(name): fp.write(str(os.getpid())) +def setup_pool(pool): + """ + Configure a connection multiplexer's :class:`mitogen.service.Pool` with + services accessed by clients and WorkerProcesses. + """ + pool.add(mitogen.service.FileService(router=pool.router)) + pool.add(mitogen.service.PushFileService(router=pool.router)) + pool.add(ansible_mitogen.services.ContextService(router=pool.router)) + pool.add(ansible_mitogen.services.ModuleDepService(pool.router)) + LOG.debug('Service pool configured: size=%d', pool.size) + + +def _setup_simplejson(responder): + """ + We support serving simplejson for Python 2.4 targets on Ansible 2.3, at + least so the package's own CI Docker scripts can run without external + help, however newer versions of simplejson no longer support Python + 2.4. Therefore override any installed/loaded version with a + 2.4-compatible version we ship in the compat/ directory. + """ + responder.whitelist_prefix('simplejson') + + # issue #536: must be at end of sys.path, in case existing newer + # version is already loaded. + compat_path = os.path.join(os.path.dirname(__file__), 'compat') + sys.path.append(compat_path) + + for fullname, is_pkg, suffix in ( + (u'simplejson', True, '__init__.py'), + (u'simplejson.decoder', False, 'decoder.py'), + (u'simplejson.encoder', False, 'encoder.py'), + (u'simplejson.scanner', False, 'scanner.py'), + ): + path = os.path.join(compat_path, 'simplejson', suffix) + fp = open(path, 'rb') + try: + source = fp.read() + finally: + fp.close() + + responder.add_source_override( + fullname=fullname, + path=path, + source=source, + is_pkg=is_pkg, + ) + + +def _setup_responder(responder): + """ + Configure :class:`mitogen.master.ModuleResponder` to only permit + certain packages, and to generate custom responses for certain modules. + """ + responder.whitelist_prefix('ansible') + responder.whitelist_prefix('ansible_mitogen') + _setup_simplejson(responder) + + # Ansible 2.3 is compatible with Python 2.4 targets, however + # ansible/__init__.py is not. Instead, executor/module_common.py writes + # out a 2.4-compatible namespace package for unknown reasons. So we + # copy it here. + responder.add_source_override( + fullname='ansible', + path=ansible.__file__, + source=(ANSIBLE_PKG_OVERRIDE % ( + ansible.__version__, + ansible.__author__, + )).encode(), + is_pkg=True, + ) + + +def common_setup(_init_logging=True): + save_pid('controller') + ansible_mitogen.logging.set_process_name('top') + ansible_mitogen.affinity.policy.assign_controller() + + mitogen.utils.setup_gil() + if _init_logging: + ansible_mitogen.logging.setup() + + if faulthandler is not None: + faulthandler.enable() + + MuxProcess.profiling = getenv_int('MITOGEN_PROFILING') > 0 + if MuxProcess.profiling: + mitogen.core.enable_profiling() + + MuxProcess.cls_original_env = dict(os.environ) + + +def get_cpu_count(default=None): + """ + Get the multiplexer CPU count from the MITOGEN_CPU_COUNT environment + variable, returning `default` if one isn't set, or is out of range. + + :param int default: + Default CPU, or :data:`None` to use all available CPUs. + """ + max_cpus = multiprocessing.cpu_count() + if default is None: + default = max_cpus + + cpu_count = getenv_int('MITOGEN_CPU_COUNT', default=default) + if cpu_count < 1 or cpu_count > max_cpus: + cpu_count = default + + return cpu_count + + +class Binding(object): + def get_child_service_context(self): + """ + Return the :class:`mitogen.core.Context` to which children should + direct ContextService requests, or :data:`None` for the local process. + """ + raise NotImplementedError() + + def get_service_context(self): + """ + Return the :class:`mitogen.core.Context` to which this process should + direct ContextService requests, or :data:`None` for the local process. + """ + raise NotImplementedError() + + def close(self): + """ + Finalize any associated resources. + """ + raise NotImplementedError() + + +class WorkerModel(object): + def on_strategy_start(self): + """ + Called prior to strategy start in the top-level process. Responsible + for preparing any worker/connection multiplexer state. + """ + raise NotImplementedError() + + def on_strategy_complete(self): + """ + Called after strategy completion in the top-level process. Must place + Ansible back in a "compatible" state where any other strategy plug-in + may execute. + """ + raise NotImplementedError() + + def get_binding(self, inventory_name): + raise NotImplementedError() + + +class ClassicBinding(Binding): + """ + Only one connection may be active at a time in a classic worker, so its + binding just provides forwarders back to :class:`ClassicWorkerModel`. + """ + def __init__(self, model): + self.model = model + + def get_service_context(self): + """ + See Binding.get_service_context(). + """ + return self.model.parent + + def get_child_service_context(self): + """ + See Binding.get_child_service_context(). + """ + return self.model.parent + + def close(self): + """ + See Binding.close(). + """ + self.model.on_binding_close() + + +class ClassicWorkerModel(WorkerModel): + #: mitogen.master.Router for this worker. + router = None + + #: mitogen.master.Broker for this worker. + broker = None + + #: Name of multiplexer process socket we are currently connected to. + listener_path = None + + #: mitogen.parent.Context representing the parent Context, which is the + #: connection multiplexer process when running in classic mode, or the + #: top-level process when running a new-style mode. + parent = None + + def __init__(self, _init_logging=True): + self._init_logging = _init_logging + self.initialized = False + + def _listener_for_name(self, name): + """ + Given a connection stack, return the UNIX listener that should be used + to communicate with it. This is a simple hash of the inventory name. + """ + if len(self._muxes) == 1: + return self._muxes[0].path + + idx = abs(hash(name)) % len(self._muxes) + LOG.debug('Picked worker %d: %s', idx, self._muxes[idx].path) + return self._muxes[idx].path + + def _reconnect(self, path): + if self.router is not None: + # Router can just be overwritten, but the previous parent + # connection must explicitly be removed from the broker first. + self.router.disconnect(self.parent) + self.parent = None + self.router = None + + self.router, self.parent = mitogen.unix.connect( + path=path, + broker=self.broker, + ) + self.listener_path = path + + def on_process_exit(self, sock): + """ + This is an :mod:`atexit` handler installed in the top-level process. + + Shut the write end of `sock`, causing the receive side of the socket in + every worker process to wake up with a 0-byte reads, and causing their + main threads to wake up and initiate shutdown. After shutting the + socket down, wait for a 0-byte read from the read end, which will occur + after the last child closes the descriptor on exit. + + This is done using :mod:`atexit` since Ansible lacks any better hook to + run code during exit, and unless some synchronization exists with + MuxProcess, debug logs may appear on the user's terminal *after* the + prompt has been printed. + """ + try: + sock.shutdown(socket.SHUT_WR) + except socket.error: + # Already closed. This is possible when tests are running. + LOG.debug('on_process_exit: ignoring duplicate call') + return + + mitogen.core.io_op(sock.recv, 1) + sock.close() + + def _initialize(self): + """ + Arrange for classic process model connection multiplexer child + processes to be started, if they are not already running. + + The parent process picks a UNIX socket path the child will use prior to + fork, creates a socketpair used essentially as a semaphore, then blocks + waiting for the child to indicate the UNIX socket is ready for use. + + :param bool _init_logging: + For testing, if :data:`False`, don't initialize logging. + """ + common_setup(_init_logging=self._init_logging) + MuxProcess.cls_parent_sock, \ + MuxProcess.cls_child_sock = socket.socketpair() + + mitogen.core.set_cloexec(MuxProcess.cls_parent_sock.fileno()) + mitogen.core.set_cloexec(MuxProcess.cls_child_sock.fileno()) + + self._muxes = [ + MuxProcess(index) + for index in range(get_cpu_count(default=1)) + ] + for mux in self._muxes: + mux.start() + + atexit.register(self.on_process_exit, MuxProcess.cls_parent_sock) + MuxProcess.cls_child_sock.close() + MuxProcess.cls_child_sock = None + + def on_strategy_start(self): + """ + See WorkerModel.on_strategy_start(). + """ + if not self.initialized: + self._initialize() + self.initialized = True + + def on_strategy_complete(self): + """ + See WorkerModel.on_strategy_complete(). + """ + + def get_binding(self, inventory_name): + """ + See WorkerModel.get_binding(). + """ + if self.broker is None: + self.broker = mitogen.master.Broker() + + path = self._listener_for_name(inventory_name) + if path != self.listener_path: + self._reconnect(path) + + return ClassicBinding(self) + + def on_binding_close(self): + if self.broker: + self.broker.shutdown() + self.broker.join() + self.router = None + self.broker = None + + # #420: Ansible executes "meta" actions in the top-level process, + # meaning "reset_connection" will cause :class:`mitogen.core.Latch` + # FDs to be cached and erroneously shared by children on subsequent + # WorkerProcess forks. To handle that, call on_fork() to ensure any + # shared state is discarded. + # #490: only attempt to clean up when it's known that some + # resources exist to cleanup, otherwise later __del__ double-call + # to close() due to GC at random moment may obliterate an unrelated + # Connection's related resources. + mitogen.fork.on_fork() + + class MuxProcess(object): """ Implement a subprocess forked from the Ansible top-level, as a safe place @@ -136,30 +495,27 @@ class MuxProcess(object): See https://bugs.python.org/issue6721 for a thorough description of the class of problems this worker is intended to avoid. """ - #: In the top-level process, this references one end of a socketpair(), - #: which the MuxProcess blocks reading from in order to determine when - #: the master process dies. Once the read returns, the MuxProcess will - #: begin shutting itself down. - worker_sock = None + #: whose other end child MuxProcesses block reading from to determine when + #: the master process dies. When the top-level exits abnormally, or + #: normally but where :func:`on_process_exit` has been called, this socket + #: will be closed, causing all the children to wake. + cls_parent_sock = None - #: In the worker process, this references the other end of - #: :py:attr:`worker_sock`. - child_sock = None - - #: In the top-level process, this is the PID of the single MuxProcess - #: that was spawned. - worker_pid = None + #: In the mux process, this is the other end of :attr:`cls_parent_sock`. + #: The main thread blocks on a read from it until :attr:`cls_parent_sock` + #: is closed. + cls_child_sock = None #: A copy of :data:`os.environ` at the time the multiplexer process was #: started. It's used by mitogen_local.py to find changes made to the #: top-level environment (e.g. vars plugins -- issue #297) that must be #: applied to locally executed commands and modules. - original_env = None + cls_original_env = None - #: In both processes, this is the temporary UNIX socket used for - #: forked WorkerProcesses to contact the MuxProcess - unix_listener_path = None + #: In both processes, this a list of the temporary UNIX sockets used for + #: forked WorkerProcesses to contact the forked mux processes. + cls_listener_paths = None @classmethod def _reset(cls): @@ -171,69 +527,54 @@ class MuxProcess(object): cls.worker_sock = None os.waitpid(cls.worker_pid, 0) - @classmethod - def start(cls, _init_logging=True): - """ - Arrange for the subprocess to be started, if it is not already running. + def __init__(self, index): + self.index = index + #: Individual path of this process. + self.path = mitogen.unix.make_socket_path() - The parent process picks a UNIX socket path the child will use prior to - fork, creates a socketpair used essentially as a semaphore, then blocks - waiting for the child to indicate the UNIX socket is ready for use. - - :param bool _init_logging: - For testing, if :data:`False`, don't initialize logging. - """ - if cls.worker_sock is not None: + def start(self): + pid = os.fork() + if pid: + # Wait for child to boot before continuing. + mitogen.core.io_op(MuxProcess.cls_parent_sock.recv, 1) return - if faulthandler is not None: - faulthandler.enable() + save_pid('mux') + ansible_mitogen.logging.set_process_name('mux:' + str(self.index)) + if setproctitle: + setproctitle.setproctitle('mitogen mux:%s (%s)' % ( + self.index, + os.path.basename(self.path), + )) - mitogen.utils.setup_gil() - cls.unix_listener_path = mitogen.unix.make_socket_path() - cls.worker_sock, cls.child_sock = socket.socketpair() - atexit.register(clean_shutdown, cls.worker_sock) - mitogen.core.set_cloexec(cls.worker_sock.fileno()) - mitogen.core.set_cloexec(cls.child_sock.fileno()) - - cls.profiling = os.environ.get('MITOGEN_PROFILING') is not None - if cls.profiling: - mitogen.core.enable_profiling() - if _init_logging: - ansible_mitogen.logging.setup() - - cls.original_env = dict(os.environ) - cls.worker_pid = os.fork() - if cls.worker_pid: - save_pid('controller') - ansible_mitogen.logging.set_process_name('top') - ansible_mitogen.affinity.policy.assign_controller() - cls.child_sock.close() - cls.child_sock = None - mitogen.core.io_op(cls.worker_sock.recv, 1) - else: - save_pid('mux') - ansible_mitogen.logging.set_process_name('mux') - ansible_mitogen.affinity.policy.assign_muxprocess() - cls.worker_sock.close() - cls.worker_sock = None - self = cls() - self.worker_main() + MuxProcess.cls_parent_sock.close() + MuxProcess.cls_parent_sock = None + try: + try: + self.worker_main() + except Exception: + LOG.exception('worker_main() crashed') + finally: + sys.exit() def worker_main(self): """ - The main function of for the mux process: setup the Mitogen broker - thread and ansible_mitogen services, then sleep waiting for the socket + The main function of the mux process: setup the Mitogen broker thread + and ansible_mitogen services, then sleep waiting for the socket connected to the parent to be closed (indicating the parent has died). """ + save_pid('mux') + ansible_mitogen.logging.set_process_name('mux') + ansible_mitogen.affinity.policy.assign_muxprocess() + self._setup_master() self._setup_services() try: # Let the parent know our listening socket is ready. - mitogen.core.io_op(self.child_sock.send, b('1')) + mitogen.core.io_op(self.cls_child_sock.send, b('1')) # Block until the socket is closed, which happens on parent exit. - mitogen.core.io_op(self.child_sock.recv, 1) + mitogen.core.io_op(self.cls_child_sock.recv, 1) finally: self.broker.shutdown() self.broker.join() @@ -252,64 +593,6 @@ class MuxProcess(object): if secs: mitogen.debug.dump_to_logger(secs=secs) - def _setup_simplejson(self, responder): - """ - We support serving simplejson for Python 2.4 targets on Ansible 2.3, at - least so the package's own CI Docker scripts can run without external - help, however newer versions of simplejson no longer support Python - 2.4. Therefore override any installed/loaded version with a - 2.4-compatible version we ship in the compat/ directory. - """ - responder.whitelist_prefix('simplejson') - - # issue #536: must be at end of sys.path, in case existing newer - # version is already loaded. - compat_path = os.path.join(os.path.dirname(__file__), 'compat') - sys.path.append(compat_path) - - for fullname, is_pkg, suffix in ( - (u'simplejson', True, '__init__.py'), - (u'simplejson.decoder', False, 'decoder.py'), - (u'simplejson.encoder', False, 'encoder.py'), - (u'simplejson.scanner', False, 'scanner.py'), - ): - path = os.path.join(compat_path, 'simplejson', suffix) - fp = open(path, 'rb') - try: - source = fp.read() - finally: - fp.close() - - responder.add_source_override( - fullname=fullname, - path=path, - source=source, - is_pkg=is_pkg, - ) - - def _setup_responder(self, responder): - """ - Configure :class:`mitogen.master.ModuleResponder` to only permit - certain packages, and to generate custom responses for certain modules. - """ - responder.whitelist_prefix('ansible') - responder.whitelist_prefix('ansible_mitogen') - self._setup_simplejson(responder) - - # Ansible 2.3 is compatible with Python 2.4 targets, however - # ansible/__init__.py is not. Instead, executor/module_common.py writes - # out a 2.4-compatible namespace package for unknown reasons. So we - # copy it here. - responder.add_source_override( - fullname='ansible', - path=ansible.__file__, - source=(ANSIBLE_PKG_OVERRIDE % ( - ansible.__version__, - ansible.__author__, - )).encode(), - is_pkg=True, - ) - def _setup_master(self): """ Construct a Router, Broker, and mitogen.unix listener @@ -319,12 +602,12 @@ class MuxProcess(object): broker=self.broker, max_message_size=4096 * 1048576, ) - self._setup_responder(self.router.responder) + _setup_responder(self.router.responder) mitogen.core.listen(self.broker, 'shutdown', self.on_broker_shutdown) mitogen.core.listen(self.broker, 'exit', self.on_broker_exit) self.listener = mitogen.unix.Listener.build_stream( router=self.router, - path=self.unix_listener_path, + path=self.path, backlog=C.DEFAULT_FORKS, ) self._enable_router_debug() @@ -337,15 +620,9 @@ class MuxProcess(object): """ self.pool = mitogen.service.Pool( router=self.router, - services=[ - mitogen.service.FileService(router=self.router), - mitogen.service.PushFileService(router=self.router), - ansible_mitogen.services.ContextService(self.router), - ansible_mitogen.services.ModuleDepService(self.router), - ], size=getenv_int('MITOGEN_POOL_SIZE', default=32), ) - LOG.debug('Service pool configured: size=%d', self.pool.size) + setup_pool(self.pool) def on_broker_shutdown(self): """ @@ -364,7 +641,7 @@ class MuxProcess(object): ourself. In future this should gracefully join the pool, but TERM is fine for now. """ - if not self.profiling: + if not os.environ.get('MITOGEN_PROFILING'): # In normal operation we presently kill the process because there is # not yet any way to cancel connect(). When profiling, threads # including the broker must shut down gracefully, otherwise pstats diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index a7c0e46f..a8fde265 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -326,6 +326,7 @@ class ContextService(mitogen.service.Service): ) def _send_module_forwards(self, context): + return self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD) _candidate_temp_dirs = None @@ -372,7 +373,7 @@ class ContextService(mitogen.service.Service): try: method = getattr(self.router, spec['method']) except AttributeError: - raise Error('unsupported method: %(transport)s' % spec) + raise Error('unsupported method: %(method)s' % spec) context = method(via=via, unidirectional=True, **spec['kwargs']) if via and spec.get('enable_lru'): @@ -382,6 +383,7 @@ class ContextService(mitogen.service.Service): mitogen.core.listen(context, 'disconnect', lambda: self._on_context_disconnect(context)) + #self._send_module_forwards(context) TODO self._send_module_forwards(context) init_child_result = context.call( ansible_mitogen.target.init_child, @@ -443,7 +445,7 @@ class ContextService(mitogen.service.Service): @mitogen.service.arg_spec({ 'stack': list }) - def get(self, msg, stack): + def get(self, stack): """ Return a Context referring to an established connection with the given configuration, establishing new connections as necessary. diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index 01dff285..a1315cd9 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -31,6 +31,11 @@ import os import signal import threading +try: + import setproctitle +except ImportError: + setproctitle = None + import mitogen.core import ansible_mitogen.affinity import ansible_mitogen.loaders @@ -145,11 +150,17 @@ def wrap_connection_loader__get(name, *args, **kwargs): return connection_loader__get(name, *args, **kwargs) -def wrap_worker__run(*args, **kwargs): +def wrap_worker__run(self): """ While the strategy is active, rewrite connection_loader.get() calls for some transports into requests for a compatible Mitogen transport. """ + if setproctitle: + setproctitle.setproctitle('worker:%s task:%s' % ( + self._host.name, + self._task.action, + )) + # Ignore parent's attempts to murder us when we still need to write # profiling output. if mitogen.core._profile_hook.__name__ != '_profile_hook': @@ -158,10 +169,60 @@ def wrap_worker__run(*args, **kwargs): ansible_mitogen.logging.set_process_name('task') ansible_mitogen.affinity.policy.assign_worker() return mitogen.core._profile_hook('WorkerProcess', - lambda: worker__run(*args, **kwargs) + lambda: worker__run(self) ) +class AnsibleWrappers(object): + """ + Manage add/removal of various Ansible runtime hooks. + """ + def _add_plugin_paths(self): + """ + Add the Mitogen plug-in directories to the ModuleLoader path, avoiding + the need for manual configuration. + """ + base_dir = os.path.join(os.path.dirname(__file__), 'plugins') + ansible_mitogen.loaders.connection_loader.add_directory( + os.path.join(base_dir, 'connection') + ) + ansible_mitogen.loaders.action_loader.add_directory( + os.path.join(base_dir, 'action') + ) + + def _install_wrappers(self): + """ + Install our PluginLoader monkey patches and update global variables + with references to the real functions. + """ + global action_loader__get + action_loader__get = ansible_mitogen.loaders.action_loader.get + ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get + + global connection_loader__get + connection_loader__get = ansible_mitogen.loaders.connection_loader.get + ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get + + global worker__run + worker__run = ansible.executor.process.worker.WorkerProcess.run + ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run + + def _remove_wrappers(self): + """ + Uninstall the PluginLoader monkey patches. + """ + ansible_mitogen.loaders.action_loader.get = action_loader__get + ansible_mitogen.loaders.connection_loader.get = connection_loader__get + ansible.executor.process.worker.WorkerProcess.run = worker__run + + def install(self): + self._add_plugin_paths() + self._install_wrappers() + + def remove(self): + self._remove_wrappers() + + class StrategyMixin(object): """ This mix-in enhances any built-in strategy by arranging for various Mitogen @@ -223,43 +284,6 @@ class StrategyMixin(object): remote process, all the heavy lifting of transferring the action module and its dependencies are automatically handled by Mitogen. """ - def _install_wrappers(self): - """ - Install our PluginLoader monkey patches and update global variables - with references to the real functions. - """ - global action_loader__get - action_loader__get = ansible_mitogen.loaders.action_loader.get - ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get - - global connection_loader__get - connection_loader__get = ansible_mitogen.loaders.connection_loader.get - ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get - - global worker__run - worker__run = ansible.executor.process.worker.WorkerProcess.run - ansible.executor.process.worker.WorkerProcess.run = wrap_worker__run - - def _remove_wrappers(self): - """ - Uninstall the PluginLoader monkey patches. - """ - ansible_mitogen.loaders.action_loader.get = action_loader__get - ansible_mitogen.loaders.connection_loader.get = connection_loader__get - ansible.executor.process.worker.WorkerProcess.run = worker__run - - def _add_plugin_paths(self): - """ - Add the Mitogen plug-in directories to the ModuleLoader path, avoiding - the need for manual configuration. - """ - base_dir = os.path.join(os.path.dirname(__file__), 'plugins') - ansible_mitogen.loaders.connection_loader.add_directory( - os.path.join(base_dir, 'connection') - ) - ansible_mitogen.loaders.action_loader.add_directory( - os.path.join(base_dir, 'action') - ) def _queue_task(self, host, task, task_vars, play_context): """ @@ -290,20 +314,35 @@ class StrategyMixin(object): play_context=play_context, ) + def _get_worker_model(self): + """ + In classic mode a single :class:`WorkerModel` exists, which manages + references and configuration of the associated connection multiplexer + process. + """ + return ansible_mitogen.process.get_classic_worker_model() + def run(self, iterator, play_context, result=0): """ - Arrange for a mitogen.master.Router to be available for the duration of - the strategy's real run() method. + Wrap :meth:`run` to ensure requisite infrastructure and modifications + are configured for the duration of the call. """ _assert_supported_release() - - ansible_mitogen.process.MuxProcess.start() - run = super(StrategyMixin, self).run - self._add_plugin_paths() - self._install_wrappers() + wrappers = AnsibleWrappers() + self._worker_model = self._get_worker_model() + ansible_mitogen.process.set_worker_model(self._worker_model) try: - return mitogen.core._profile_hook('Strategy', - lambda: run(iterator, play_context) - ) + self._worker_model.on_strategy_start() + try: + wrappers.install() + try: + run = super(StrategyMixin, self).run + return mitogen.core._profile_hook('Strategy', + lambda: run(iterator, play_context) + ) + finally: + wrappers.remove() + finally: + self._worker_model.on_strategy_complete() finally: - self._remove_wrappers() + ansible_mitogen.process.set_worker_model(None) diff --git a/mitogen/core.py b/mitogen/core.py index 626d5297..10cd4385 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -901,7 +901,11 @@ class Message(object): unpickler.find_global = self._find_global try: # Must occur off the broker thread. - obj = unpickler.load() + try: + obj = unpickler.load() + except: + LOG.error('raw pickle was: %r', self.data) + raise self._unpickled = obj except (TypeError, ValueError): e = sys.exc_info()[1] diff --git a/mitogen/parent.py b/mitogen/parent.py index 97ec4949..4b2ac388 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -2159,6 +2159,23 @@ class Router(mitogen.core.Router): finally: self._write_lock.release() + def disconnect(self, context): + """ + Disconnect a context and forget its stream, assuming the context is + directly connected. + """ + stream = self.stream_by_id(context) + if stream.remote_id != context.context_id: + return + + l = mitogen.core.Latch() + mitogen.core.listen(stream, 'disconnect', l.put) + def disconnect(): + LOG.debug('Starting disconnect of %r', stream) + stream.on_disconnect(self.broker) + self.broker.defer(disconnect) + l.get() + def add_route(self, target_id, stream): """ Arrange for messages whose `dst_id` is `target_id` to be forwarded on diff --git a/mitogen/service.py b/mitogen/service.py index 886012e8..9e17482c 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -92,6 +92,24 @@ def get_or_create_pool(size=None, router=None): _pool_lock.release() +def call(service_name, method_name, call_context=None, **kwargs): + """ + Call a service registered with this pool, using the calling thread as a + host. + """ + if isinstance(service_name, mitogen.core.BytesType): + service_name = service_name.encode('utf-8') + elif not isinstance(service_name, mitogen.core.UnicodeType): + service_name = service_name.name() # Service.name() + + if call_context: + return call_context.call_service(service_name, method_name, **kwargs) + else: + pool = get_or_create_pool() + invoker = pool.get_invoker(service_name, msg=None) + return getattr(invoker.service, method_name)(**kwargs) + + def validate_arg_spec(spec, args): for name in spec: try: @@ -239,12 +257,13 @@ class Invoker(object): if not policies: raise mitogen.core.CallError('Method has no policies set.') - if not all(p.is_authorized(self.service, msg) for p in policies): - raise mitogen.core.CallError( - self.unauthorized_msg, - method_name, - self.service.name() - ) + if msg is not None: + if not all(p.is_authorized(self.service, msg) for p in policies): + raise mitogen.core.CallError( + self.unauthorized_msg, + method_name, + self.service.name() + ) required = getattr(method, 'mitogen_service__arg_spec', {}) validate_arg_spec(required, kwargs) @@ -264,7 +283,7 @@ class Invoker(object): except Exception: if no_reply: LOG.exception('While calling no-reply method %s.%s', - type(self.service).__name__, + self.service.name(), func_name(method)) else: raise @@ -690,7 +709,7 @@ class PushFileService(Service): """ for path in paths: self.propagate_to(context, mitogen.core.to_text(path)) - self.router.responder.forward_modules(context, modules) + #self.router.responder.forward_modules(context, modules) TODO @expose(policy=AllowParents()) @arg_spec({ diff --git a/tests/ansible/run_ansible_playbook.py b/tests/ansible/run_ansible_playbook.py index b5b459a1..467eaffc 100755 --- a/tests/ansible/run_ansible_playbook.py +++ b/tests/ansible/run_ansible_playbook.py @@ -51,7 +51,11 @@ else: os.path.join(GIT_BASEDIR, 'tests/ansible/hosts') ) -args = ['ansible-playbook'] +if 'ANSIBLE_ARGV' in os.environ: + args = eval(os.environ['ANSIBLE_ARGV']) +else: + args = ['ansible-playbook'] + args += ['-e', json.dumps(extra)] args += sys.argv[1:] os.execvp(args[0], args)