oss-fuzz/infra/repo_manager.py

269 lines
8.3 KiB
Python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class to manage a git repository via python.
This class is to be used to implement git commands over
a python API and manage the current state of the git repo.
Typical usage example:
r_man = RepoManager('https://github.com/google/oss-fuzz.git')
r_man.checkout('5668cc422c2c92d38a370545d3591039fb5bb8d4')
"""
import datetime
import logging
import os
import shutil
import urllib.parse
import utils
class RepoManager:
"""Repo manager."""
def __init__(self, repo_dir):
self.repo_dir = repo_dir
def _is_git_repo(self):
"""Test if the current repo dir is a git repo or not.
Returns:
True if the current repo_dir is a valid git repo.
"""
git_path = os.path.join(self.repo_dir, '.git')
return os.path.isdir(git_path)
def git(self, cmd, check_result=False):
"""Run a git command.
Args:
command: The git command as a list to be run.
check_result: Should an exception be thrown on failed command.
Returns:
stdout, stderr, error code.
"""
return utils.execute(['git'] + cmd,
location=self.repo_dir,
check_result=check_result)
def commit_exists(self, commit):
"""Checks to see if a commit exists in the project repo.
Args:
commit: The commit SHA you are checking.
Returns:
True if the commit exits in the project.
"""
if not commit.rstrip():
return False
_, _, err_code = self.git(['cat-file', '-e', commit])
return not err_code
def commit_date(self, commit):
"""Get the date of a commit.
Args:
commit: The commit hash.
Returns:
A datetime representing the date of the commit.
"""
out, _, _ = self.git(['show', '-s', '--format=%ct', commit],
check_result=True)
return datetime.datetime.fromtimestamp(int(out), tz=datetime.timezone.utc)
def get_git_diff(self, base='origin...'):
"""Gets a list of files that have changed from the repo head.
Returns:
A list of changed file paths or None on Error.
"""
self.fetch_unshallow()
# Add '--' so that git knows we aren't talking about files.
command = ['diff', '--name-only', base, '--']
out, err_msg, err_code = self.git(command)
if err_code:
logging.error('Git diff failed with error message %s.', err_msg)
return None
if not out:
logging.error('No diff was found.')
return None
return [line for line in out.splitlines() if line]
def get_current_commit(self):
"""Gets the current commit SHA of the repo.
Returns:
The current active commit SHA.
"""
out, _, _ = self.git(['rev-parse', 'HEAD'], check_result=True)
return out.strip()
def get_parent(self, commit, count):
"""Gets the count'th parent of the given commit.
Returns:
The parent commit SHA.
"""
self.fetch_unshallow()
out, _, err_code = self.git(['rev-parse', commit + '~' + str(count)],
check_result=False)
if err_code:
return None
return out.strip()
def fetch_all_remotes(self):
"""Fetch all remotes for checkouts that track a single branch."""
self.git([
'config', 'remote.origin.fetch', '+refs/heads/*:refs/remotes/origin/*'
],
check_result=True)
self.git(['remote', 'update'], check_result=True)
def get_commit_list(self, newest_commit, oldest_commit=None, limit=None):
"""Gets the list of commits(inclusive) between the old and new commits.
Args:
newest_commit: The newest commit to be in the list.
oldest_commit: The (optional) oldest commit to be in the list.
Returns:
The list of commit SHAs from newest to oldest.
Raises:
ValueError: When either the oldest or newest commit does not exist.
RuntimeError: When there is an error getting the commit list.
"""
self.fetch_unshallow()
if oldest_commit and not self.commit_exists(oldest_commit):
raise ValueError('The oldest commit %s does not exist' % oldest_commit)
if not self.commit_exists(newest_commit):
raise ValueError('The newest commit %s does not exist' % newest_commit)
if oldest_commit == newest_commit:
return [oldest_commit]
if oldest_commit:
commit_range = oldest_commit + '..' + newest_commit
else:
commit_range = newest_commit
limit_args = []
if limit:
limit_args.append(f'--max-count={limit}')
out, _, err_code = self.git(['rev-list', commit_range] + limit_args)
commits = out.split('\n')
commits = [commit for commit in commits if commit]
if err_code or not commits:
raise RuntimeError('Error getting commit list between %s and %s ' %
(oldest_commit, newest_commit))
# Make sure result is inclusive
if oldest_commit:
commits.append(oldest_commit)
return commits
def fetch_branch(self, branch):
"""Fetches a remote branch from origin."""
return self.git(
['fetch', 'origin', '{branch}:{branch}'.format(branch=branch)])
def fetch_unshallow(self):
"""Gets the current git repository history."""
shallow_file = os.path.join(self.repo_dir, '.git', 'shallow')
if os.path.exists(shallow_file):
_, err, err_code = self.git(['fetch', '--unshallow'], check_result=False)
if err_code:
logging.error('Unshallow returned non-zero code: %s', err)
def checkout_pr(self, pr_ref):
"""Checks out a remote pull request.
Args:
pr_ref: The pull request reference to be checked out.
"""
self.fetch_unshallow()
self.git(['fetch', 'origin', pr_ref], check_result=True)
self.git(['checkout', '-f', 'FETCH_HEAD'], check_result=True)
def checkout_commit(self, commit, clean=True):
"""Checks out a specific commit from the repo.
Args:
commit: The commit SHA to be checked out.
Raises:
RuntimeError: when checkout is not successful.
ValueError: when commit does not exist.
"""
self.fetch_unshallow()
if not self.commit_exists(commit):
raise ValueError('Commit %s does not exist in current branch' % commit)
self.git(['checkout', '-f', commit], check_result=True)
if clean:
self.git(['clean', '-fxd'], check_result=True)
if self.get_current_commit() != commit:
raise RuntimeError('Error checking out commit %s' % commit)
def remove_repo(self):
"""Removes the git repo from disk."""
if os.path.isdir(self.repo_dir):
shutil.rmtree(self.repo_dir)
def clone_repo_and_get_manager(repo_url,
base_dir,
repo_name=None,
username=None,
password=None):
"""Clones a repo and constructs a repo manager class.
Args:
repo_url: The github url needed to clone.
base_dir: The full file-path where the git repo is located.
repo_name: The name of the directory the repo is cloned to.
"""
if repo_name is None:
repo_name = os.path.basename(repo_url).replace('.git', '')
repo_dir = os.path.join(base_dir, repo_name)
manager = RepoManager(repo_dir)
if not os.path.exists(repo_dir):
_clone(repo_url, base_dir, repo_name, username=username, password=password)
return manager
def _clone(repo_url, base_dir, repo_name, username=None, password=None):
"""Creates a clone of the repo in the specified directory.
Raises:
ValueError: when the repo is not able to be cloned.
"""
if username and password:
parsed_url = urllib.parse.urlparse(repo_url)
new_netloc = f'{username}:{password}@{parsed_url.netloc}'
repo_url = urllib.parse.urlunparse(parsed_url._replace(netloc=new_netloc))
utils.execute(['git', 'clone', repo_url, repo_name],
location=base_dir,
check_result=True,
log_command=not password)