mirror of https://github.com/google/oss-fuzz.git
183 lines
5.7 KiB
Python
183 lines
5.7 KiB
Python
# Copyright 2019 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Module to get the the name of a git repo containing a specific commit
|
|
inside of an OSS-Fuzz project.
|
|
|
|
Example Usage:
|
|
|
|
python detect_repo.py --src_dir /src --example_commit
|
|
b534f03eecd8a109db2b085ab24d419b6486de97
|
|
|
|
Prints the location of the git remote repo as well as the repo's name
|
|
seperated by a space.
|
|
|
|
https://github.com/VirusTotal/yara.git yara
|
|
|
|
"""
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
|
|
GO_PATH = '/root/go/src/'
|
|
|
|
|
|
def main():
|
|
"""Function to get a git repo's url and name referenced by OSS-Fuzz
|
|
Dockerfile.
|
|
|
|
Raises:
|
|
ValueError when a commit or a ref is not provided.
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description=
|
|
'Finds a specific git repo in an oss-fuzz project\'s docker file.')
|
|
parser.add_argument('--repo_name', help='The name of the git repo.')
|
|
parser.add_argument('--src_dir', help='The location of the possible repo.')
|
|
parser.add_argument('--example_commit',
|
|
help='A commit SHA referencing the project\'s main repo.')
|
|
|
|
args = parser.parse_args()
|
|
if not args.repo_name and not args.example_commit:
|
|
raise ValueError(
|
|
'Requires an example commit or a repo name to find repo location.')
|
|
if args.src_dir:
|
|
src_dir = args.src_dir
|
|
else:
|
|
src_dir = os.environ.get('SRC', '/src')
|
|
|
|
for single_dir in get_dirs_to_search(src_dir, args.repo_name):
|
|
full_path = os.path.join(src_dir, single_dir)
|
|
if not os.path.isdir(full_path):
|
|
continue
|
|
if args.example_commit and check_for_commit(full_path, args.example_commit):
|
|
print('Detected repo:', get_repo(full_path), full_path)
|
|
return
|
|
if args.repo_name and check_for_repo_name(full_path, args.repo_name):
|
|
print('Detected repo:', get_repo(full_path), full_path)
|
|
return
|
|
logging.error('No git repos with specific commit: %s found in %s',
|
|
args.example_commit, src_dir)
|
|
|
|
|
|
def get_dirs_to_search(src_dir, repo_name):
|
|
"""Gets a list of directories to search for the main git repo.
|
|
|
|
Args:
|
|
src_dir: The location set for the projects SRC.
|
|
repo_name: The name of the repo you are searching for.
|
|
|
|
Returns:
|
|
A list of directorys to search.
|
|
"""
|
|
dirs_to_search = os.listdir(src_dir)
|
|
if os.path.exists(GO_PATH) and repo_name:
|
|
for root, dirs, _ in os.walk(GO_PATH):
|
|
for test_dir in dirs:
|
|
if repo_name in test_dir:
|
|
dirs_to_search.append(os.path.join(root, test_dir))
|
|
return dirs_to_search
|
|
|
|
|
|
def get_repo(repo_path):
|
|
"""Gets a git repo link from a specific directory in a docker image.
|
|
|
|
Args:
|
|
repo_path: The directory on the image where the git repo exists.
|
|
|
|
Returns:
|
|
The repo location or None.
|
|
"""
|
|
output, return_code = execute(['git', 'config', '--get', 'remote.origin.url'],
|
|
location=repo_path,
|
|
check_result=True)
|
|
if return_code == 0 and output:
|
|
return output.rstrip()
|
|
return None
|
|
|
|
|
|
def check_for_repo_name(repo_path, expected_repo_name):
|
|
"""Returns True if the repo at |repo_path| repo_name matches
|
|
|expected_repo_name|.
|
|
|
|
Args:
|
|
repo_path: The directory of a git repo.
|
|
expected_repo_name: The name of the target git repo.
|
|
"""
|
|
if not os.path.exists(os.path.join(repo_path, '.git')):
|
|
return False
|
|
|
|
repo_url, _ = execute(['git', 'config', '--get', 'remote.origin.url'],
|
|
location=repo_path)
|
|
# Handle two common cases:
|
|
# https://github.com/google/syzkaller/
|
|
# https://github.com/google/syzkaller.git
|
|
repo_url = repo_url.replace('.git', '').rstrip().rstrip('/')
|
|
actual_repo_name = repo_url.split('/')[-1]
|
|
return actual_repo_name == expected_repo_name
|
|
|
|
|
|
def check_for_commit(repo_path, commit):
|
|
"""Checks a directory for a specific commit.
|
|
|
|
Args:
|
|
repo_path: The name of the directory to test for the commit.
|
|
commit: The commit SHA to check for.
|
|
|
|
Returns:
|
|
True if directory contains that commit.
|
|
"""
|
|
|
|
# Check if valid git repo.
|
|
if not os.path.exists(os.path.join(repo_path, '.git')):
|
|
return False
|
|
|
|
# Check if history fetch is needed.
|
|
if os.path.exists(os.path.join(repo_path, '.git', 'shallow')):
|
|
execute(['git', 'fetch', '--unshallow'], location=repo_path)
|
|
|
|
# Check if commit is in history.
|
|
_, return_code = execute(['git', 'cat-file', '-e', commit],
|
|
location=repo_path)
|
|
return return_code == 0
|
|
|
|
|
|
def execute(command, location, check_result=False):
|
|
"""Runs a shell command in the specified directory location.
|
|
|
|
Args:
|
|
command: The command as a list to be run.
|
|
location: The directory the command is run in.
|
|
check_result: Should an exception be thrown on failed command.
|
|
|
|
Returns:
|
|
The stdout of the command, the error code.
|
|
|
|
Raises:
|
|
RuntimeError: running a command resulted in an error.
|
|
"""
|
|
process = subprocess.Popen(command, stdout=subprocess.PIPE, cwd=location)
|
|
output, err = process.communicate()
|
|
if check_result and (process.returncode or err):
|
|
raise RuntimeError(
|
|
'Error: %s\n running command: %s\n return code: %s\n out %s\n' %
|
|
(err, command, process.returncode, output))
|
|
if output is not None:
|
|
output = output.decode('ascii')
|
|
return output, process.returncode
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|