diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index e0da28ff..5dc78177 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -26,6 +26,17 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +""" +Classes in this file define Mitogen 'services' that run (initially) within the +connection multiplexer process that is forked off the top-level controller +process. + +Once a worker process connects to a multiplexer process +(Connection._connect()), it communicates with these services to establish new +connections, grant access to files by children, and register for notification +when a child has completed a job. +""" + from __future__ import absolute_import import logging import os.path @@ -46,37 +57,17 @@ class Error(Exception): class ContextService(mitogen.service.DeduplicatingService): """ - Used by worker processes connecting back into the top-level process to - fetch the single Context instance corresponding to the supplied connection - configuration, creating a matching connection if it does not exist. + Used by workers to fetch the single Context instance corresponding to a + connection configuration, creating the matching connection if it does not + exist. - For connection methods and their parameters, refer to: + For connection methods and their parameters, see: https://mitogen.readthedocs.io/en/latest/api.html#context-factories - This concentrates all SSH connections in the top-level process, which may - become a bottleneck. There are multiple ways to fix that: - * creating one .local() child context per CPU and sharding connections - between them, using the master process to route messages, or - * as above, but having each child create a unique UNIX listener and - having workers connect in directly. - - :param dict dct: - Parameters passed to `mitogen.master.Router.[method]()`. 
- - * The `method` key is popped from the dictionary and used to look up - the Mitogen connection method. - * The `discriminator` key is mixed into the key used to select an - existing connection, but popped from the list of arguments passed to - the connection method. - - :returns tuple: - Tuple of `(context, home_dir)`, where: - * `context` is the mitogen.master.Context referring to the target - context. - * `home_dir` is a cached copy of the remote directory. - - mitogen.master.Context: - Corresponding Context instance. + This concentrates connections in the top-level process, which may become a + bottleneck. The bottleneck can be removed using per-CPU connection + processes and arranging for the worker to select one according to a hash of + the connection parameters (sharding). """ handle = 500 max_message_size = 1000 @@ -86,6 +77,25 @@ class ContextService(mitogen.service.DeduplicatingService): 'method_name': str }) def connect(self, method_name, discriminator=None, **kwargs): + """ + Return a Context referring to an established connection with the given + configuration, establishing a new connection as necessary. + + :param dict dct: + Parameters passed to `mitogen.master.Router.[method]()`. + + * The `method` key is popped from the dictionary and used to look + up the Mitogen connection method. + * The `discriminator` key is mixed into the key used to select an + existing connection, but popped from the list of arguments passed + to the connection method. + + :returns tuple: + Tuple of `(context, home_dir)`, where: + * `context` is the mitogen.master.Context referring to the + target context. + * `home_dir` is a cached copy of the remote directory. + """ method = getattr(self.router, method_name, None) if method is None: raise Error('no such Router method: %s' % (method_name,)) @@ -101,7 +111,7 @@ class ContextService(mitogen.service.DeduplicatingService): home_dir = context.call(os.path.expanduser, '~') # We don't need to wait for the result of this. 
Ideally we'd check its - # return value somewhere, but logs will catch any failures anyway. + # return value somewhere, but logs will catch a failure anyway. context.call_async(ansible_mitogen.target.start_fork_parent) return { 'context': context, @@ -119,20 +129,6 @@ class FileService(mitogen.service.Service): Paths must be explicitly added to the service by a trusted context before they will be served to an untrusted context. - - :param tuple args: - Tuple of `(cmd, path)`, where: - - cmd: one of "register", "fetch", where: - - register: register a file that may be fetched - - fetch: fetch a file that was previously registered - - path: key of the file to fetch or register - - :returns: - Returns ``None` for "register", or the file data for "fetch". - - :raises mitogen.core.CallError: - Security violation occurred, either path not registered, or attempt to - register path from unprivileged context. """ handle = 501 max_message_size = 1000 @@ -147,8 +143,15 @@ class FileService(mitogen.service.Service): 'path': basestring }) def register(self, path): + """ + Authorize a path for access by child contexts. Calling this repeatedly + with the same path is harmless. + + :param str path: + File path. + """ if path not in self._paths: - LOG.info('%r: registering %r', self, path) + LOG.debug('%r: registering %r', self, path) with open(path, 'rb') as fp: self._paths[path] = zlib.compress(fp.read()) @@ -157,6 +160,18 @@ class FileService(mitogen.service.Service): 'path': basestring }) def fetch(self, path): + """ + Fetch a file's data. + + :param str path: + File path. + + :returns: + The file data. + + :raises mitogen.core.CallError: + The path was not registered. + """ if path not in self._paths: raise mitogen.core.CallError(self.unregistered_msg) @@ -169,6 +184,16 @@ class JobResultService(mitogen.service.Service): Receive the result of a task from a child and forward it to interested listeners. If no listener exists, store the result until it is requested. 
+ Storing results in an intermediary service allows: + + * the lifetime of the worker to be decoupled from the lifetime of the job, + * for new and unrelated workers to request the job result after the original + worker that spawned it has exited, + * for synchronous and asynchronous jobs to be treated identically, + * for latency-free polling and waiting on job results, and + * for Ansible job IDs to be used to refer to a job in preference to + Mitogen-internal identifiers such as Sender and Context. + Results are keyed by job ID. """ handle = 502 @@ -186,6 +211,14 @@ class JobResultService(mitogen.service.Service): 'sender': mitogen.core.Sender, }) def listen(self, job_id, sender): + """ + Register to receive the result of a job when it becomes available. + + :param str job_id: + Job ID to listen for. + :param mitogen.core.Sender sender: + Sender on which to deliver the job result. + """ LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender) with self._lock: if job_id in self._sender_by_job_id: @@ -197,6 +230,15 @@ class JobResultService(mitogen.service.Service): 'job_id': basestring, }) def get(self, job_id): + """ + Return a job's result if it is available, otherwise return immediately. + The job result is forgotten once it has been returned by this method. + + :param str job_id: + Job ID to return. + :returns: + Job result dictionary, or :data:`None`. + """ LOG.debug('%r.get(job_id=%r)', self, job_id) with self._lock: return self._result_by_job_id.pop(job_id, None) @@ -207,6 +249,15 @@ class JobResultService(mitogen.service.Service): 'result': dict }) def push(self, job_id, result): + """ + Deliver a job's result from a child context, notifying any listener + registered via :meth:`listen` of the result. + + :param str job_id: + Job ID whose result is being pushed. + :param dict result: + Job result dictionary. + """ LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result) with self._lock: if job_id in self._result_by_job_id: