diff --git a/docs/changelog.rst b/docs/changelog.rst index cdc160be..99b30214 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -164,6 +164,10 @@ Fixes environment variable if it is set, causing behaviour to diverge when Ansible was invoked across user accounts via ``sudo``. +* `#364 `_: file transfers from + controllers running Python 2.7.2 or earlier could be interrupted due to a + forking bug in the :mod:`tempfile` module. + * `#370 `_: the Ansible `reboot `_ module is supported. diff --git a/mitogen/core.py b/mitogen/core.py index b1519c90..cb64fc92 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -721,13 +721,19 @@ class Sender(object): _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) self.context.send(Message.pickled(data, handle=self.dst_handle)) + explicit_close_msg = 'Sender was explicitly closed' + def close(self): """ Send a dead message to the remote, causing :meth:`ChannelError` to be raised in any waiting thread. """ _vv and IOLOG.debug('%r.close()', self) - self.context.send(Message.dead(handle=self.dst_handle)) + self.context.send( + Message.dead( + reason=self.explicit_close_msg, + handle=self.dst_handle) + ) def __repr__(self): return 'Sender(%r, %r)' % (self.context, self.dst_handle) diff --git a/mitogen/service.py b/mitogen/service.py index dc90b67f..0bafc899 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -754,8 +754,8 @@ class FileService(Service): def __init__(self, router): super(FileService, self).__init__(router) - #: Mapping of registered path -> file size. - self._metadata_by_path = {} + #: Set of registered paths. + self._paths = set() #: Mapping of Stream->FileStreamState. self._state_by_stream = {} @@ -772,20 +772,21 @@ class FileService(Service): def register(self, path): """ Authorize a path for access by children. Repeat calls with the same - path is harmless. + path has no effect. :param str path: File path. """ - if path in self._metadata_by_path: - return + if path not in self._paths: + LOG.debug('%r: registering %r', self, path) + self._paths.add(path) + def _generate_stat(self, path): st = os.stat(path) if not stat.S_ISREG(st.st_mode): raise IOError('%r is not a regular file.' % (path,)) - LOG.debug('%r: registering %r', self, path) - self._metadata_by_path[path] = { + return { 'size': st.st_size, 'mode': st.st_mode, 'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'), @@ -869,25 +870,25 @@ class FileService(Service): :raises Error: Unregistered path, or Sender did not match requestee context. """ - if path not in self._metadata_by_path: + if path not in self._paths: raise Error(self.unregistered_msg) if msg.src_id != sender.context.context_id: raise Error(self.context_mismatch_msg) LOG.debug('Serving %r', path) - try: - fp = open(path, 'rb', self.IO_SIZE) - except IOError: - msg.reply(mitogen.core.CallError( - sys.exc_info()[1] - )) - return # Response must arrive first so requestee can begin receive loop, # otherwise first ack won't arrive until all pending chunks were # delivered. In that case max BDP would always be 128KiB, aka. max # ~10Mbit/sec over a 100ms link. - msg.reply(self._metadata_by_path[path]) + try: + fp = open(path, 'rb', self.IO_SIZE) + msg.reply(self._generate_stat(path)) + except IOError: + msg.reply(mitogen.core.CallError( + sys.exc_info()[1] + )) + return stream = self.router.stream_by_id(sender.context.context_id) state = self._state_by_stream.setdefault(stream, FileStreamState()) @@ -949,6 +950,7 @@ class FileService(Service): sender=recv.to_sender(), ) + received_bytes = 0 for chunk in recv: s = chunk.unpickle() LOG.debug('get_file(%r): received %d bytes', path, len(s)) @@ -958,11 +960,19 @@ class FileService(Service): size=len(s), ).close() out_fp.write(s) + received_bytes += len(s) - ok = out_fp.tell() == metadata['size'] - if not ok: + ok = received_bytes == metadata['size'] + if received_bytes < metadata['size']: LOG.error('get_file(%r): receiver was closed early, controller ' - 'is likely shutting down.', path) + 'may be shutting down, or the file was truncated ' + 'during transfer. Expected %d bytes, received %d.', + path, metadata['size'], received_bytes) + elif received_bytes > metadata['size']: + LOG.error('get_file(%r): the file appears to have grown ' + 'while transfer was in progress. Expected %d ' + 'bytes, received %d.', + path, metadata['size'], received_bytes) LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms', metadata['size'], path, context, 1000 * (time.time() - t0)) diff --git a/tests/ansible/integration/action/copy.yml b/tests/ansible/integration/action/copy.yml index d799be90..c3e961f6 100644 --- a/tests/ansible/integration/action/copy.yml +++ b/tests/ansible/integration/action/copy.yml @@ -1,6 +1,6 @@ # Verify copy module for small and large files, and inline content. -- name: integration/action/synchronize.yml +- name: integration/action/copy.yml hosts: test-targets any_errors_fatal: true tasks: