oss-fuzz/infra/cifuzz/run_fuzzers_test.py

476 lines
19 KiB
Python

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for running fuzzers."""
import json
import os
import shutil
import stat
import sys
import tempfile
import unittest
from unittest import mock
import parameterized
from pyfakefs import fake_filesystem_unittest
import build_fuzzers
import fuzz_target
import run_fuzzers
# pylint: disable=wrong-import-position
INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(INFRA_DIR)
import helper
import test_helpers
# NOTE: The integration tests in this file rely on the example project at
# https://github.com/google/oss-fuzz/tree/master/projects/example.
EXAMPLE_PROJECT = 'example'

# Location of files used for testing: the 'test_data' directory that sits next
# to this test module.
TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                              'test_data')

# Test-data directory and fuzzer name used by the MSAN integration test.
MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory')
MEMORY_FUZZER = 'curl_fuzzer_memory'

# Test-data directory and fuzzer name used by the UBSAN integration test.
UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined')
UNDEFINED_FUZZER = 'curl_fuzzer_undefined'

# Value passed as fuzz_seconds to the run configs created by these tests.
FUZZ_SECONDS = 10
class RunFuzzerIntegrationTestMixin:  # pylint: disable=too-few-public-methods,invalid-name
  """Shared behavior for integration test classes that call run_fuzzers on
  a build made with one specific sanitizer."""

  # Placeholders: concrete test classes are expected to override both.
  FUZZER_DIR = None
  FUZZER = None

  def setUp(self):
    """Patches the environ so that runner scripts can be executed."""
    test_helpers.patch_environ(self, runner=True)

  def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer):
    """Runs run_fuzzers on the copy of |fuzzer_dir| provided by
    test_helpers.temp_dir_copy, configured for |sanitizer|, and asserts that
    the result is NO_BUG_FOUND."""
    with test_helpers.temp_dir_copy(fuzzer_dir) as workspace_copy:
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=workspace_copy,
          oss_fuzz_project_name='curl',
          sanitizer=sanitizer)
      outcome = run_fuzzers.run_fuzzers(run_config)
    self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)
@unittest.skipUnless(os.getenv('INTEGRATION_TESTS'),
                     'INTEGRATION_TESTS=1 not set')
class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
                                     unittest.TestCase):
  """Integration test that runs run_fuzzers against an MSAN build."""
  FUZZER_DIR = MEMORY_FUZZER_DIR
  FUZZER = MEMORY_FUZZER

  def test_run_with_memory_sanitizer(self):
    """Tests that run_fuzzers reports no bug for the MSAN test build."""
    self._test_run_with_sanitizer(fuzzer_dir=self.FUZZER_DIR,
                                  sanitizer='memory')
@unittest.skipUnless(os.getenv('INTEGRATION_TESTS'),
                     'INTEGRATION_TESTS=1 not set')
class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
                                        unittest.TestCase):
  """Integration test that runs run_fuzzers against a UBSAN build."""
  FUZZER_DIR = UNDEFINED_FUZZER_DIR
  FUZZER = UNDEFINED_FUZZER

  def test_run_with_undefined_sanitizer(self):
    """Tests that run_fuzzers reports no bug for the UBSAN test build."""
    self._test_run_with_sanitizer(fuzzer_dir=self.FUZZER_DIR,
                                  sanitizer='undefined')
class BaseFuzzTargetRunnerTest(unittest.TestCase):
  """Tests for BaseFuzzTargetRunner.initialize."""

  def _create_runner(self, **kwargs):  # pylint: disable=no-self-use
    """Returns a BaseFuzzTargetRunner whose run config is built from |kwargs|.

    'fuzz_seconds' and 'oss_fuzz_project_name' fall back to FUZZ_SECONDS and
    EXAMPLE_PROJECT when the caller did not pass those keys."""
    fallbacks = (
        ('fuzz_seconds', FUZZ_SECONDS),
        ('oss_fuzz_project_name', EXAMPLE_PROJECT),
    )
    for option, fallback in fallbacks:
      # Explicitly passed values (including None) are left untouched.
      kwargs.setdefault(option, fallback)
    return run_fuzzers.BaseFuzzTargetRunner(
        test_helpers.create_run_config(**kwargs))

  def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs):
    """Asserts that initialize() on a runner created from
    |create_runner_kwargs| returns a falsy value and that the most recent
    logging.error call was made with |expected_error_args|."""
    with mock.patch('logging.error') as mock_error:
      runner = self._create_runner(**create_runner_kwargs)
      initialized = runner.initialize()
      self.assertFalse(initialized)
      mock_error.assert_called_with(*expected_error_args)

  @parameterized.parameterized.expand([(0,), (None,), (-1,)])
  def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds):
    """Tests that initialize fails when fuzz_seconds is 0, None or negative."""
    error_format = ('Fuzz_seconds argument must be greater than 1, '
                    'but was: %s.')
    with tempfile.TemporaryDirectory() as tmp_dir:
      out_path = os.path.join(tmp_dir, 'build-out')
      os.mkdir(out_path)
      fake_targets = [os.path.join(out_path, 'fuzz_target')]
      with mock.patch('utils.get_fuzz_targets', return_value=fake_targets):
        self._test_initialize_fail((error_format, fuzz_seconds),
                                   fuzz_seconds=fuzz_seconds,
                                   workspace=tmp_dir)

  def test_initialize_no_out_dir(self):
    """Tests that initialize fails when the build-out directory is missing."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      missing_out_path = os.path.join(tmp_dir, 'build-out')
      self._test_initialize_fail(
          ('Out directory: %s does not exist.', missing_out_path),
          workspace=tmp_dir)

  def test_initialize_nonempty_artifacts(self):
    """Tests that initialize fails when the artifacts path is a regular
    file rather than a directory."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      os.mkdir(os.path.join(tmp_dir, 'build-out'))
      artifacts_parent = os.path.join(tmp_dir, 'out')
      os.makedirs(artifacts_parent)
      artifacts_path = os.path.join(artifacts_parent, 'artifacts')
      with open(artifacts_path, 'w') as artifacts_handle:
        artifacts_handle.write('fake')
      self._test_initialize_fail(
          ('Artifacts path: %s exists and is not an empty directory.',
           artifacts_path),
          workspace=tmp_dir)

  def test_initialize_bad_artifacts(self):
    """Tests that initialize fails when the artifacts directory already
    contains a file."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      os.mkdir(os.path.join(tmp_dir, 'build-out'))
      artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts')
      os.makedirs(artifacts_path)
      with open(os.path.join(artifacts_path, 'artifact'),
                'w') as artifact_handle:
        artifact_handle.write('fake')
      self._test_initialize_fail(
          ('Artifacts path: %s exists and is not an empty directory.',
           artifacts_path),
          workspace=tmp_dir)

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('logging.error')
  def test_initialize_empty_artifacts(self, mock_log_error,
                                      mock_get_fuzz_targets):
    """Tests that initialize succeeds when the artifacts directory exists
    but is empty."""
    mock_get_fuzz_targets.return_value = ['fuzz-target']
    with tempfile.TemporaryDirectory() as tmp_dir:
      os.mkdir(os.path.join(tmp_dir, 'build-out'))
      artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts')
      os.makedirs(artifacts_path)
      runner = self._create_runner(workspace=tmp_dir)
      self.assertTrue(runner.initialize())
      mock_log_error.assert_not_called()
      self.assertTrue(os.path.isdir(artifacts_path))

  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('logging.error')
  def test_initialize_no_artifacts(self, mock_log_error, mock_get_fuzz_targets):
    """Tests that initialize succeeds when no artifacts directory exists yet
    (the expected setting) and that the directory exists afterwards."""
    mock_get_fuzz_targets.return_value = ['fuzz-target']
    with tempfile.TemporaryDirectory() as tmp_dir:
      os.mkdir(os.path.join(tmp_dir, 'build-out'))
      runner = self._create_runner(workspace=tmp_dir)
      self.assertTrue(runner.initialize())
      mock_log_error.assert_not_called()
      expected_artifacts_path = os.path.join(tmp_dir, 'out', 'artifacts')
      self.assertTrue(os.path.isdir(expected_artifacts_path))

  def test_initialize_no_fuzz_targets(self):
    """Tests that initialize fails when the out directory holds no fuzz
    targets."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      empty_out_path = os.path.join(tmp_dir, 'build-out')
      os.makedirs(empty_out_path)
      self._test_initialize_fail(
          ('No fuzz targets were found in out directory: %s.', empty_out_path),
          workspace=tmp_dir)
class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
  """Tests for CiFuzzTargetRunner, run on a pyfakefs fake filesystem."""

  def setUp(self):
    """Installs the fake filesystem for the test."""
    self.setUpPyfakefs()

  @mock.patch('clusterfuzz_deployment.OSSFuzz.upload_crashes')
  @mock.patch('utils.get_fuzz_targets')
  @mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target')
  @mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj')
  def test_run_fuzz_targets_quits(self, mock_create_fuzz_target_obj,
                                  mock_run_fuzz_target, mock_get_fuzz_targets,
                                  mock_upload_crashes):
    """Tests that run_fuzz_targets stops after the first crash: with two
    targets available, only one is run and crashes are uploaded once."""
    workspace = 'workspace'
    self.fs.create_dir(os.path.join(workspace, 'build-out'))
    runner = run_fuzzers.CiFuzzTargetRunner(
        test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                       workspace=workspace,
                                       oss_fuzz_project_name=EXAMPLE_PROJECT))
    mock_get_fuzz_targets.return_value = ['target1', 'target2']
    runner.initialize()

    # Every mocked fuzzing run reports a crash with this testcase.
    crash_testcase = os.path.join(workspace, 'testcase')
    self.fs.create_file(crash_testcase)
    corpus_dir = 'corpus'
    self.fs.create_dir(corpus_dir)
    mock_run_fuzz_target.return_value = fuzz_target.FuzzResult(
        crash_testcase, 'stacktrace', corpus_dir)
    mock_create_fuzz_target_obj.return_value = mock.MagicMock(
        target_name='target1')

    self.assertTrue(runner.run_fuzz_targets())
    self.assertEqual(mock_run_fuzz_target.call_count, 1)
    self.assertEqual(mock_upload_crashes.call_count, 1)
class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
  """Tests for BatchFuzzTargetRunner, run on a pyfakefs fake filesystem."""
  WORKSPACE = 'workspace'
  STACKTRACE = 'stacktrace'
  CORPUS_DIR = 'corpus'

  def setUp(self):
    """Installs the fake filesystem, creates two testcase files under the
    workspace's build-out directory and builds the shared run config."""
    self.setUpPyfakefs()
    out_dir = os.path.join(self.WORKSPACE, 'build-out')
    self.fs.create_dir(out_dir)
    self.testcase1, self.testcase2 = (
        os.path.join(out_dir, basename)
        for basename in ('testcase-aaa', 'testcase-bbb'))
    for testcase in (self.testcase1, self.testcase2):
      self.fs.create_file(testcase)
    self.config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                                 workspace=self.WORKSPACE,
                                                 cfl_platform='github')

  @mock.patch('utils.get_fuzz_targets', return_value=['target1', 'target2'])
  @mock.patch('clusterfuzz_deployment.ClusterFuzzLite.upload_crashes')
  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target')
  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj')
  def test_run_fuzz_targets_quits(self, mock_create_fuzz_target_obj,
                                  mock_run_fuzz_target, mock_upload_crashes, _):
    """Tests that run_fuzz_targets doesn't quit on the first crash it finds:
    both targets are run and crashes are uploaded once."""
    runner = run_fuzzers.BatchFuzzTargetRunner(self.config)
    runner.initialize()

    crash_testcases = (self.testcase1, self.testcase2)
    runs_so_far = 0

    def fake_run_fuzz_target(_):
      """Reports a crash with the next testcase; a third call fails."""
      nonlocal runs_so_far
      # Only two targets are configured, so a third run is unexpected.
      assert runs_so_far != 2
      testcase = crash_testcases[runs_so_far]
      runs_so_far += 1
      if not os.path.exists(self.CORPUS_DIR):
        self.fs.create_dir(self.CORPUS_DIR)
      return fuzz_target.FuzzResult(testcase, self.STACKTRACE,
                                    self.CORPUS_DIR)

    mock_run_fuzz_target.side_effect = fake_run_fuzz_target
    mock_create_fuzz_target_obj.return_value = mock.MagicMock(
        target_name='target1')

    self.assertTrue(runner.run_fuzz_targets())
    self.assertEqual(mock_run_fuzz_target.call_count, 2)
    self.assertEqual(mock_upload_crashes.call_count, 1)
