ansible: document the connection class.

This commit is contained in:
David Wilson 2018-02-17 15:57:29 +05:45
parent b7f563a6f0
commit 5d8cb0f5fb
3 changed files with 138 additions and 32 deletions

View File

@ -33,17 +33,36 @@ import ansible.plugins.connection
import ansible_mitogen.helpers
import mitogen.unix
from ansible_mitogen.strategy.mitogen import ContextService
from ansible_mitogen.utils import cast
class Connection(ansible.plugins.connection.ConnectionBase):
#: mitogen.master.Router for this worker.
router = None
#: mitogen.master.Context representing the parent Context, which is
#: presently always the master process.
parent = None
#: mitogen.master.Context used to communicate with the target user account.
context = None
#: Only sudo is supported for now.
become_methods = ['sudo']
transport = 'mitogen'
#: Set by the constructor according to whichever connection type this
#: connection should emulate. We emulate the original connection type to
#: work around artificial limitations in e.g. the synchronize action, which
#: hard-codes 'local' and 'ssh' as the only allowable connection types.
transport = None
def __init__(self, play_context, new_stdin, original_transport):
assert 'MITOGEN_LISTENER_PATH' in os.environ, (
'The "mitogen" connection plug-in may only be instantiated '
'by the "mitogen" strategy plugin.'
)
self.original_transport = original_transport
self.transport = original_transport
super(Connection, self).__init__(play_context, new_stdin)
@ -53,23 +72,43 @@ class Connection(ansible.plugins.connection.ConnectionBase):
return self.router is not None
def _connect_local(self):
return mitogen.service.call(self.parent, 500, {
"""
Fetch a reference to the local() Context from ContextService in the
master process.
"""
return mitogen.service.call(self.parent, ContextService.handle, cast({
'method': 'local',
})
def _connect_ssh(self):
return mitogen.service.call(self.parent, 500, cast({
'method': 'ssh',
'hostname': self._play_context.remote_addr,
'username': self._play_context.remote_user,
'password': self._play_context.password,
'port': self._play_context.port,
'python_path': '/usr/bin/python',
'ssh_path': self._play_context.ssh_executable,
}))
def _connect_ssh(self):
"""
Fetch a reference to an SSH Context matching the play context from
ContextService in the master process.
"""
return mitogen.service.call(
self.parent,
ContextService.handle,
cast({
'method': 'ssh',
'hostname': self._play_context.remote_addr,
'username': self._play_context.remote_user,
'password': self._play_context.password,
'port': self._play_context.port,
'python_path': '/usr/bin/python',
'ssh_path': self._play_context.ssh_executable,
})
)
def _connect_sudo(self, via):
return mitogen.service.call(self.parent, 500, cast({
"""
Fetch a reference to a sudo Context matching the play context from
ContextService in the master process.
:param via:
Parent Context of the sudo Context. For Ansible, this should always
be a Context returned by _connect_ssh().
"""
return mitogen.service.call(self.parent, ContextService.handle, cast({
'method': 'sudo',
'username': self._play_context.become_user,
'password': self._play_context.password,
@ -79,10 +118,20 @@ class Connection(ansible.plugins.connection.ConnectionBase):
}))
def _connect(self):
"""
Establish a connection to the master process's UNIX listener socket,
constructing a mitogen.master.Router to communicate with the master,
and a mitogen.master.Context to represent it.
Depending on the original transport we should emulate, trigger one of
the _connect_*() service calls defined above to cause the master
process to establish the real connection on our behalf, or return a
reference to the existing one.
"""
if self.connected:
return
path = os.environ['LISTENER_SOCKET_PATH']
path = os.environ['MITOGEN_LISTENER_PATH']
self.router, self.parent = mitogen.unix.connect(path)
if self.original_transport == 'local':
@ -94,29 +143,76 @@ class Connection(ansible.plugins.connection.ConnectionBase):
else:
self.context = self._connect_sudo(via=self.host)
def close(self):
"""
Arrange for the mitogen.master.Router running in the worker to
gracefully shut down, and wait for shutdown to complete. Safe to call
multiple times.
"""
if self.router:
self.router.broker.shutdown()
self.router.broker.join()
self.router = None
def call_async(self, func, *args, **kwargs):
"""
Start a function call to the target.
:returns:
mitogen.core.Receiver that receives the function call result.
"""
self._connect()
return self.context.call_async(func, *args, **kwargs)
def call(self, func, *args, **kwargs):
"""
Start and wait for completion of a function call in the target.
:raises mitogen.core.CallError:
The function call failed.
:returns:
Function return value.
"""
return self.call_async(func, *args, **kwargs).get().unpickle()
def exec_command(self, cmd, in_data=None, sudoable=True):
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
if in_data:
raise ansible.errors.AnsibleError("does not support module pipelining")
def exec_command(self, cmd, in_data='', sudoable=True):
"""
Implement exec_command() by calling the corresponding
ansible_mitogen.helpers function in the target.
:param str cmd:
Shell command to execute.
:param bytes in_data:
Data to supply on ``stdin`` of the process.
:returns:
(return code, stdout bytes, stderr bytes)
"""
return self.py_call(ansible_mitogen.helpers.exec_command,
cast(cmd), cast(in_data))
def fetch_file(self, in_path, out_path):
"""
Implement fetch_file() by calling the corresponding
ansible_mitogen.helpers function in the target.
:param str in_path:
Remote filesystem path to read.
:param str out_path:
Local filesystem path to write.
"""
output = self.py_call(ansible_mitogen.helpers.read_path,
cast(in_path))
ansible_mitogen.helpers.write_path(out_path, output)
def put_file(self, in_path, out_path):
"""
Implement put_file() by caling the corresponding
ansible_mitogen.helpers function in the target.
:param str in_path:
Local filesystem path to read.
:param str out_path:
Remote filesystem path to write.
"""
self.py_call(ansible_mitogen.helpers.write_path, cast(out_path),
ansible_mitogen.helpers.read_path(in_path))
def close(self):
self.router.broker.shutdown()
self.router.broker.join()

View File

@ -75,7 +75,7 @@ def wrap_connection_loader__get(name, play_context, new_stdin):
return connection_loader__get(name, play_context, new_stdin, **kwargs)
class ContextProxyService(mitogen.service.Service):
class ContextService(mitogen.service.Service):
"""
Used by worker processes connecting back into the top-level process to
fetch the single Context instance corresponding to the supplied connection
@ -98,11 +98,11 @@ class ContextProxyService(mitogen.service.Service):
:returns mitogen.master.Context:
Corresponding Context instance.
"""
well_known_id = 500
handle = 500
max_message_size = 1000
def __init__(self, router):
super(ContextProxyService, self).__init__(router)
super(ContextService, self).__init__(router)
self._context_by_key = {}
def validate_args(self, args):
@ -149,10 +149,10 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule):
self.router.responder.whitelist_prefix('ansible')
self.router.responder.whitelist_prefix('ansible_mitogen')
self.listener = mitogen.unix.Listener(self.router)
os.environ['LISTENER_SOCKET_PATH'] = self.listener.path
os.environ['MITOGEN_LISTENER_PATH'] = self.listener.path
# TODO: gracefully shutdown and join on this at exist.
self.service = ContextProxyService(self.router)
# TODO: gracefully shutdown and join on this at exit.
self.service = ContextService(self.router)
self.service_thread = threading.Thread(target=self.service.run)
self.service_thread.setDaemon(True)
self.service_thread.start()

View File

@ -34,19 +34,29 @@ from mitogen.core import LOG
class Service(object):
well_known_id = None
#: If ``None``, a handle is dynamically allocated, otherwise the fixed
#: integer handle to use.
handle = None
max_message_size = 0
def __init__(self, router):
self.router = router
self.recv = mitogen.core.Receiver(router, self.well_known_id)
self.recv = mitogen.core.Receiver(router, self.handle)
self.handle = self.recv.handle
self.running = True
def validate_args(self, args):
return True
def run_once(self):
msg = self.recv.get()
try:
msg = self.recv.get()
except mitogen.core.ChannelError, e:
# Channel closed due to broker shutdown, exit gracefully.
LOG.debug('%r: channel closed: %s', self, e)
self.running = False
return
if len(msg.data) > self.max_message_size:
LOG.error('%r: larger than permitted size: %r', self, msg)
msg.reply(mitogen.core.CallError('Message size exceeded'))