#!/usr/bin/env python
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
"""Adds comments for PR to provide more information for approvers."""
import base64
import json
import os
import secrets
import subprocess

import requests
import yaml

# Repository whose pull requests this script inspects.
OWNER = 'google'
REPO = 'oss-fuzz'
GITHUB_URL = 'https://github.com/'
# Github URL that doesn't send emails on linked issues.
GITHUB_NONREF_URL = f'https://www.github.com/{OWNER}/{REPO}'
API_URL = 'https://api.github.com'
# Prefix of every repository-scoped Github API request made below.
BASE_URL = f'{API_URL}/repos/{OWNER}/{REPO}'
BRANCH = 'master'
# Executable run by get_criticality_score.
CRITICALITY_SCORE_PATH = '/home/runner/go/bin/criticality_score'
# Only process the most recent 50 commits.
COMMITS_LIMIT = 50
def get_criticality_score(repo_url):
    """Gets the criticality score of the project.

    Runs the criticality_score executable on the repository and reads the
    'default_score' field of the JSON report it prints on stdout.

    Args:
        repo_url: URL of the project's main repository. A trailing '.git' is
            removed before the tool is called.

    Returns:
        The 'default_score' value of the report, or the string 'N/A' when the
        tool cannot be started, its output is not a JSON object, or the report
        has no such field.
    """
    # Criticality score does not support repo url ends with '.git'
    if repo_url.endswith('.git'):
        repo_url = repo_url[:-4]

    command = [
        CRITICALITY_SCORE_PATH, '--format', 'json',
        '-gcp-project-id=clusterfuzz-external', '-depsdev-disable', repo_url
    ]
    try:
        report = subprocess.run(command, capture_output=True, text=True)
    except OSError as error:
        # The score is only informational, so a missing or non-executable
        # binary is reported the same way as an unusable report.
        print(f'Criticality score could not be run: {error}')
        return 'N/A'

    try:
        report_dict = json.loads(report.stdout)
    except json.JSONDecodeError:
        print(f'Criticality score failed with stdout: {report.stdout}')
        print(f'Criticality score failed with stderr: {report.stderr}')
        return 'N/A'

    # Valid JSON that is not an object (e.g. a bare string) has no fields.
    if not isinstance(report_dict, dict):
        print(f'Criticality score failed with stdout: {report.stdout}')
        print(f'Criticality score failed with stderr: {report.stderr}')
        return 'N/A'
    return report_dict.get('default_score', 'N/A')
def _contact_list(value):
    """Normalizes one CC field of a project.yaml to a collection of emails."""
    if isinstance(value, str):
        # A single address written as a scalar; wrap it so membership is an
        # exact comparison instead of a substring search.
        return [value]
    if isinstance(value, (list, tuple, set)):
        return value
    # Missing key, explicit null, or an unexpected type: no contacts.
    return []


def is_known_contributor(content, email):
    """Checks if the author is in the contact list.

    Args:
        content: Mapping parsed from the project's project.yaml.
        email: Email address of the PR author.

    Returns:
        True when email equals 'primary_contact' or is one of the entries of
        'vendor_ccs' or 'auto_ccs'; False otherwise, including when email is
        empty or None.
    """
    # Without this guard a missing 'primary_contact' would compare equal to
    # a None email and report an anonymous author as known.
    if not email:
        return False
    if email == content.get('primary_contact'):
        return True
    return any(
        email in _contact_list(content.get(key))
        for key in ('vendor_ccs', 'auto_ccs'))