class GetCoverageTargetsTest(unittest.TestCase):
  """Tests for get_coverage_fuzz_targets."""

  def test_get_fuzz_targets(self):
    """Tests that get_coverage_fuzz_targets returns only the executable file
    at the top level of the directory it is given."""

    def write_file(path, contents):
      """Writes |contents| to a new text file at |path|."""
      with open(path, 'w') as file_handle:
        file_handle.write(contents)

    with tempfile.TemporaryDirectory() as temp_dir:
      # The one file expected in the result: empty, but marked executable.
      fuzz_target_path = os.path.join(temp_dir, 'fuzz-target')
      write_file(fuzz_target_path, '')
      executable_mode = os.stat(fuzz_target_path).st_mode | stat.S_IEXEC
      os.chmod(fuzz_target_path, executable_mode)

      # Files that must not be returned: neither is made executable, and the
      # second one sits in a subdirectory.
      write_file(os.path.join(temp_dir, 'non-fuzz-target1'),
                 'LLVMFuzzerTestOneInput')
      subdir = os.path.join(temp_dir, 'subdir')
      os.mkdir(subdir)
      write_file(os.path.join(subdir, 'non-fuzz-target1'),
                 'LLVMFuzzerTestOneInput')

      found_targets = run_fuzzers.get_coverage_fuzz_targets(temp_dir)
      self.assertEqual(found_targets, [fuzz_target_path])
