# Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module to get the the name of a git repo containing a specific commit inside of an OSS-Fuzz project. Example Usage: python detect_repo.py --src_dir /src --example_commit b534f03eecd8a109db2b085ab24d419b6486de97 Prints the location of the git remote repo as well as the repo's name seperated by a space. https://github.com/VirusTotal/yara.git yara """ import argparse import logging import os import subprocess GO_PATH = '/root/go/src/' def main(): """Function to get a git repo's url and name referenced by OSS-Fuzz Dockerfile. Raises: ValueError when a commit or a ref is not provided. """ parser = argparse.ArgumentParser( description= 'Finds a specific git repo in an oss-fuzz project\'s docker file.') parser.add_argument('--repo_name', help='The name of the git repo.') parser.add_argument('--src_dir', help='The location of the possible repo.') parser.add_argument('--example_commit', help='A commit SHA referencing the project\'s main repo.') args = parser.parse_args() if not args.repo_name and not args.example_commit: raise ValueError( 'Requires an example commit or a repo name to find repo location.') if args.src_dir: src_dir = args.src_dir else: src_dir = os.environ.get('SRC', '/src') for single_dir in get_dirs_to_search(src_dir, args.repo_name): full_path = os.path.join(src_dir, single_dir) if not os.path.isdir(full_path): continue if args.example_commit and check_for_commit(full_path, args.example_commit): print('Detected repo:', get_repo(full_path), full_path) return if args.repo_name and check_for_repo_name(full_path, args.repo_name): print('Detected repo:', get_repo(full_path), full_path) return logging.error('No git repos with specific commit: %s found in %s', args.example_commit, src_dir) def get_dirs_to_search(src_dir, repo_name): """Gets a list of directories to search for the main git repo. Args: src_dir: The location set for the projects SRC. repo_name: The name of the repo you are searching for. Returns: A list of directorys to search. """ dirs_to_search = os.listdir(src_dir) if os.path.exists(GO_PATH) and repo_name: for root, dirs, _ in os.walk(GO_PATH): for test_dir in dirs: if repo_name in test_dir: dirs_to_search.append(os.path.join(root, test_dir)) return dirs_to_search def get_repo(repo_path): """Gets a git repo link from a specific directory in a docker image. Args: repo_path: The directory on the image where the git repo exists. Returns: The repo location or None. """ output, return_code = execute(['git', 'config', '--get', 'remote.origin.url'], location=repo_path, check_result=True) if return_code == 0 and output: return output.rstrip() return None def check_for_repo_name(repo_path, expected_repo_name): """Returns True if the repo at |repo_path| repo_name matches |expected_repo_name|. Args: repo_path: The directory of a git repo. expected_repo_name: The name of the target git repo. """ if not os.path.exists(os.path.join(repo_path, '.git')): return False repo_url, _ = execute(['git', 'config', '--get', 'remote.origin.url'], location=repo_path) # Handle two common cases: # https://github.com/google/syzkaller/ # https://github.com/google/syzkaller.git repo_url = repo_url.replace('.git', '').rstrip().rstrip('/') actual_repo_name = repo_url.split('/')[-1] return actual_repo_name == expected_repo_name def check_for_commit(repo_path, commit): """Checks a directory for a specific commit. Args: repo_path: The name of the directory to test for the commit. commit: The commit SHA to check for. Returns: True if directory contains that commit. """ # Check if valid git repo. if not os.path.exists(os.path.join(repo_path, '.git')): return False # Check if history fetch is needed. if os.path.exists(os.path.join(repo_path, '.git', 'shallow')): execute(['git', 'fetch', '--unshallow'], location=repo_path) # Check if commit is in history. _, return_code = execute(['git', 'cat-file', '-e', commit], location=repo_path) return return_code == 0 def execute(command, location, check_result=False): """Runs a shell command in the specified directory location. Args: command: The command as a list to be run. location: The directory the command is run in. check_result: Should an exception be thrown on failed command. Returns: The stdout of the command, the error code. Raises: RuntimeError: running a command resulted in an error. """ process = subprocess.Popen(command, stdout=subprocess.PIPE, cwd=location) output, err = process.communicate() if check_result and (process.returncode or err): raise RuntimeError( 'Error: %s\n running command: %s\n return code: %s\n out %s\n' % (err, command, process.returncode, output)) if output is not None: output = output.decode('ascii') return output, process.returncode if __name__ == '__main__': main()