def save_env(message, is_ready_for_merge, is_internal=False):
    """Saves the outputs as environment variables.

    Appends MESSAGE, IS_READY_FOR_MERGE and IS_INTERNAL to the file named by
    the GITHUB_ENV environment variable.

    Args:
        message: Text of the comment. It may contain line breaks; it is
            written with the delimiter syntax so that no line of it can be
            read as a separate variable assignment.
        is_ready_for_merge: Verdict written as IS_READY_FOR_MERGE.
        is_internal: Whether the PR author is an internal member.

    Raises:
        KeyError: If GITHUB_ENV is not set.
    """
    text = str(message)
    # The message embeds text taken from the pull request, so the closing
    # delimiter must be unpredictable and must not occur in the value.
    delimiter = f'EOF_{secrets.token_hex(16)}'
    while delimiter in text:
        delimiter = f'EOF_{secrets.token_hex(16)}'

    with open(os.environ['GITHUB_ENV'], 'a', encoding='utf-8') as github_env:
        github_env.write(f'MESSAGE<<{delimiter}\n{text}\n{delimiter}\n')
        github_env.write(f'IS_READY_FOR_MERGE={is_ready_for_merge}\n')
        # Terminate the last entry so later appends start on a new line.
        github_env.write(f'IS_INTERNAL={is_internal}\n')
def main():
    """Verifies if a PR is ready to merge.

    For every project directory touched by the PR, adds an entry to the
    review message describing how the author relates to that project, then
    passes the message and the verdict to save_env. PRs from internal members
    skip every check. The PR is reported as not ready when it integrates a
    new project, or when the author has no earlier commit in a modified
    project and is not a verified contact of it.
    """
    github = GithubHandler()

    # Bypasses PRs of the internal members.
    if github.is_author_internal_member():
        save_env(None, None, True)
        return

    message = ''
    is_ready_for_merge = True
    pr_author = github.get_pr_author()
    # Gets all modified projects path.
    projects_path = github.get_projects_path()
    verified, email = github.get_author_email()
    # GITHUB_URL already ends with '/'; drop it so links contain no '//'.
    github_root = GITHUB_URL.rstrip('/')

    # Every entry ends with an HTML '<br/>' so that the whole message stays
    # on a single line.
    for project_path in projects_path:
        project_url = f'{github_root}/{OWNER}/{REPO}/tree/{BRANCH}/{project_path}'
        content_dict = github.get_project_yaml(project_path)

        # Gets information for the new integrating project.
        if not content_dict:
            is_ready_for_merge = False
            # Fall back to an empty mapping when no project.yaml content
            # could be read from the PR.
            new_project = github.get_integrated_project_info() or {}
            repo_url = new_project.get('main_repo')
            if repo_url is None:
                message += (f'{pr_author} is integrating a new project, '
                            'but the `main_repo` is missing. '
                            'The criticality score cannot be computed.<br/>')
            else:
                message += (f'{pr_author} is integrating a new project:<br/>'
                            f'- Main repo: {repo_url}<br/> - Criticality score: '
                            f'{get_criticality_score(repo_url)}<br/>')
            continue

        # Checks if the author is in the contact list.
        if email and is_known_contributor(content_dict, email):
            # Checks if the email is verified.
            verified_marker = ' (verified)' if verified else ''
            message += (
                f'{pr_author}{verified_marker} is either the primary contact or '
                f'is in the CCs list of [{project_path}]({project_url}).<br/>')
            # An unverified address is not trusted on its own; the commit
            # history below still has to vouch for the author.
            if verified:
                continue

        # Checks the previous commits.
        commit_sha = github.has_author_modified_project(project_path)
        if commit_sha is None:
            history_message = ''
            contributors = github.get_past_contributors(project_path)
            if contributors:
                history_message = 'The past contributors are: '
                history_message += ', '.join(contributors)
            message += (
                f'{pr_author} is a new contributor to '
                f'[{project_path}]({project_url}). The PR must be approved by known '
                f'contributors before it can be merged. {history_message}<br/>')
            is_ready_for_merge = False
            continue

        previous_pr_number = github.get_pull_request_number(commit_sha)
        if previous_pr_number is not None:
            message += (f'{pr_author} has previously contributed to '
                        f'[{project_path}]({project_url}). '
                        f'The previous PR was [#{previous_pr_number}]'
                        f'({GITHUB_NONREF_URL}/pull/{previous_pr_number})<br/>')
        else:
            # If the previous commit is not associated with a pull request.
            message += (f'{pr_author} has previously contributed to '
                        f'[{project_path}]({project_url}). The previous commit was '
                        f'{GITHUB_NONREF_URL}/commit/{commit_sha}<br/>')

    save_env(message, is_ready_for_merge, False)
