From cc980569a391cc870e7b1d5841ffc7832951067e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 13 Apr 2018 13:31:25 +0100 Subject: [PATCH] issue #159: initial context LRU implementation Now Connection.close() *must* be called in the worker, to ensure the reference count for a context drops correctly. Remove 'discriminator' for now, I'm not using it for testing any more and it complicated this code. This code is a car crash, it needs rewritten again. Ideally some/most of this behaviour could live on services.DeduplicatingService somehow, but I couldn't come up with a sensible design. --- ansible_mitogen/connection.py | 33 ++++--- ansible_mitogen/services.py | 158 ++++++++++++++++++++++++++++++---- docs/ansible.rst | 44 ++++++++-- mitogen/parent.py | 8 ++ mitogen/service.py | 5 +- 5 files changed, 209 insertions(+), 39 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index ace5a100..4c5cad21 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -58,7 +58,13 @@ class Connection(ansible.plugins.connection.ConnectionBase): #: presently always the master process. parent = None - #: mitogen.master.Context used to communicate with the target user account. + #: mitogen.master.Context connected to the target machine's initial SSH + #: account. + host = None + + #: mitogen.master.Context connected to the target user account on the + #: target machine (i.e. via sudo), or simply a copy of :attr:`host` if + #: become is not in use. context = None #: Only sudo is supported for now. @@ -79,9 +85,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): #: Set to 'ansible_ssh_timeout' by on_action_run(). ansible_ssh_timeout = None - #: Set to 'mitogen_ssh_discriminator' by on_action_run() - mitogen_ssh_discriminator = None - #: Set after connection to the target context's home directory. _homedir = None @@ -111,10 +114,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): executing. 
We use the opportunity to grab relevant bits from the task-specific data. """ - self.mitogen_ssh_discriminator = task_vars.get( - 'mitogen_ssh_discriminator', - None - ) self.ansible_ssh_timeout = task_vars.get( 'ansible_ssh_timeout', None @@ -162,7 +161,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): dct = mitogen.service.call( context=self.parent, handle=ContextService.handle, - method='connect', + method='get', kwargs=mitogen.utils.cast(kwargs), ) @@ -190,7 +189,6 @@ class Connection(ansible.plugins.connection.ConnectionBase): 'method_name': 'ssh', 'check_host_keys': False, # TODO 'hostname': self._play_context.remote_addr, - 'discriminator': self.mitogen_ssh_discriminator, 'username': self._play_context.remote_user, 'password': self._play_context.password, 'port': self._play_context.port, @@ -298,6 +296,17 @@ class Connection(ansible.plugins.connection.ConnectionBase): gracefully shut down, and wait for shutdown to complete. Safe to call multiple times. """ + for context in set([self.host, self.context]): + if context: + mitogen.service.call( + context=self.parent, + handle=ContextService.handle, + method='put', + kwargs={ + 'context': context + } + ) + self.host = None self.context = None if self.broker and not new_task: @@ -378,10 +387,10 @@ class Connection(ansible.plugins.connection.ConnectionBase): Implement put_file() by caling the corresponding ansible_mitogen.target function in the target. - :param str in_path: - Local filesystem path to read. :param str out_path: Remote filesystem path to write. + :param byte data: + File contents to put. """ self.call(ansible_mitogen.target.write_path, mitogen.utils.cast(out_path), diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 642cd042..cfccd6ff 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -39,7 +39,9 @@ when a child has completed a job. 
from __future__ import absolute_import import logging +import sys import os.path +import pprint import threading import zlib @@ -55,7 +57,7 @@ class Error(Exception): pass -class ContextService(mitogen.service.DeduplicatingService): +class ContextService(mitogen.service.Service): """ Used by workers to fetch the single Context instance corresponding to a connection configuration, creating the matching connection if it does not @@ -71,33 +73,93 @@ class ContextService(mitogen.service.DeduplicatingService): """ handle = 500 max_message_size = 1000 + max_contexts = 20 + + def __init__(self, *args, **kwargs): + super(ContextService, self).__init__(*args, **kwargs) + #: JoinPoint for context responses. + self._lock = threading.Lock() + self._response_by_key = {} + self._waiters_by_key = {} + #: Active context count. + self._refs_by_context = {} + #: List of contexts in creation order by via= parameter. + self._lru_by_via = {} + #: (method_name, kwargs) pairs by Context + self._cfg_by_context = {} @mitogen.service.expose(mitogen.service.AllowParents()) @mitogen.service.arg_spec({ - 'method_name': str + 'context': mitogen.core.Context }) - def connect(self, method_name, discriminator=None, **kwargs): + def put(self, context): """ - Return a Context referring to an established connection with the given - configuration, establishing a new connection as necessary. - - :param str method_name: - The :class:`mitogen.parent.Router` connection method to use. - :param discriminator: - Mixed into the key used to select an existing connection, to allow - intentional duplicate connections to be created. - :param dict kwargs: - Keyword arguments passed to `mitogen.master.Router.[method_name]()`. - - :returns tuple: - Tuple of `(context, home_dir)`, where: - * `context` is the mitogen.master.Context referring to the - target context. - * `home_dir` is a cached copy of the remote directory. + Return a reference, making it eligible for recycling once its reference + count reaches zero. 
""" + LOG.debug('%r.put(%r)', self, context) + assert self._refs_by_context[context] > 0 + self._refs_by_context[context] -= 1 + + def key_from_kwargs(self, **kwargs): + """ + Generate a deduplication key from the request. The default + implementation returns a string based on a stable representation of the + input dictionary generated by :py:func:`pprint.pformat`. + """ + return pprint.pformat(kwargs) + + def _produce_response(self, key, response): + self._lock.acquire() + try: + waiters = self._waiters_by_key.pop(key) + count = len(waiters) + for msg in waiters: + msg.reply(response) + finally: + self._lock.release() + return count + + def _lru(self, new_context, **kwargs): + via = kwargs.get('via') + if via is None: + # We don't have a limit on the number of directly connections. + return + + lru = self._lru_by_via.setdefault(via, []) + if len(lru) < self.max_contexts: + lru.append(new_context) + return + + for context in reversed(lru): + if self._refs_by_context[context] == 0: + break + else: + LOG.warning('via=%r reached maximum number of interpreters, ' + 'but they are all marked as in-use.', via) + return + + LOG.info('%r._discard_one(): shutting down %r', self, context) + context.shutdown() + + method_name, kwargs = self._cfg_by_context[context] + key = self.key_from_kwargs(method_name=method_name, **kwargs) + + self._lock.acquire() + try: + del self._response_by_key[key] + del self._refs_by_context[context] + del self._cfg_by_context[context] + lru.remove(context) + lru.append(new_context) + finally: + self._lock.release() + + def _connect(self, method_name, **kwargs): method = getattr(self.router, method_name, None) if method is None: raise Error('no such Router method: %s' % (method_name,)) + try: context = method(**kwargs) except mitogen.core.StreamError as e: @@ -107,17 +169,75 @@ class ContextService(mitogen.service.DeduplicatingService): 'msg': str(e), } + if kwargs.get('via'): + self._lru(context, method_name=method_name, **kwargs) home_dir = 
context.call(os.path.expanduser, '~') # We don't need to wait for the result of this. Ideally we'd check its # return value somewhere, but logs will catch a failure anyway. context.call_async(ansible_mitogen.target.start_fork_parent) + self._cfg_by_context[context] = (method_name, kwargs) + self._refs_by_context[context] = 0 return { 'context': context, 'home_dir': home_dir, 'msg': None, } + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'method_name': str + }) + def get(self, msg, **kwargs): + """ + Return a Context referring to an established connection with the given + configuration, establishing a new connection as necessary. + + :param str method_name: + The :class:`mitogen.parent.Router` connection method to use. + :param dict kwargs: + Keyword arguments passed to `mitogen.master.Router.[method_name]()`. + + :returns tuple: + Tuple of `(context, home_dir)`, where: + * `context` is the mitogen.master.Context referring to the + target context. + * `home_dir` is a cached copy of the remote directory. + """ + key = self.key_from_kwargs(**kwargs) + self._lock.acquire() + try: + response = self._response_by_key.get(key) + if response is not None: + self._refs_by_context[response['context']] += 1 + return response + + waiters = self._waiters_by_key.get(key) + if waiters is not None: + waiters.append(msg) + return self.NO_REPLY + + self._waiters_by_key[key] = [msg] + finally: + self._lock.release() + + # I'm the first thread to wait on a result, so I will create the + # connection. 
+ try: + response = self._connect(**kwargs) + count = self._produce_response(key, response) + if response['msg'] is None: + self._response_by_key[key] = response + self._refs_by_context[response['context']] += count + except mitogen.core.CallError: + e = sys.exc_info()[1] + self._produce_response(key, e) + except Exception: + e = sys.exc_info()[1] + self._produce_response(key, mitogen.core.CallError(e)) + + return self.NO_REPLY + class FileService(mitogen.service.Service): """ diff --git a/docs/ansible.rst b/docs/ansible.rst index 4bd7a54c..58a12a7f 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -135,10 +135,6 @@ High Risk exhaust available RAM. This will be fixed soon as it's likely to be tickled by common playbooks. -* No mechanism exists to bound the number of interpreters created during a run. - For some playbooks that parameterize ``become_user`` over many accounts, - resource exhaustion may be triggered on the target machine. - Low Risk ~~~~~~~~ @@ -291,9 +287,6 @@ This list will grow as more missing pieces are discovered. * ``ansible_ssh_private_key_file`` * ``ansible_ssh_pass``, ``ansible_password`` (default: assume passwordless) * ``ssh_args``, ``ssh_common_args``, ``ssh_extra_args`` -* ``mitogen_ssh_discriminator``: if present, a string mixed into the key used - to deduplicate connections. This permits intentional duplicate Mitogen - connections to a single host, which is probably only useful for testing. Sudo Variables @@ -373,6 +366,43 @@ internal list can be updated to prevent users bumping into the same problem in future. +Interpreter Recycling +~~~~~~~~~~~~~~~~~~~~~ + +To avoid accidental DoS of targets, the extension stops creating persistent +interpreters after the 20th interpreter has been created. Instead the most +recently created interpreter is shut down to make room for any new interpreter. +This is to avoid situations like below from triggering memory exhaustion by +spawning a huge number of interpreters. + +.. 
code-block:: yaml + + - hosts: corp_boxes + vars: + user_directory: [ + # 10,000 corporate user accounts + ] + tasks: + - name: Create user bashrc + become: true + vars: + ansible_become_user: "{{item}}" + copy: + src: bashrc + dest: "~{{item}}/.bashrc" + with_items: "{{user_directory}}" + +The recycling behaviour does not occur for direct connections from the Ansible +controller, and it is keyed on a per-host basis, i.e. up to 20 interpreters may +exist for each directly connected target host. + +The newest interpreter is chosen to avoid recycling useful accounts, like +"root" or "postgresql" that tend to appear early in a run; however, it is simple +to construct a playbook that defeats this strategy. A future version will key +interpreters on the identity of the task, file and/or playbook that created +them, avoiding the recycling of useful accounts in every scenario. + + Runtime Patches ~~~~~~~~~~~~~~~ diff --git a/mitogen/parent.py b/mitogen/parent.py index c100c4fd..1b54cccc 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -619,6 +619,14 @@ class ChildIdAllocator(object): class Context(mitogen.core.Context): via = None + def __eq__(self, other): + return (isinstance(other, mitogen.core.Context) and + (other.context_id == self.context_id) and + (other.router == self.router)) + + def __hash__(self): + return hash((self.router, self.context_id)) + def call_async(self, fn, *args, **kwargs): LOG.debug('%r.call_async(%r, *%r, **%r)', self, fn, args, kwargs) diff --git a/mitogen/service.py b/mitogen/service.py index bd93eff1..4e815bfa 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -174,7 +174,10 @@ class Service(object): def _on_receive_message(self, msg): method_name, kwargs = self._validate_message(msg) - return getattr(self, method_name)(**kwargs) + method = getattr(self, method_name) + if 'msg' in method.func_code.co_varnames: + kwargs['msg'] = msg # TODO: hack + return method(**kwargs) def on_receive_message(self, msg): try: