From 76beea6554f4ddfce64b25e899cbffc75b85b2c8 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 7 Jun 2018 16:44:02 +0100 Subject: [PATCH] issue #186: move target._get_file into mitogen.service For lack of a better place to keep the client function, make it a classmethod of FileService itself for now. The old _get_file() is removed in a subsequent commit. --- ansible_mitogen/target.py | 7 +++++- mitogen/service.py | 51 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 38889039..02f51d40 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -53,6 +53,7 @@ import ansible_mitogen.runner import mitogen.core import mitogen.fork import mitogen.parent +import mitogen.service LOG = logging.getLogger(__name__) @@ -168,7 +169,11 @@ def transfer_file(context, in_path, out_path, sync=False, set_owner=False): try: try: - ok, metadata = _get_file(context, in_path, fp) + ok, metadata = mitogen.service.FileService.get( + context=context, + path=in_path, + out_fp=fp, + ) if not ok: raise IOError('transfer of %r was interrupted.' % (in_path,)) diff --git a/mitogen/service.py b/mitogen/service.py index 33757836..424f21c8 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -669,9 +669,8 @@ class PushFileService(Service): class FileService(Service): """ - Streaming file server, used to serve small files and huge files alike. - Paths must be registered by a trusted context before they will be served to - a child. + Streaming file server, used to serve small and huge files alike. Paths must + be registered by a trusted context before they will be served to a child. Transfers are divided among the physical streams that connect external contexts, ensuring each stream never has excessive data buffered in RAM, @@ -889,3 +888,49 @@ class FileService(Service): self._schedule_pending_unlocked(state) finally: state.lock.release() + + @classmethod + def get(cls, context, path, out_fp): + """ + Streamily download a file from the connection multiplexer process in + the controller. + + :param mitogen.core.Context context: + Reference to the context hosting the FileService that will be used + to fetch the file. + :param bytes in_path: + FileService registered name of the input file. + :param bytes out_path: + Name of the output path on the local disk. + :returns: + :data:`True` on success, or :data:`False` if the transfer was + interrupted and the output should be discarded. + """ + LOG.debug('get_file(): fetching %r from %r', path, context) + t0 = time.time() + recv = mitogen.core.Receiver(router=context.router) + metadata = context.call_service( + service_name=cls.name(), + method_name='fetch', + path=path, + sender=recv.to_sender(), + ) + + for chunk in recv: + s = chunk.unpickle() + LOG.debug('get_file(%r): received %d bytes', path, len(s)) + context.call_service_async( + service_name=cls.name(), + method_name='acknowledge', + size=len(s), + ).close() + out_fp.write(s) + + ok = out_fp.tell() == metadata['size'] + if not ok: + LOG.error('get_file(%r): receiver was closed early, controller ' + 'is likely shutting down.', path) + + LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', + metadata['size'], path, context, 1000 * (time.time() - t0)) + return ok, metadata