class GithubHandler:
    """Github requests handler.

    Issues the Github REST API requests needed to review one pull request.
    The pull request number, its author and the API token are read from the
    PRNUMBER, PRAUTHOR and GITHUBTOKEN environment variables.
    """

    def __init__(self):
        """Reads the PR settings from the environment.

        Raises:
            KeyError: If PRAUTHOR, GITHUBTOKEN or PRNUMBER is not set.
        """
        self._pr_author = os.environ['PRAUTHOR']
        self._token = os.environ['GITHUBTOKEN']
        self._pr_number = os.environ['PRNUMBER']
        self._headers = {
            'Authorization': f'Bearer {self._token}',
            'X-GitHub-Api-Version': '2022-11-28'
        }
        # Cache filled by get_maintainers.
        self._maintainers = set()
        # Re-exported under a second name for the rest of the process
        # (presumably read by the criticality_score tool - TODO confirm).
        os.environ['GITHUB_AUTH_TOKEN'] = self._token

    def _get(self, url, params=None):
        """Sends an authenticated GET request and returns the response."""
        return requests.get(url, headers=self._headers, params=params)

    @staticmethod
    def _parse_maintainers(csv_text):
        """Returns the set of third-column values of a maintainers CSV."""
        maintainers = set()
        for line in csv_text.splitlines():
            fields = line.split(',')
            # Blank or short rows (e.g. after a trailing newline) carry no
            # user name; skip them instead of failing on the index.
            if len(fields) > 2 and fields[2].strip():
                maintainers.add(fields[2].strip())
        return maintainers

    @staticmethod
    def _summarize_contributors(commits, maintainers, limit):
        """Lists the distinct non-maintainer authors of the first commits."""
        contributors = {}
        for commit in commits[:limit]:
            # 'author' is null when Github cannot link the commit to an
            # account, so there is no login to report.
            author = commit.get('author')
            if not author:
                continue
            login = author['login']
            if login in maintainers:
                continue
            verified = bool(commit['commit']['verification']['verified'])
            # One verified commit is enough to mark the author verified.
            contributors[login] = contributors.get(login, False) or verified
        return [
            login if verified else f'{login} (unverified)'
            for login, verified in contributors.items()
        ]

    def get_pr_author(self):
        """Gets the pr author user name."""
        return self._pr_author

    def get_projects_path(self):
        """Gets the current project path.

        Returns:
            A sorted list of the distinct 'projects/<name>' prefixes of the
            files changed by the PR; empty when the request fails.
        """
        response = self._get(f'{BASE_URL}/pulls/{self._pr_number}/files')
        if not response.ok:
            return []

        projects_path = set()
        for file in response.json():
            # The API reports paths with '/' regardless of the local os.sep.
            parts = file['filename'].split('/')
            if len(parts) > 1 and parts[0] == 'projects':
                projects_path.add('/'.join(parts[:2]))
        # Sorted so that the generated message has a stable order.
        return sorted(projects_path)

    def get_author_email(self):
        """Retrieves the author's email address for a pull request,
        including non-public emails.

        Returns:
            A (verified, email) tuple. The email of the author's user record
            is returned as verified when it is set; otherwise the author
            email and verification flag of the PR's first commit are used.
            (False, None) when no commit can be read.
        """
        user_response = self._get(f'{API_URL}/users/{self._pr_author}')
        if user_response.ok:
            email = user_response.json().get('email')
            if email:
                return True, email

        commits_response = self._get(
            f'{BASE_URL}/pulls/{self._pr_number}/commits')
        if not commits_response.ok:
            return False, None
        commits = commits_response.json()
        if not commits:
            return False, None
        first_commit = commits[0]['commit']
        email = first_commit['author']['email']
        verified = first_commit['verification']['verified']
        return verified, email

    def get_project_yaml(self, project_path):
        """Gets the project yaml file."""
        contents_url = f'{BASE_URL}/contents/{project_path}/project.yaml'
        return self.get_yaml_file_content(contents_url)

    def get_yaml_file_content(self, contents_url):
        """Gets yaml file content.

        Returns:
            The mapping parsed from the file, or an empty dict when the
            request fails or the file is not a valid YAML mapping.
        """
        response = self._get(contents_url)
        if not response.ok:
            return {}
        content = base64.b64decode(response.json()['content']).decode('UTF-8')
        try:
            loaded = yaml.safe_load(content)
        except yaml.YAMLError:
            # The file may come from the PR under review and be malformed.
            return {}
        # Callers use .get() on the result, so only a mapping is usable.
        return loaded if isinstance(loaded, dict) else {}

    def get_integrated_project_info(self):
        """Gets the new integrated project.

        Returns:
            The parsed content of the first changed file whose path contains
            'project.yaml', or an empty dict when there is none or the
            request fails.
        """
        response = self._get(f'{BASE_URL}/pulls/{self._pr_number}/files')
        if not response.ok:
            return {}
        for file in response.json():
            if 'project.yaml' in file['filename']:
                return self.get_yaml_file_content(file['contents_url'])
        return {}

    def get_pull_request_number(self, commit):
        """Gets the pull request number.

        Returns:
            The number of the first pull request associated with the commit,
            or None when the request fails or the commit has none.
        """
        pr_response = self._get(f'{BASE_URL}/commits/{commit}/pulls')
        if not pr_response.ok:
            return None
        pulls = pr_response.json()
        if not pulls:
            return None
        return pulls[0]['number']

    def get_past_contributors(self, project_path):
        """Returns a list of past contributors of a certain project.

        Maintainers are left out; authors without any verified commit are
        suffixed with ' (unverified)'. At most COMMITS_LIMIT commits of the
        response are examined.
        """
        # NOTE(review): no page size is requested, so the response may hold
        # fewer than COMMITS_LIMIT commits - confirm the API default.
        commits_response = self._get(f'{BASE_URL}/commits',
                                     params={'path': project_path})
        if not commits_response.ok:
            return []
        return self._summarize_contributors(commits_response.json(),
                                            self.get_maintainers(),
                                            COMMITS_LIMIT)

    def get_maintainers(self):
        """Get a list of internal members.

        Returns:
            The set of user names read from infra/MAINTAINERS.csv. The set is
            cached once non-empty; it stays empty when the request fails.
        """
        if self._maintainers:
            return self._maintainers

        response = self._get(f'{BASE_URL}/contents/infra/MAINTAINERS.csv')
        if not response.ok:
            return self._maintainers

        maintainers_file = base64.b64decode(
            response.json()['content']).decode('UTF-8')
        self._maintainers.update(self._parse_maintainers(maintainers_file))
        return self._maintainers

    def is_author_internal_member(self):
        """Returns if the author is an internal member."""
        return self._pr_author in self.get_maintainers()

    def has_author_modified_project(self, project_path):
        """Checks if the author has modified this project before.

        Returns:
            The sha of the first commit the API lists for this author and
            path, or None when there is none or the request fails.
        """
        # Passed as params so that characters such as '&' in a path cannot
        # alter the query.
        commits_response = self._get(f'{BASE_URL}/commits',
                                     params={
                                         'path': project_path,
                                         'author': self._pr_author
                                     })
        if not commits_response.ok:
            return None
        commits = commits_response.json()
        if not commits:
            return None
        return commits[0]['sha']
# Runs the check only when the file is executed as a script.
if __name__ == '__main__':
    main()