diff --git a/ansible_mitogen/connection/mitogen.py b/ansible_mitogen/connection/mitogen.py index e6a3486a..28e18c93 100644 --- a/ansible_mitogen/connection/mitogen.py +++ b/ansible_mitogen/connection/mitogen.py @@ -33,17 +33,36 @@ import ansible.plugins.connection import ansible_mitogen.helpers import mitogen.unix +from ansible_mitogen.strategy.mitogen import ContextService from ansible_mitogen.utils import cast class Connection(ansible.plugins.connection.ConnectionBase): + #: mitogen.master.Router for this worker. router = None + + #: mitogen.master.Context representing the parent Context, which is + #: presently always the master process. + parent = None + + #: mitogen.master.Context used to communicate with the target user account. context = None + #: Only sudo is supported for now. become_methods = ['sudo'] - transport = 'mitogen' + + #: Set by the constructor according to whichever connection type this + #: connection should emulate. We emulate the original connection type to + #: work around artificial limitations in e.g. the synchronize action, which + #: hard-codes 'local' and 'ssh' as the only allowable connection types. + transport = None def __init__(self, play_context, new_stdin, original_transport): + assert 'MITOGEN_LISTENER_PATH' in os.environ, ( + 'The "mitogen" connection plug-in may only be instantiated ' + 'by the "mitogen" strategy plugin.' + ) + self.original_transport = original_transport self.transport = original_transport super(Connection, self).__init__(play_context, new_stdin) @@ -53,23 +72,43 @@ class Connection(ansible.plugins.connection.ConnectionBase): return self.router is not None def _connect_local(self): - return mitogen.service.call(self.parent, 500, { + """ + Fetch a reference to the local() Context from ContextService in the + master process. + """ + return mitogen.service.call(self.parent, ContextService.handle, cast({ 'method': 'local', - }) - - def _connect_ssh(self): - return mitogen.service.call(self.parent, 500, cast({ - 'method': 'ssh', - 'hostname': self._play_context.remote_addr, - 'username': self._play_context.remote_user, - 'password': self._play_context.password, - 'port': self._play_context.port, - 'python_path': '/usr/bin/python', - 'ssh_path': self._play_context.ssh_executable, })) + def _connect_ssh(self): + """ + Fetch a reference to an SSH Context matching the play context from + ContextService in the master process. + """ + return mitogen.service.call( + self.parent, + ContextService.handle, + cast({ + 'method': 'ssh', + 'hostname': self._play_context.remote_addr, + 'username': self._play_context.remote_user, + 'password': self._play_context.password, + 'port': self._play_context.port, + 'python_path': '/usr/bin/python', + 'ssh_path': self._play_context.ssh_executable, + }) + ) + def _connect_sudo(self, via): - return mitogen.service.call(self.parent, 500, cast({ + """ + Fetch a reference to a sudo Context matching the play context from + ContextService in the master process. + + :param via: + Parent Context of the sudo Context. For Ansible, this should always + be a Context returned by _connect_ssh(). + """ + return mitogen.service.call(self.parent, ContextService.handle, cast({ 'method': 'sudo', 'username': self._play_context.become_user, 'password': self._play_context.password, @@ -79,10 +118,20 @@ class Connection(ansible.plugins.connection.ConnectionBase): })) def _connect(self): + """ + Establish a connection to the master process's UNIX listener socket, + constructing a mitogen.master.Router to communicate with the master, + and a mitogen.master.Context to represent it. + + Depending on the original transport we should emulate, trigger one of + the _connect_*() service calls defined above to cause the master + process to establish the real connection on our behalf, or return a + reference to the existing one. + """ if self.connected: return - path = os.environ['LISTENER_SOCKET_PATH'] + path = os.environ['MITOGEN_LISTENER_PATH'] self.router, self.parent = mitogen.unix.connect(path) if self.original_transport == 'local': @@ -94,29 +143,76 @@ class Connection(ansible.plugins.connection.ConnectionBase): else: self.context = self._connect_sudo(via=self.host) + def close(self): + """ + Arrange for the mitogen.master.Router running in the worker to + gracefully shut down, and wait for shutdown to complete. Safe to call + multiple times. + """ + if self.router: + self.router.broker.shutdown() + self.router.broker.join() + self.router = None + def call_async(self, func, *args, **kwargs): + """ + Start a function call to the target. + + :returns: + mitogen.core.Receiver that receives the function call result. + """ self._connect() return self.context.call_async(func, *args, **kwargs) def call(self, func, *args, **kwargs): + """ + Start and wait for completion of a function call in the target. + + :raises mitogen.core.CallError: + The function call failed. + :returns: + Function return value. + """ return self.call_async(func, *args, **kwargs).get().unpickle() - def exec_command(self, cmd, in_data=None, sudoable=True): - super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable) - if in_data: - raise ansible.errors.AnsibleError("does not support module pipelining") + def exec_command(self, cmd, in_data='', sudoable=True): + """ + Implement exec_command() by calling the corresponding + ansible_mitogen.helpers function in the target. + + :param str cmd: + Shell command to execute. + :param bytes in_data: + Data to supply on ``stdin`` of the process. + :returns: + (return code, stdout bytes, stderr bytes) + """ return self.py_call(ansible_mitogen.helpers.exec_command, cast(cmd), cast(in_data)) def fetch_file(self, in_path, out_path): + """ + Implement fetch_file() by calling the corresponding + ansible_mitogen.helpers function in the target. + + :param str in_path: + Remote filesystem path to read. + :param str out_path: + Local filesystem path to write. + """ output = self.py_call(ansible_mitogen.helpers.read_path, cast(in_path)) ansible_mitogen.helpers.write_path(out_path, output) def put_file(self, in_path, out_path): + """ + Implement put_file() by caling the corresponding + ansible_mitogen.helpers function in the target. + + :param str in_path: + Local filesystem path to read. + :param str out_path: + Remote filesystem path to write. + """ self.py_call(ansible_mitogen.helpers.write_path, cast(out_path), ansible_mitogen.helpers.read_path(in_path)) - - def close(self): - self.router.broker.shutdown() - self.router.broker.join() diff --git a/ansible_mitogen/strategy/mitogen.py b/ansible_mitogen/strategy/mitogen.py index b7777ec8..4a881acb 100644 --- a/ansible_mitogen/strategy/mitogen.py +++ b/ansible_mitogen/strategy/mitogen.py @@ -75,7 +75,7 @@ def wrap_connection_loader__get(name, play_context, new_stdin): return connection_loader__get(name, play_context, new_stdin, **kwargs) -class ContextProxyService(mitogen.service.Service): +class ContextService(mitogen.service.Service): """ Used by worker processes connecting back into the top-level process to fetch the single Context instance corresponding to the supplied connection @@ -98,11 +98,11 @@ class ContextProxyService(mitogen.service.Service): :returns mitogen.master.Context: Corresponding Context instance. """ - well_known_id = 500 + handle = 500 max_message_size = 1000 def __init__(self, router): - super(ContextProxyService, self).__init__(router) + super(ContextService, self).__init__(router) self._context_by_key = {} def validate_args(self, args): @@ -149,10 +149,10 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule): self.router.responder.whitelist_prefix('ansible') self.router.responder.whitelist_prefix('ansible_mitogen') self.listener = mitogen.unix.Listener(self.router) - os.environ['LISTENER_SOCKET_PATH'] = self.listener.path + os.environ['MITOGEN_LISTENER_PATH'] = self.listener.path - # TODO: gracefully shutdown and join on this at exist. - self.service = ContextProxyService(self.router) + # TODO: gracefully shutdown and join on this at exit. + self.service = ContextService(self.router) self.service_thread = threading.Thread(target=self.service.run) self.service_thread.setDaemon(True) self.service_thread.start() diff --git a/mitogen/service.py b/mitogen/service.py index 550712dd..fc220a05 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -34,19 +34,29 @@ from mitogen.core import LOG class Service(object): - well_known_id = None + #: If ``None``, a handle is dynamically allocated, otherwise the fixed + #: integer handle to use. + handle = None max_message_size = 0 def __init__(self, router): self.router = router - self.recv = mitogen.core.Receiver(router, self.well_known_id) + self.recv = mitogen.core.Receiver(router, self.handle) + self.handle = self.recv.handle self.running = True def validate_args(self, args): return True def run_once(self): - msg = self.recv.get() + try: + msg = self.recv.get() + except mitogen.core.ChannelError, e: + # Channel closed due to broker shutdown, exit gracefully. + LOG.debug('%r: channel closed: %s', self, e) + self.running = False + return + if len(msg.data) > self.max_message_size: LOG.error('%r: larger than permitted size: %r', self, msg) msg.reply(mitogen.core.CallError('Message size exceeded'))