mirror of https://github.com/google/oss-fuzz.git
366 lines
15 KiB
Python
366 lines
15 KiB
Python
# Copyright 2021 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Tests for running fuzzers."""
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
from unittest import mock
|
|
|
|
import parameterized
|
|
from pyfakefs import fake_filesystem_unittest
|
|
|
|
import fuzz_target
|
|
import run_fuzzers
|
|
|
|
# pylint: disable=wrong-import-position
|
|
INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
sys.path.append(INFRA_DIR)
|
|
|
|
import test_helpers
|
|
|
|
# NOTE: This integration test relies on
|
|
# https://github.com/google/oss-fuzz/tree/master/projects/example project.
|
|
EXAMPLE_PROJECT = 'example'
|
|
|
|
# Location of files used for testing.
|
|
TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
|
|
'test_data')
|
|
|
|
MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory')
|
|
MEMORY_FUZZER = 'curl_fuzzer_memory'
|
|
|
|
UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined')
|
|
UNDEFINED_FUZZER = 'curl_fuzzer_undefined'
|
|
|
|
FUZZ_SECONDS = 10
|
|
|
|
|
|
class RunFuzzerIntegrationTestMixin: # pylint: disable=too-few-public-methods,invalid-name
|
|
"""Mixin for integration test classes that runbuild_fuzzers on builds of a
|
|
specific sanitizer."""
|
|
# These must be defined by children.
|
|
FUZZER_DIR = None
|
|
FUZZER = None
|
|
|
|
def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer):
|
|
"""Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts
|
|
the run succeeded and that no bug was found."""
|
|
with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy:
|
|
config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
|
|
workspace=fuzzer_dir_copy,
|
|
project_name='curl',
|
|
sanitizer=sanitizer)
|
|
result = run_fuzzers.run_fuzzers(config)
|
|
self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)
|
|
|
|
|
|
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
|
|
'INTEGRATION_TESTS=1 not set')
|
|
class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
|
|
unittest.TestCase):
|
|
"""Integration test for build_fuzzers with an MSAN build."""
|
|
FUZZER_DIR = MEMORY_FUZZER_DIR
|
|
FUZZER = MEMORY_FUZZER
|
|
|
|
def test_run_with_memory_sanitizer(self):
|
|
"""Tests run_fuzzers with a valid MSAN build."""
|
|
self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory')
|
|
|
|
|
|
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
|
|
'INTEGRATION_TESTS=1 not set')
|
|
class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
|
|
unittest.TestCase):
|
|
"""Integration test for build_fuzzers with an UBSAN build."""
|
|
FUZZER_DIR = UNDEFINED_FUZZER_DIR
|
|
FUZZER = UNDEFINED_FUZZER
|
|
|
|
def test_run_with_undefined_sanitizer(self):
|
|
"""Tests run_fuzzers with a valid UBSAN build."""
|
|
self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined')
|
|
|
|
|
|
class BaseFuzzTargetRunnerTest(unittest.TestCase):
|
|
"""Tests BaseFuzzTargetRunner."""
|
|
|
|
def _create_runner(self, **kwargs): # pylint: disable=no-self-use
|
|
defaults = {'fuzz_seconds': FUZZ_SECONDS, 'project_name': EXAMPLE_PROJECT}
|
|
for default_key, default_value in defaults.items():
|
|
if default_key not in kwargs:
|
|
kwargs[default_key] = default_value
|
|
|
|
config = test_helpers.create_run_config(**kwargs)
|
|
return run_fuzzers.BaseFuzzTargetRunner(config)
|
|
|
|
def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs):
|
|
with mock.patch('logging.error') as mocked_error:
|
|
runner = self._create_runner(**create_runner_kwargs)
|
|
self.assertFalse(runner.initialize())
|
|
mocked_error.assert_called_with(*expected_error_args)
|
|
|
|
@parameterized.parameterized.expand([(0,), (None,), (-1,)])
|
|
def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds):
|
|
"""Tests initialize fails with an invalid fuzz seconds."""
|
|
expected_error_args = ('Fuzz_seconds argument must be greater than 1, '
|
|
'but was: %s.', fuzz_seconds)
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
os.mkdir(out_path)
|
|
with mock.patch('utils.get_fuzz_targets') as mocked_get_fuzz_targets:
|
|
mocked_get_fuzz_targets.return_value = [
|
|
os.path.join(out_path, 'fuzz_target')
|
|
]
|
|
self._test_initialize_fail(expected_error_args,
|
|
fuzz_seconds=fuzz_seconds,
|
|
workspace=tmp_dir)
|
|
|
|
def test_initialize_no_out_dir(self):
|
|
"""Tests initialize fails with no out dir."""
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
expected_error_args = ('Out directory: %s does not exist.', out_path)
|
|
self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
|
|
|
|
def test_initialize_nonempty_artifacts(self):
|
|
"""Tests initialize with a file artifacts path."""
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
os.mkdir(out_path)
|
|
artifacts_path = os.path.join(out_path, 'artifacts')
|
|
with open(artifacts_path, 'w') as artifacts_handle:
|
|
artifacts_handle.write('fake')
|
|
expected_error_args = (
|
|
'Artifacts path: %s exists and is not an empty directory.',
|
|
artifacts_path)
|
|
self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
|
|
|
|
def test_initialize_bad_artifacts(self):
|
|
"""Tests initialize with a non-empty artifacts path."""
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
artifacts_path = os.path.join(out_path, 'artifacts')
|
|
os.makedirs(artifacts_path)
|
|
artifact_path = os.path.join(artifacts_path, 'artifact')
|
|
with open(artifact_path, 'w') as artifact_handle:
|
|
artifact_handle.write('fake')
|
|
expected_error_args = (
|
|
'Artifacts path: %s exists and is not an empty directory.',
|
|
artifacts_path)
|
|
self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
|
|
|
|
@mock.patch('utils.get_fuzz_targets')
|
|
@mock.patch('logging.error')
|
|
def test_initialize_empty_artifacts(self, mocked_log_error,
|
|
mocked_get_fuzz_targets):
|
|
"""Tests initialize with an empty artifacts dir."""
|
|
mocked_get_fuzz_targets.return_value = ['fuzz-target']
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
artifacts_path = os.path.join(out_path, 'artifacts')
|
|
os.makedirs(artifacts_path)
|
|
runner = self._create_runner(workspace=tmp_dir)
|
|
self.assertTrue(runner.initialize())
|
|
mocked_log_error.assert_not_called()
|
|
self.assertTrue(os.path.isdir(artifacts_path))
|
|
|
|
@mock.patch('utils.get_fuzz_targets')
|
|
@mock.patch('logging.error')
|
|
def test_initialize_no_artifacts(self, mocked_log_error,
|
|
mocked_get_fuzz_targets):
|
|
"""Tests initialize with no artifacts dir (the expected setting)."""
|
|
mocked_get_fuzz_targets.return_value = ['fuzz-target']
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
os.makedirs(out_path)
|
|
runner = self._create_runner(workspace=tmp_dir)
|
|
self.assertTrue(runner.initialize())
|
|
mocked_log_error.assert_not_called()
|
|
self.assertTrue(os.path.isdir(os.path.join(out_path, 'artifacts')))
|
|
|
|
def test_initialize_no_fuzz_targets(self):
|
|
"""Tests initialize with no fuzz targets."""
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
os.makedirs(out_path)
|
|
expected_error_args = ('No fuzz targets were found in out directory: %s.',
|
|
out_path)
|
|
self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
|
|
|
|
def test_get_fuzz_target_artifact(self):
|
|
"""Tests that get_fuzz_target_artifact works as intended."""
|
|
runner = self._create_runner()
|
|
crashes_dir = 'crashes-dir'
|
|
runner.crashes_dir = crashes_dir
|
|
artifact_name = 'artifact-name'
|
|
target = mock.MagicMock()
|
|
target_name = 'target_name'
|
|
target.target_name = target_name
|
|
fuzz_target_artifact = runner.get_fuzz_target_artifact(
|
|
target, artifact_name)
|
|
expected_fuzz_target_artifact = (
|
|
'crashes-dir/target_name-address-artifact-name')
|
|
self.assertEqual(fuzz_target_artifact, expected_fuzz_target_artifact)
|
|
|
|
|
|
class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
|
|
"""Tests that CiFuzzTargetRunner works as intended."""
|
|
|
|
def setUp(self):
|
|
self.setUpPyfakefs()
|
|
|
|
@mock.patch('utils.get_fuzz_targets')
|
|
@mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target')
|
|
@mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj')
|
|
def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj,
|
|
mocked_run_fuzz_target,
|
|
mocked_get_fuzz_targets):
|
|
"""Tests that run_fuzz_targets quits on the first crash it finds."""
|
|
workspace = 'workspace'
|
|
out_path = os.path.join(workspace, 'out')
|
|
self.fs.create_dir(out_path)
|
|
config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
|
|
workspace=workspace,
|
|
project_name=EXAMPLE_PROJECT)
|
|
runner = run_fuzzers.CiFuzzTargetRunner(config)
|
|
|
|
mocked_get_fuzz_targets.return_value = ['target1', 'target2']
|
|
runner.initialize()
|
|
testcase = os.path.join(workspace, 'testcase')
|
|
self.fs.create_file(testcase)
|
|
stacktrace = b'stacktrace'
|
|
corpus_dir = 'corpus'
|
|
self.fs.create_dir(corpus_dir)
|
|
mocked_run_fuzz_target.return_value = fuzz_target.FuzzResult(
|
|
testcase, stacktrace, corpus_dir)
|
|
magic_mock = mock.MagicMock()
|
|
magic_mock.target_name = 'target1'
|
|
mocked_create_fuzz_target_obj.return_value = magic_mock
|
|
self.assertTrue(runner.run_fuzz_targets())
|
|
self.assertIn('target1-address-testcase', os.listdir(runner.crashes_dir))
|
|
self.assertEqual(mocked_run_fuzz_target.call_count, 1)
|
|
|
|
|
|
class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
|
|
"""Tests that CiFuzzTargetRunner works as intended."""
|
|
|
|
def setUp(self):
|
|
self.setUpPyfakefs()
|
|
|
|
@mock.patch('utils.get_fuzz_targets')
|
|
@mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target')
|
|
@mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj')
|
|
def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj,
|
|
mocked_run_fuzz_target,
|
|
mocked_get_fuzz_targets):
|
|
"""Tests that run_fuzz_targets doesn't quit on the first crash it finds."""
|
|
workspace = 'workspace'
|
|
out_path = os.path.join(workspace, 'out')
|
|
self.fs.create_dir(out_path)
|
|
config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
|
|
workspace=workspace,
|
|
project_name=EXAMPLE_PROJECT)
|
|
runner = run_fuzzers.BatchFuzzTargetRunner(config)
|
|
|
|
mocked_get_fuzz_targets.return_value = ['target1', 'target2']
|
|
runner.initialize()
|
|
testcase1 = os.path.join(workspace, 'testcase-aaa')
|
|
testcase2 = os.path.join(workspace, 'testcase-bbb')
|
|
self.fs.create_file(testcase1)
|
|
self.fs.create_file(testcase2)
|
|
stacktrace = b'stacktrace'
|
|
call_count = 0
|
|
corpus_dir = 'corpus'
|
|
|
|
def mock_run_fuzz_target(_):
|
|
nonlocal call_count
|
|
if call_count == 0:
|
|
testcase = testcase1
|
|
elif call_count == 1:
|
|
testcase = testcase2
|
|
assert call_count != 2
|
|
call_count += 1
|
|
return fuzz_target.FuzzResult(testcase, stacktrace, corpus_dir)
|
|
|
|
mocked_run_fuzz_target.side_effect = mock_run_fuzz_target
|
|
magic_mock = mock.MagicMock()
|
|
magic_mock.target_name = 'target1'
|
|
mocked_create_fuzz_target_obj.return_value = magic_mock
|
|
self.assertTrue(runner.run_fuzz_targets())
|
|
self.assertIn('target1-address-testcase-aaa',
|
|
os.listdir(runner.crashes_dir))
|
|
self.assertEqual(mocked_run_fuzz_target.call_count, 2)
|
|
|
|
|
|
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
|
|
'INTEGRATION_TESTS=1 not set')
|
|
class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin,
|
|
unittest.TestCase):
|
|
"""Integration tests for build_fuzzers with an ASAN build."""
|
|
|
|
BUILD_DIR_NAME = 'cifuzz-latest-build'
|
|
|
|
def test_new_bug_found(self):
|
|
"""Tests run_fuzzers with a valid ASAN build."""
|
|
# Set the first return value to True, then the second to False to
|
|
# emulate a bug existing in the current PR but not on the downloaded
|
|
# OSS-Fuzz build.
|
|
with mock.patch('fuzz_target.FuzzTarget.is_reproducible',
|
|
side_effect=[True, False]):
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
workspace = os.path.join(tmp_dir, 'workspace')
|
|
shutil.copytree(TEST_DATA_PATH, workspace)
|
|
config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
|
|
workspace=workspace,
|
|
project_name=EXAMPLE_PROJECT)
|
|
result = run_fuzzers.run_fuzzers(config)
|
|
self.assertEqual(result, run_fuzzers.RunFuzzersResult.BUG_FOUND)
|
|
build_dir = os.path.join(workspace, 'out', self.BUILD_DIR_NAME)
|
|
self.assertNotEqual(0, len(os.listdir(build_dir)))
|
|
|
|
@mock.patch('fuzz_target.FuzzTarget.is_reproducible',
|
|
side_effect=[True, True])
|
|
def test_old_bug_found(self, _):
|
|
"""Tests run_fuzzers with a bug found in OSS-Fuzz before."""
|
|
config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
|
|
workspace=TEST_DATA_PATH,
|
|
project_name=EXAMPLE_PROJECT)
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
workspace = os.path.join(tmp_dir, 'workspace')
|
|
shutil.copytree(TEST_DATA_PATH, workspace)
|
|
config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
|
|
workspace=TEST_DATA_PATH,
|
|
project_name=EXAMPLE_PROJECT)
|
|
result = run_fuzzers.run_fuzzers(config)
|
|
self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)
|
|
build_dir = os.path.join(TEST_DATA_PATH, 'out', self.BUILD_DIR_NAME)
|
|
self.assertTrue(os.path.exists(build_dir))
|
|
self.assertNotEqual(0, len(os.listdir(build_dir)))
|
|
|
|
def test_invalid_build(self):
|
|
"""Tests run_fuzzers with an invalid ASAN build."""
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
out_path = os.path.join(tmp_dir, 'out')
|
|
os.mkdir(out_path)
|
|
config = test_helpers.create_run_config(fuzz_seconds=FUZZ_SECONDS,
|
|
workspace=tmp_dir,
|
|
project_name=EXAMPLE_PROJECT)
|
|
result = run_fuzzers.run_fuzzers(config)
|
|
self.assertEqual(result, run_fuzzers.RunFuzzersResult.ERROR)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|