@unittest.skip('TODO(metzman): Fix this test')
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
                 'INTEGRATION_TESTS=1 not set')
class CoverageReportIntegrationTest(unittest.TestCase):
  """Integration tests for coverage reports."""
  SANITIZER = 'coverage'

  def setUp(self):
    """Patches the environ via test_helpers.patch_environ(runner=True)."""
    test_helpers.patch_environ(self, runner=True)

  @mock.patch('filestore.github_actions._upload_artifact_with_upload_js')
  def test_coverage_report(self, _):
    """Tests generation of coverage reports end-to-end, from building to
    generation."""
    with test_helpers.docker_temp_dir() as temp_dir:
      # Copy the coverage tooling out of the base-runner image into a shared
      # directory so it can be used from this process.
      shared = os.path.join(temp_dir, 'shared')
      os.mkdir(shared)
      copy_command = ('cp -r /opt/code_coverage /shared && '
                      'cp $(which llvm-profdata) /shared && '
                      'cp $(which llvm-cov) /shared')
      # Use assertTrue rather than a bare assert: assert statements are
      # removed under `python -O`, which would skip the docker command itself.
      self.assertTrue(
          helper.docker_run([
              '-v', f'{shared}:/shared', 'gcr.io/oss-fuzz-base/base-runner',
              'bash', '-c', copy_command
          ]))

      # NOTE(review): presumably these environ changes are undone by the
      # patching done in setUp -- confirm against test_helpers.patch_environ.
      os.environ['CODE_COVERAGE_SRC'] = os.path.join(shared, 'code_coverage')
      os.environ['PATH'] += os.pathsep + shared

      # Do coverage build.
      build_config = test_helpers.create_build_config(
          oss_fuzz_project_name=EXAMPLE_PROJECT,
          project_repo_name='oss-fuzz',
          workspace=temp_dir,
          git_sha='0b95fe1039ed7c38fea1f97078316bfc1030c523',
          base_commit='da0746452433dc18bae699e355a9821285d863c8',
          sanitizer=self.SANITIZER,
          cfl_platform='github',
          # Needed for test not to fail because of permissions issues.
          bad_build_check=False)
      self.assertTrue(build_fuzzers.build_fuzzers(build_config))

      # TODO(metzman): Get rid of this here and make 'compile' do this.
      chmod_command = ('chmod -R +r /out && '
                       'find /out -type d -exec chmod +x {} +')
      self.assertTrue(
          helper.docker_run([
              '-v', f'{os.path.join(temp_dir, "build-out")}:/out',
              'gcr.io/oss-fuzz-base/base-builder', 'bash', '-c', chmod_command
          ]))

      # Generate report.
      run_config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
                                                  workspace=temp_dir,
                                                  sanitizer=self.SANITIZER,
                                                  mode='coverage',
                                                  cfl_platform='github')
      result = run_fuzzers.run_fuzzers(run_config)
      self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)

      # Compare the generated summary with the checked-in expected summary.
      expected_summary_path = os.path.join(
          TEST_DATA_PATH, 'example_coverage_report_summary.json')
      with open(expected_summary_path) as file_handle:
        expected_summary = json.load(file_handle)
      actual_summary_path = os.path.join(temp_dir, 'cifuzz-coverage',
                                         'report', 'linux', 'summary.json')
      with open(actual_summary_path) as file_handle:
        actual_summary = json.load(file_handle)
      self.assertEqual(expected_summary, actual_summary)
