issue #226: ansible: file transfer improvements
* put_data() supports setting mode and times. * put_file() refuses to copy non-regular files (sockets, FIFOs). * put_file() saves one RTT for <32KiB files by using put_data() and embedding file content in argument list. * FileService returns dict with size/mode/owner/group/mtime/atime. * FileService refuses to copy non-regular files. * transfer_file() preserves file mode. * transfer_file() preserves atime/mtime. * transfer_file() optionally preserves ownership. * transfer_file() optionally calls fsync(). * transfer_file() uses unique temporary file name to avoid conflicting with parallel transfers. * transfer_file() ensures temporary file is deleted on any error. * write_path() writes to a temporary file and deletes it on failure. * write_path() uses unique temporary file name to avoid conflicting with parallel transfers. * write_path() supports setting symbolic owner/group. * write_path() optionally calls fsync(). * write_path() supports setting symbolic mode/mtime/atime. Closes #226, #227, #229
This commit is contained in:
parent
e8b4c4e683
commit
219a202a82
|
@ -30,6 +30,7 @@ from __future__ import absolute_import
|
|||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import stat
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
@ -470,7 +471,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
|
|||
mitogen.utils.cast(in_path))
|
||||
ansible_mitogen.target.write_path(out_path, output)
|
||||
|
||||
def put_data(self, out_path, data):
|
||||
def put_data(self, out_path, data, mode=None, utimes=None):
|
||||
"""
|
||||
Implement put_file() by caling the corresponding
|
||||
ansible_mitogen.target function in the target.
|
||||
|
@ -482,7 +483,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
|
|||
"""
|
||||
self.call(ansible_mitogen.target.write_path,
|
||||
mitogen.utils.cast(out_path),
|
||||
mitogen.utils.cast(data))
|
||||
mitogen.utils.cast(data),
|
||||
mode=mode,
|
||||
utimes=utimes)
|
||||
|
||||
def put_file(self, in_path, out_path):
|
||||
"""
|
||||
|
@ -494,6 +497,25 @@ class Connection(ansible.plugins.connection.ConnectionBase):
|
|||
:param str out_path:
|
||||
Remote filesystem path to write.
|
||||
"""
|
||||
st = os.stat(in_path)
|
||||
if not stat.S_ISREG(st.st_mode):
|
||||
raise IOError('%r is not a regular file.' % (in_path,))
|
||||
|
||||
# If the file is sufficiently small, just ship it in the argument list
|
||||
# rather than introducing an extra RTT for the child to request it from
|
||||
# FileService.
|
||||
if st.st_size <= 32768:
|
||||
fp = open(in_path, 'rb')
|
||||
try:
|
||||
s = fp.read(32769)
|
||||
finally:
|
||||
fp.close()
|
||||
|
||||
# Ensure file was not growing during call.
|
||||
if len(s) == st.st_size:
|
||||
return self.put_data(out_path, s, mode=st.st_mode,
|
||||
utimes=(st.st_atime, st.st_mtime))
|
||||
|
||||
mitogen.service.call(
|
||||
context=self.parent,
|
||||
handle=ansible_mitogen.services.FileService.handle,
|
||||
|
|
|
@ -38,11 +38,12 @@ when a child has completed a job.
|
|||
"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
import hashlib
|
||||
import grp
|
||||
import logging
|
||||
import os
|
||||
import os.path
|
||||
import pprint
|
||||
import pwd
|
||||
import stat
|
||||
import sys
|
||||
import threading
|
||||
import zlib
|
||||
|
@ -435,7 +436,7 @@ class FileService(mitogen.service.Service):
|
|||
def __init__(self, router):
|
||||
super(FileService, self).__init__(router)
|
||||
#: Mapping of registered path -> file size.
|
||||
self._size_by_path = {}
|
||||
self._metadata_by_path = {}
|
||||
#: Queue used to communicate from service to scheduler thread.
|
||||
self._queue = mitogen.core.Latch()
|
||||
#: Mapping of Stream->[(Sender, file object)].
|
||||
|
@ -545,6 +546,12 @@ class FileService(mitogen.service.Service):
|
|||
sender.close()
|
||||
fp.close()
|
||||
|
||||
def _name_or_none(self, func, n, attr):
|
||||
try:
|
||||
return getattr(func(n), attr)
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
@mitogen.service.expose(policy=mitogen.service.AllowParents())
|
||||
@mitogen.service.arg_spec({
|
||||
'path': basestring
|
||||
|
@ -557,9 +564,22 @@ class FileService(mitogen.service.Service):
|
|||
:param str path:
|
||||
File path.
|
||||
"""
|
||||
if path not in self._size_by_path:
|
||||
LOG.debug('%r: registering %r', self, path)
|
||||
self._size_by_path[path] = os.path.getsize(path)
|
||||
if path in self._metadata_by_path:
|
||||
return
|
||||
|
||||
st = os.stat(path)
|
||||
if not stat.S_ISREG(st.st_mode):
|
||||
raise IOError('%r is not a regular file.' % (in_path,))
|
||||
|
||||
LOG.debug('%r: registering %r', self, path)
|
||||
self._metadata_by_path[path] = {
|
||||
'size': st.st_size,
|
||||
'mode': st.st_mode,
|
||||
'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'),
|
||||
'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'),
|
||||
'mtime': st.st_mtime,
|
||||
'atime': st.st_atime,
|
||||
}
|
||||
|
||||
@mitogen.service.expose(policy=mitogen.service.AllowAny())
|
||||
@mitogen.service.arg_spec({
|
||||
|
@ -575,15 +595,21 @@ class FileService(mitogen.service.Service):
|
|||
:param mitogen.core.Sender sender:
|
||||
Sender to receive file data.
|
||||
:returns:
|
||||
File size. The target can decide whether to keep the file in RAM or
|
||||
disk based on the return value.
|
||||
Dict containing the file metadata:
|
||||
|
||||
* ``size``: File size in bytes.
|
||||
* ``mode``: Integer file mode.
|
||||
* ``owner``: Owner account name on host machine.
|
||||
* ``group``: Owner group name on host machine.
|
||||
* ``mtime``: Floating point modification time.
|
||||
* ``ctime``: Floating point change time.
|
||||
:raises mitogen.core.CallError:
|
||||
The path was not registered.
|
||||
"""
|
||||
if path not in self._size_by_path:
|
||||
if path not in self._metadata_by_path:
|
||||
raise mitogen.core.CallError(self.unregistered_msg)
|
||||
|
||||
LOG.debug('Serving %r', path)
|
||||
fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
|
||||
self._queue.put((sender, fp))
|
||||
return self._size_by_path[path]
|
||||
return self._metadata_by_path[path]
|
||||
|
|
|
@ -33,6 +33,7 @@ for file transfer, module execution and sundry bits like changing file modes.
|
|||
|
||||
from __future__ import absolute_import
|
||||
import cStringIO
|
||||
import grp
|
||||
import json
|
||||
import logging
|
||||
import operator
|
||||
|
@ -85,7 +86,7 @@ def _get_file(context, path, out_fp):
|
|||
LOG.debug('_get_file(): fetching %r from %r', path, context)
|
||||
t0 = time.time()
|
||||
recv = mitogen.core.Receiver(router=context.router)
|
||||
size = mitogen.service.call(
|
||||
metadata = mitogen.service.call(
|
||||
context=context,
|
||||
handle=ansible_mitogen.services.FileService.handle,
|
||||
method='fetch',
|
||||
|
@ -100,13 +101,14 @@ def _get_file(context, path, out_fp):
|
|||
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
|
||||
out_fp.write(s)
|
||||
|
||||
if out_fp.tell() != size:
|
||||
ok = out_fp.tell() == metadata['size']
|
||||
if not ok:
|
||||
LOG.error('get_file(%r): receiver was closed early, controller '
|
||||
'is likely shutting down.', path)
|
||||
|
||||
LOG.debug('target.get_file(): fetched %d bytes of %r from %r in %dms',
|
||||
size, path, context, 1000*(time.time() - t0))
|
||||
return out_fp.tell() == size
|
||||
metadata['size'], path, context, 1000*(time.time() - t0))
|
||||
return ok, metadata
|
||||
|
||||
|
||||
def get_file(context, path):
|
||||
|
@ -125,13 +127,14 @@ def get_file(context, path):
|
|||
"""
|
||||
if path not in _file_cache:
|
||||
io = cStringIO.StringIO()
|
||||
if not _get_file(context, path, io):
|
||||
ok, metadata = _get_file(context, path, io)
|
||||
if not ok:
|
||||
raise IOError('transfer of %r was interrupted.' % (path,))
|
||||
_file_cache[path] = io.getvalue()
|
||||
return _file_cache[path]
|
||||
|
||||
|
||||
def transfer_file(context, in_path, out_path):
|
||||
def transfer_file(context, in_path, out_path, sync=False, set_owner=False):
|
||||
"""
|
||||
Streamily download a file from the connection multiplexer process in the
|
||||
controller.
|
||||
|
@ -143,19 +146,42 @@ def transfer_file(context, in_path, out_path):
|
|||
FileService registered name of the input file.
|
||||
:param bytes out_path:
|
||||
Name of the output path on the local disk.
|
||||
:param bool sync:
|
||||
If :data:`True`, ensure the file content and metadat are fully on disk
|
||||
before renaming the temporary file over the existing file. This should
|
||||
ensure in the case of system crash, either the entire old or new file
|
||||
are visible post-reboot.
|
||||
:param bool set_owner:
|
||||
If :data:`True`, look up the metadata username and group on the local
|
||||
system and file the file owner using :func:`os.fchmod`.
|
||||
"""
|
||||
fp = open(out_path+'.tmp', 'wb', mitogen.core.CHUNK_SIZE)
|
||||
out_path = os.path.abspath(out_path)
|
||||
fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
|
||||
prefix='.ansible_mitogen_transfer-',
|
||||
dir=os.path.dirname(out_path))
|
||||
fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
|
||||
LOG.debug('transfer_file(out_path=%r) tempory file: %s', out_path, tmp_path)
|
||||
|
||||
try:
|
||||
try:
|
||||
if not _get_file(context, in_path, fp):
|
||||
raise IOError('transfer of %r was interrupted.' % (in_path,))
|
||||
except Exception:
|
||||
os.unlink(fp.name)
|
||||
raise
|
||||
finally:
|
||||
fp.close()
|
||||
os.fchmod(tmp_path, metadata['mode'])
|
||||
if set_owner:
|
||||
set_fd_owner(fp.fileno(), metadata['owner'], metadata['group'])
|
||||
|
||||
os.rename(out_path + '.tmp', out_path)
|
||||
ok, metadata = _get_file(context, in_path, fp)
|
||||
if not ok:
|
||||
raise IOError('transfer of %r was interrupted.' % (in_path,))
|
||||
finally:
|
||||
fp.close()
|
||||
|
||||
if sync:
|
||||
os.fsync(fp.fileno())
|
||||
os.rename(tmp_path, out_path)
|
||||
except:
|
||||
os.unlink(tmp_path)
|
||||
raise
|
||||
|
||||
os.utime(out_path, (metadata['atime'], metadata['mtime']))
|
||||
|
||||
|
||||
@mitogen.core.takes_econtext
|
||||
|
@ -392,11 +418,51 @@ def read_path(path):
|
|||
return open(path, 'rb').read()
|
||||
|
||||
|
||||
def write_path(path, s):
|
||||
def set_fd_owner(fd, owner, group=None):
|
||||
if owner:
|
||||
uid = pwd.getpwnam(owner).pw_uid
|
||||
else:
|
||||
uid = os.geteuid()
|
||||
|
||||
if group:
|
||||
gid = grp.getgrnam(group).gr_gid
|
||||
else:
|
||||
gid = os.getegid()
|
||||
|
||||
os.fchown(fd, (uid, gid))
|
||||
|
||||
|
||||
def write_path(path, s, owner=None, group=None, mode=None,
|
||||
utimes=None, sync=False):
|
||||
"""
|
||||
Writes bytes `s` to a filesystem `path`.
|
||||
"""
|
||||
open(path, 'wb').write(s)
|
||||
path = os.path.abspath(path)
|
||||
fd, tmp_path = tempfile.mkstemp(suffix='.tmp',
|
||||
prefix='.ansible_mitogen_transfer-',
|
||||
dir=os.path.dirname(path))
|
||||
fp = os.fdopen(fd, 'wb', mitogen.core.CHUNK_SIZE)
|
||||
LOG.debug('write_path(path=%r) tempory file: %s', path, tmp_path)
|
||||
|
||||
try:
|
||||
try:
|
||||
if mode:
|
||||
os.fchmod(tmp_path, mode)
|
||||
if owner or group:
|
||||
set_fd_owner(fp.fileno(), owner, group)
|
||||
fp.write(s)
|
||||
finally:
|
||||
fp.close()
|
||||
|
||||
if sync:
|
||||
os.fsync(fp.fileno())
|
||||
os.rename(tmp_path, out_path)
|
||||
except:
|
||||
os.unlink(tmp_path)
|
||||
raise
|
||||
|
||||
if utimes:
|
||||
os.utime(out_path, utimes)
|
||||
|
||||
|
||||
CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)')
|
||||
|
|
Loading…
Reference in New Issue