peru/tests/test_plugins.py

442 lines
19 KiB
Python

import asyncio
from collections import defaultdict
import contextlib
import hashlib
import io
import os
from pathlib import Path
import shutil
import subprocess
import sys
import textwrap
import unittest
from peru.async_helpers import run_task
import peru.plugin as plugin
import shared
from shared import SvnRepo, GitRepo, HgRepo, assert_contents
HG_MINIMUM_PYTHON_VERSION = (3, 6)
class TestDisplayHandle(io.StringIO):
def __enter__(self):
return self
def __exit__(self, *args):
pass
def test_plugin_fetch(context, type, fields, dest):
handle = TestDisplayHandle()
run_task(plugin.plugin_fetch(context, type, fields, dest, handle))
return handle.getvalue()
def test_plugin_get_reup_fields(context, type, fields):
handle = TestDisplayHandle()
return run_task(
plugin.plugin_get_reup_fields(context, type, fields, handle))
class PluginsTest(shared.PeruTest):
def setUp(self):
self.content = {"some": "stuff", "foo/bar": "baz"}
self.content_dir = shared.create_dir(self.content)
self.cache_root = shared.create_dir()
self.plugin_context = plugin.PluginContext(
cwd='.',
plugin_cache_root=self.cache_root,
parallelism_semaphore=asyncio.BoundedSemaphore(
plugin.DEFAULT_PARALLEL_FETCH_LIMIT),
plugin_cache_locks=defaultdict(asyncio.Lock),
tmp_root=shared.create_dir())
plugin.debug_assert_clean_parallel_count()
def tearDown(self):
plugin.debug_assert_clean_parallel_count()
def do_plugin_test(self,
type,
plugin_fields,
expected_content,
*,
fetch_dir=None):
fetch_dir = fetch_dir or shared.create_dir()
output = test_plugin_fetch(self.plugin_context, type, plugin_fields,
fetch_dir)
assert_contents(fetch_dir, expected_content)
return output
def test_git_plugin(self):
GitRepo(self.content_dir)
self.do_plugin_test("git", {"url": self.content_dir}, self.content)
def test_git_default_branch(self):
GitRepo(self.content_dir, init_default_branch='main')
self.do_plugin_test("git", {"url": self.content_dir}, self.content)
def test_empty_git_rev(self):
empty_dir = shared.create_dir()
GitRepo(empty_dir)
self.do_plugin_test('git', {'url': empty_dir}, {})
@unittest.skipIf(
sys.version_info < HG_MINIMUM_PYTHON_VERSION,
"Python too old for hg",
)
def test_hg_plugin(self):
HgRepo(self.content_dir)
self.do_plugin_test("hg", {"url": self.content_dir}, self.content)
def test_svn_plugin(self):
repo = SvnRepo(self.content_dir)
self.do_plugin_test('svn', {'url': repo.url}, self.content)
def test_svn_plugin_reup(self):
repo = SvnRepo(self.content_dir)
plugin_fields = {'url': repo.url}
output = test_plugin_get_reup_fields(self.plugin_context, 'svn',
plugin_fields)
self.assertDictEqual({'rev': '1'}, output)
def test_git_plugin_with_submodule(self):
content_repo = GitRepo(self.content_dir)
# Git has a small bug: The .gitmodules file is always created with "\n"
# line endings, even on Windows. With core.autocrlf turned on, that
# causes a warning when the file is added/committed, because those line
# endings would get replaced with "\r\n" when the file was checked out.
# We can just turn autocrlf off for this test to silence the warning.
content_repo.run('git', 'config', 'core.autocrlf', 'false')
submodule_dir = shared.create_dir({'another': 'file'})
submodule_repo = GitRepo(submodule_dir)
content_repo.run('git', 'submodule', 'add', '-q', submodule_dir,
'subdir/', env={"GIT_ALLOW_PROTOCOL": "file"})
content_repo.run('git', 'commit', '-m', 'submodule commit')
expected_content = self.content.copy()
expected_content['subdir/another'] = 'file'
with open(os.path.join(self.content_dir, '.gitmodules')) as f:
expected_content['.gitmodules'] = f.read()
self.do_plugin_test('git', {'url': self.content_dir}, expected_content)
# Now move the submodule forward. Make sure it gets fetched again.
shared.write_files(submodule_dir, {'more': 'stuff'})
submodule_repo.run('git', 'add', '-A')
submodule_repo.run('git', 'commit', '-m', 'more stuff')
subprocess.check_output(['git', 'pull', '-q'],
cwd=os.path.join(self.content_dir, 'subdir'))
content_repo.run('git', 'commit', '-am', 'submodule update')
expected_content['subdir/more'] = 'stuff'
self.do_plugin_test('git', {'url': self.content_dir}, expected_content)
# Normally when you run `git submodule add ...`, git puts two things in
# your repo: an entry in .gitmodules, and a commit object at the
# appropriate path inside your repo. However, it's possible for those
# two to get out of sync, especially if you use mv/rm on a directory
# followed by `git add`, instead of the smarter `git mv`/`git rm`. We
# need to create this condition and check that we then ignore the
# submodule.
shutil.rmtree(os.path.join(self.content_dir, 'subdir'))
content_repo.run('git', 'commit', '-am', 'inconsistent delete')
del expected_content['subdir/another']
del expected_content['subdir/more']
self.do_plugin_test('git', {'url': self.content_dir}, expected_content)
# Finally, test explicitly disabling submodule fetching. Start by
# reverting the 'inconsistent delete' commit from above.
content_repo.run('git', 'revert', '--no-edit', 'HEAD')
fields = {'url': self.content_dir, 'submodules': 'false'}
self.do_plugin_test('git', fields, expected_content)
def test_git_plugin_with_relative_submodule(self):
content_repo = GitRepo(self.content_dir)
# Same autocrlf workaround as above.
content_repo.run('git', 'config', 'core.autocrlf', 'false')
# Similar to above, but this time we use a relative path.
submodule_dir = shared.create_dir({'another': 'file'})
GitRepo(submodule_dir)
submodule_basename = os.path.basename(submodule_dir)
relative_path = "../" + submodule_basename
content_repo.run('git', 'submodule', 'add', '-q', relative_path,
'subdir/', env={"GIT_ALLOW_PROTOCOL": "file"})
content_repo.run('git', 'commit', '-m', 'submodule commit')
expected_content = self.content.copy()
expected_content['subdir/another'] = 'file'
with open(os.path.join(self.content_dir, '.gitmodules')) as f:
expected_content['.gitmodules'] = f.read()
self.do_plugin_test('git', {'url': self.content_dir}, expected_content)
def test_git_plugin_multiple_fetches(self):
content_repo = GitRepo(self.content_dir)
head = content_repo.run('git', 'rev-parse', 'HEAD')
plugin_fields = {"url": self.content_dir, "rev": head}
output = self.do_plugin_test("git", plugin_fields, self.content)
self.assertEqual(output.count("git clone"), 1)
self.assertEqual(output.count("git fetch"), 0)
# Add a new file to the directory and commit it.
shared.write_files(self.content_dir, {'another': 'file'})
content_repo.run('git', 'add', '-A')
content_repo.run('git', 'commit', '-m', 'committing another file')
# Refetch the original rev. Git should not do a git-fetch.
output = self.do_plugin_test("git", plugin_fields, self.content)
self.assertEqual(output.count("git clone"), 0)
self.assertEqual(output.count("git fetch"), 0)
# Not delete the rev field. Git should default to master and fetch.
del plugin_fields["rev"]
self.content["another"] = "file"
output = self.do_plugin_test("git", plugin_fields, self.content)
self.assertEqual(output.count("git clone"), 0)
self.assertEqual(output.count("git fetch"), 1)
@unittest.skipIf(
sys.version_info < HG_MINIMUM_PYTHON_VERSION,
"Python too old for hg",
)
def test_hg_plugin_multiple_fetches(self):
content_repo = HgRepo(self.content_dir)
head = content_repo.run('hg', 'identify', '--debug', '-r',
'.').split()[0]
plugin_fields = {'url': self.content_dir, 'rev': head}
output = self.do_plugin_test('hg', plugin_fields, self.content)
self.assertEqual(output.count('hg clone'), 1)
self.assertEqual(output.count('hg pull'), 0)
# Add a new file to the directory and commit it.
shared.write_files(self.content_dir, {'another': 'file'})
content_repo.run('hg', 'commit', '-A', '-m', 'committing another file')
# Refetch the original rev. Hg should not do a pull.
output = self.do_plugin_test('hg', plugin_fields, self.content)
self.assertEqual(output.count('hg clone'), 0)
self.assertEqual(output.count('hg pull'), 0)
# Not delete the rev field. Git should default to master and fetch.
del plugin_fields['rev']
self.content['another'] = 'file'
output = self.do_plugin_test('hg', plugin_fields, self.content)
self.assertEqual(output.count('hg clone'), 0)
self.assertEqual(output.count('hg pull'), 1)
def test_git_plugin_reup(self):
repo = GitRepo(self.content_dir)
master_head = repo.run('git', 'rev-parse', 'master')
plugin_fields = {'url': self.content_dir}
# By default, the git plugin should reup from master.
expected_output = {'rev': master_head}
output = test_plugin_get_reup_fields(self.plugin_context, 'git',
plugin_fields)
self.assertDictEqual(expected_output, output)
# Add some new commits and make sure master gets fetched properly.
repo.run('git', 'commit', '--allow-empty', '-m', 'junk')
repo.run('git', 'checkout', '-q', '-b', 'newbranch')
repo.run('git', 'commit', '--allow-empty', '-m', 'more junk')
new_master_head = repo.run('git', 'rev-parse', 'master')
expected_output['rev'] = new_master_head
output = test_plugin_get_reup_fields(self.plugin_context, 'git',
plugin_fields)
self.assertDictEqual(expected_output, output)
# Now specify the reup target explicitly.
newbranch_head = repo.run('git', 'rev-parse', 'newbranch')
plugin_fields['reup'] = 'newbranch'
expected_output['rev'] = newbranch_head
output = test_plugin_get_reup_fields(self.plugin_context, 'git',
plugin_fields)
self.assertDictEqual(expected_output, output)
@unittest.skipIf(
sys.version_info < HG_MINIMUM_PYTHON_VERSION,
"Python too old for hg",
)
def test_hg_plugin_reup(self):
repo = HgRepo(self.content_dir)
default_tip = repo.run('hg', 'identify', '--debug', '-r',
'default').split()[0]
plugin_fields = {'url': self.content_dir}
# By default, the hg plugin should reup from default.
expected_output = {'rev': default_tip}
output = test_plugin_get_reup_fields(self.plugin_context, 'hg',
plugin_fields)
self.assertDictEqual(expected_output, output)
# Add some new commits and make sure master gets fetched properly.
shared.write_files(self.content_dir,
{'randomfile': "hg doesn't like empty commits"})
repo.run('hg', 'commit', '-A', '-m', 'junk')
shared.write_files(
self.content_dir,
{'randomfile': "hg still doesn't like empty commits"})
repo.run('hg', 'branch', 'newbranch')
repo.run('hg', 'commit', '-A', '-m', 'more junk')
new_default_tip = repo.run('hg', 'identify', '--debug', '-r',
'default').split()[0]
expected_output['rev'] = new_default_tip
output = test_plugin_get_reup_fields(self.plugin_context, 'hg',
plugin_fields)
self.assertDictEqual(expected_output, output)
# Now specify the reup target explicitly.
newbranch_tip = repo.run('hg', 'identify', '--debug', '-r',
'tip').split()[0]
plugin_fields['reup'] = 'newbranch'
expected_output['rev'] = newbranch_tip
output = test_plugin_get_reup_fields(self.plugin_context, 'hg',
plugin_fields)
self.assertDictEqual(expected_output, output)
def test_curl_plugin_fetch(self):
curl_content = {'myfile': 'content'}
test_dir = shared.create_dir(curl_content)
test_url = (Path(test_dir) / 'myfile').as_uri()
fields = {'url': test_url}
self.do_plugin_test('curl', fields, curl_content)
# Run the test again with an explicit hash and an explicit filename.
digest = hashlib.sha1()
digest.update(b'content')
real_hash = digest.hexdigest()
fields['sha1'] = real_hash
fields['filename'] = 'newname'
self.do_plugin_test('curl', fields, {'newname': 'content'})
# Now run it with the wrong hash, and confirm that there's an error.
fields['sha1'] = 'wrong hash'
with self.assertRaises(plugin.PluginRuntimeError):
self.do_plugin_test('curl', fields, {'newname': 'content'})
def test_curl_plugin_fetch_archives(self):
for type in 'zip', 'tar':
fields = {
'url': (shared.test_resources / ('with_exe.' + type)).as_uri(),
'unpack': type,
}
fetch_dir = shared.create_dir()
self.do_plugin_test(
'curl',
fields, {
'not_exe.txt': 'Not executable.\n',
'exe.sh': 'echo Executable.\n',
},
fetch_dir=fetch_dir)
shared.assert_not_executable(
os.path.join(fetch_dir, 'not_exe.txt'))
shared.assert_executable(os.path.join(fetch_dir, 'exe.sh'))
def test_curl_plugin_fetch_evil_archive(self):
# There are several evil archives checked in under tests/resources. The
# others are checked directly as part of test_curl_plugin.py.
fields = {
'url': (shared.test_resources / '.tar').as_uri(),
'unpack': 'tar',
}
with self.assertRaises(plugin.PluginRuntimeError):
self.do_plugin_test('curl', fields, {})
def test_curl_plugin_reup(self):
curl_content = {'myfile': 'content'}
test_dir = shared.create_dir(curl_content)
test_url = (Path(test_dir) / 'myfile').as_uri()
digest = hashlib.sha1()
digest.update(b'content')
real_hash = digest.hexdigest()
fields = {'url': test_url}
output = test_plugin_get_reup_fields(self.plugin_context, 'curl',
fields)
self.assertDictEqual({'sha1': real_hash}, output)
# Confirm that we get the same thing with a preexisting hash.
fields['sha1'] = 'preexisting junk'
output = test_plugin_get_reup_fields(self.plugin_context, 'curl',
fields)
self.assertDictEqual({'sha1': real_hash}, output)
def test_cp_plugin(self):
self.do_plugin_test("cp", {"path": self.content_dir}, self.content)
@unittest.skipIf(os.name == 'nt', 'the rsync plugin is Unix-only')
def test_rsync_plugin(self):
self.do_plugin_test("rsync", {"path": self.content_dir}, self.content)
@unittest.skipIf(os.name != 'nt', 'the bat plugin is Windows-only')
def test_bat_plugin(self):
self.do_plugin_test(
"bat", {"filename": "xyz", "message": "hello Windows"},
{"xyz": "hello Windows\n"})
def test_empty_plugin(self):
self.do_plugin_test("empty", {}, {})
def test_missing_required_field(self):
# The 'url' field is required for git.
try:
self.do_plugin_test('git', {}, self.content)
except plugin.PluginModuleFieldError as e:
assert 'url' in e.message, 'message should mention missing field'
else:
assert False, 'should throw PluginModuleFieldError'
def test_unknown_field(self):
# The 'junk' field isn't valid for git.
bad_fields = {'url': self.content_dir, 'junk': 'junk'}
try:
self.do_plugin_test('git', bad_fields, self.content)
except plugin.PluginModuleFieldError as e:
assert 'junk' in e.message, 'message should mention bad field'
else:
assert False, 'should throw PluginModuleFieldError'
def test_user_defined_plugin(self):
plugin_prefix = 'peru/plugins/footype/'
fetch_file = plugin_prefix + 'fetch.py'
reup_file = plugin_prefix + 'reup.py'
plugin_yaml_file = plugin_prefix + 'plugin.yaml'
fake_config_dir = shared.create_dir({
fetch_file:
'#! /usr/bin/env python3\nprint("hey there!")\n',
reup_file:
textwrap.dedent('''\
#! /usr/bin/env python3
import os
outfile = os.environ['PERU_REUP_OUTPUT']
print("name: val", file=open(outfile, 'w'))
'''),
plugin_yaml_file:
textwrap.dedent('''\
sync exe: fetch.py
reup exe: reup.py
required fields: []
''')
})
os.chmod(os.path.join(fake_config_dir, fetch_file), 0o755)
os.chmod(os.path.join(fake_config_dir, reup_file), 0o755)
fetch_dir = shared.create_dir()
# We need to trick peru into loading plugins from the fake config dir
# dir. We do this by setting an env var, which depends on the platform.
if os.name == 'nt':
# Windows
config_path_variable = 'LOCALAPPDATA'
else:
# non-Windows
config_path_variable = 'XDG_CONFIG_HOME'
with temporary_environment(config_path_variable, fake_config_dir):
output = test_plugin_fetch(self.plugin_context, 'footype', {},
fetch_dir)
self.assertEqual('hey there!\n', output)
output = test_plugin_get_reup_fields(self.plugin_context,
'footype', {})
self.assertDictEqual({'name': 'val'}, output)
def test_no_such_plugin(self):
with self.assertRaises(plugin.PluginCandidateError):
test_plugin_fetch(self.plugin_context, 'nosuchtype!', {},
os.devnull)
@contextlib.contextmanager
def temporary_environment(name, value):
NOT_SET = object()
old_value = os.environ.get(name, NOT_SET)
os.environ[name] = value
try:
yield
finally:
if old_value is NOT_SET:
del os.environ[name]
else:
os.environ[name] = old_value