@unittest.skipUnless(os.getenv('INTEGRATION_TESTS'),
                     'INTEGRATION_TESTS=1 not set')
class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin,
                                       unittest.TestCase):
  """Integration tests that run run_fuzzers against ASAN builds."""
  BUILD_DIR_NAME = 'cifuzz-latest-build'

  def _run_fuzzers_on_test_data(self, parent_dir):  # pylint: disable=no-self-use
    """Copies TEST_DATA_PATH to a 'workspace' directory under |parent_dir|,
    runs run_fuzzers on that workspace and returns the result."""
    workspace = os.path.join(parent_dir, 'workspace')
    shutil.copytree(TEST_DATA_PATH, workspace)
    run_config = test_helpers.create_run_config(
        fuzz_seconds=FUZZ_SECONDS,
        workspace=workspace,
        oss_fuzz_project_name=EXAMPLE_PROJECT)
    return run_fuzzers.run_fuzzers(run_config)

  def test_new_bug_found(self):
    """Tests that run_fuzzers reports BUG_FOUND for a bug that only
    reproduces on the first check."""
    # is_reproducible returns True first and False second, emulating a bug
    # that exists in the current PR but not on the downloaded OSS-Fuzz build.
    with mock.patch('fuzz_target.FuzzTarget.is_reproducible',
                    side_effect=[True, False]), \
        tempfile.TemporaryDirectory() as tmp_dir:
      outcome = self._run_fuzzers_on_test_data(tmp_dir)
      self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.BUG_FOUND)

  @mock.patch('fuzz_target.FuzzTarget.is_reproducible',
              side_effect=[True, True])
  def test_old_bug_found(self, _):
    """Tests that run_fuzzers reports NO_BUG_FOUND when is_reproducible
    returns True both times (a bug found in OSS-Fuzz before)."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      outcome = self._run_fuzzers_on_test_data(tmp_dir)
      self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)

  def test_invalid_build(self):
    """Tests that run_fuzzers reports ERROR when build-out is empty."""
    with tempfile.TemporaryDirectory() as tmp_dir:
      os.mkdir(os.path.join(tmp_dir, 'build-out'))
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=tmp_dir,
          oss_fuzz_project_name=EXAMPLE_PROJECT)
      outcome = run_fuzzers.run_fuzzers(run_config)
    self.assertEqual(outcome, run_fuzzers.RunFuzzersResult.ERROR)
class GetFuzzTargetRunnerTest(unittest.TestCase):
  """Tests for get_fuzz_target_runner."""

  @parameterized.parameterized.expand([
      ('batch', run_fuzzers.BatchFuzzTargetRunner),
      ('code-change', run_fuzzers.CiFuzzTargetRunner),
      ('coverage', run_fuzzers.CoverageTargetRunner)
  ])
  def test_get_fuzz_target_runner(self, mode, fuzz_target_runner_cls):
    """Tests that get_fuzz_target_runner returns the correct runner based on the
    specified mode.

    Args:
      mode: The run mode placed in the run config.
      fuzz_target_runner_cls: The runner class expected for |mode|.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
      run_config = test_helpers.create_run_config(
          fuzz_seconds=FUZZ_SECONDS,
          workspace=tmp_dir,
          oss_fuzz_project_name='example',
          mode=mode)
      runner = run_fuzzers.get_fuzz_target_runner(run_config)
      # assertIsInstance reports the actual type on failure, unlike
      # assertTrue(isinstance(...)), which only says "False is not true".
      self.assertIsInstance(runner, fuzz_target_runner_cls)
# Run every test in this module when the file is executed directly.
if __name__ == '__main__':
  unittest.main()