[CIFuzz] Seperate code for running fuzzers into own module: run_fuzzers.py (#5031)

TODO: Rename cifuzz.py to build_fuzzers.py
This commit is contained in:
jonathanmetzman 2021-01-26 08:32:41 -08:00 committed by GitHub
parent 3e112546fc
commit d6ff0bfcdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 395 additions and 291 deletions

View File

@ -11,21 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module used by CI tools in order to interact with fuzzers.
This module helps CI tools do the following:
1. Build fuzzers.
2. Run fuzzers.
Eventually it will be used to help CI tools determine which fuzzers to run.
"""
"""Module used by CI tools in order to interact with fuzzers. This module helps
CI tools to build fuzzers."""
import logging
import os
import shutil
import sys
import time
import affected_fuzz_targets
import fuzz_target
# pylint: disable=wrong-import-position,import-error
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@ -35,35 +28,6 @@ import repo_manager
import retry
import utils
# From clusterfuzz: src/python/crash_analysis/crash_analyzer.py
# Used to get the beginning of the stacktrace.
STACKTRACE_TOOL_MARKERS = [
b'AddressSanitizer',
b'ASAN:',
b'CFI: Most likely a control flow integrity violation;',
b'ERROR: libFuzzer',
b'KASAN:',
b'LeakSanitizer',
b'MemorySanitizer',
b'ThreadSanitizer',
b'UndefinedBehaviorSanitizer',
b'UndefinedSanitizer',
]
# From clusterfuzz: src/python/crash_analysis/crash_analyzer.py
# Used to get the end of the stacktrace.
STACKTRACE_END_MARKERS = [
b'ABORTING',
b'END MEMORY TOOL REPORT',
b'End of process memory map.',
b'END_KASAN_OUTPUT',
b'SUMMARY:',
b'Shadow byte and word',
b'[end of stack trace]',
b'\nExiting',
b'minidump has been written',
]
# Default fuzz configuration.
DEFAULT_ENGINE = 'libfuzzer'
DEFAULT_ARCHITECTURE = 'x86_64'
@ -399,73 +363,6 @@ def build_fuzzers( # pylint: disable=too-many-arguments,too-many-locals
return builder.build()
def run_fuzzers( # pylint: disable=too-many-arguments,too-many-locals
fuzz_seconds,
workspace,
project_name,
sanitizer='address'):
"""Runs all fuzzers for a specific OSS-Fuzz project.
Args:
fuzz_seconds: The total time allotted for fuzzing.
workspace: The location in a shared volume to store a git repo and build
artifacts.
project_name: The name of the relevant OSS-Fuzz project.
sanitizer: The sanitizer the fuzzers should be run with.
Returns:
(True if run was successful, True if bug was found).
"""
# Validate inputs.
if not os.path.exists(workspace):
logging.error('Invalid workspace: %s.', workspace)
return False, False
logging.info('Using %s sanitizer.', sanitizer)
out_dir = os.path.join(workspace, 'out')
artifacts_dir = os.path.join(out_dir, 'artifacts')
os.makedirs(artifacts_dir, exist_ok=True)
if not fuzz_seconds or fuzz_seconds < 1:
logging.error('Fuzz_seconds argument must be greater than 1, but was: %s.',
fuzz_seconds)
return False, False
# Get fuzzer information.
fuzzer_paths = utils.get_fuzz_targets(out_dir)
if not fuzzer_paths:
logging.error('No fuzzers were found in out directory: %s.', out_dir)
return False, False
# Run fuzzers for allotted time.
total_num_fuzzers = len(fuzzer_paths)
fuzzers_left_to_run = total_num_fuzzers
min_seconds_per_fuzzer = fuzz_seconds // total_num_fuzzers
for fuzzer_path in fuzzer_paths:
run_seconds = max(fuzz_seconds // fuzzers_left_to_run,
min_seconds_per_fuzzer)
target = fuzz_target.FuzzTarget(fuzzer_path,
run_seconds,
out_dir,
project_name,
sanitizer=sanitizer)
start_time = time.time()
testcase, stacktrace = target.fuzz()
fuzz_seconds -= (time.time() - start_time)
if not testcase or not stacktrace:
logging.info('Fuzzer %s, finished running.', target.target_name)
else:
utils.binary_print(b'Fuzzer %s, detected error:\n%s' %
(target.target_name.encode(), stacktrace))
shutil.move(testcase, os.path.join(artifacts_dir, 'test_case'))
parse_fuzzer_output(stacktrace, artifacts_dir)
return True, True
fuzzers_left_to_run -= 1
return True, False
def get_common_docker_args(sanitizer):
"""Returns a list of common docker arguments."""
return [
@ -524,37 +421,3 @@ def check_fuzzer_build(out_dir,
logging.error('Check fuzzer build failed.')
return False
return True
def parse_fuzzer_output(fuzzer_output, out_dir):
"""Parses the fuzzer output from a fuzz target binary.
Args:
fuzzer_output: A fuzz target binary output string to be parsed.
out_dir: The location to store the parsed output files.
"""
# Get index of key file points.
for marker in STACKTRACE_TOOL_MARKERS:
marker_index = fuzzer_output.find(marker)
if marker_index:
begin_summary = marker_index
break
end_summary = -1
for marker in STACKTRACE_END_MARKERS:
marker_index = fuzzer_output.find(marker)
if marker_index:
end_summary = marker_index + len(marker)
break
if begin_summary is None or end_summary is None:
return
summary_str = fuzzer_output[begin_summary:end_summary]
if not summary_str:
return
# Write sections of fuzzer output to specific files.
summary_file_path = os.path.join(out_dir, 'bug_summary.txt')
with open(summary_file_path, 'ab') as summary_handle:
summary_handle.write(summary_str)

View File

@ -11,10 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests the functionality of the cifuzz module's functions:
1. Building fuzzers.
2. Running fuzzers.
"""
"""Tests the functionality of the cifuzz module."""
import os
import sys
import tempfile
@ -28,7 +25,6 @@ sys.path.append(INFRA_DIR)
OSS_FUZZ_DIR = os.path.dirname(INFRA_DIR)
import cifuzz
import fuzz_target
import test_helpers
# NOTE: This integration test relies on
@ -52,12 +48,6 @@ EXAMPLE_NOCRASH_FUZZER = 'example_nocrash_fuzzer'
# A fuzzer to be built in build_fuzzers integration tests.
EXAMPLE_BUILD_FUZZER = 'do_stuff_fuzzer'
MEMORY_FUZZER_DIR = os.path.join(TEST_FILES_PATH, 'memory')
MEMORY_FUZZER = 'curl_fuzzer_memory'
UNDEFINED_FUZZER_DIR = os.path.join(TEST_FILES_PATH, 'undefined')
UNDEFINED_FUZZER = 'curl_fuzzer_undefined'
# pylint: disable=no-self-use
@ -227,142 +217,6 @@ class BuildFuzzersIntegrationTest(unittest.TestCase):
))
class RunFuzzerIntegrationTestMixin: # pylint: disable=too-few-public-methods,invalid-name
"""Mixin for integration test classes that runbuild_fuzzers on builds of a
specific sanitizer."""
# These must be defined by children.
FUZZER_DIR = None
FUZZER = None
def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer):
"""Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts
the run succeeded and that no bug was found."""
with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy:
run_success, bug_found = cifuzz.run_fuzzers(10,
fuzzer_dir_copy,
'curl',
sanitizer=sanitizer)
self.assertTrue(run_success)
self.assertFalse(bug_found)
class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
unittest.TestCase):
"""Integration test for build_fuzzers with an MSAN build."""
FUZZER_DIR = MEMORY_FUZZER_DIR
FUZZER = MEMORY_FUZZER
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_run_with_memory_sanitizer(self):
"""Tests run_fuzzers with a valid MSAN build."""
self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory')
class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
unittest.TestCase):
"""Integration test for build_fuzzers with an UBSAN build."""
FUZZER_DIR = UNDEFINED_FUZZER_DIR
FUZZER = UNDEFINED_FUZZER
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_run_with_undefined_sanitizer(self):
"""Tests run_fuzzers with a valid UBSAN build."""
self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined')
class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin,
unittest.TestCase):
"""Integration tests for build_fuzzers with an ASAN build."""
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_new_bug_found(self):
"""Tests run_fuzzers with a valid ASAN build."""
# Set the first return value to True, then the second to False to
# emulate a bug existing in the current PR but not on the downloaded
# OSS-Fuzz build.
with mock.patch.object(fuzz_target.FuzzTarget,
'is_reproducible',
side_effect=[True, False]):
run_success, bug_found = cifuzz.run_fuzzers(10, TEST_FILES_PATH,
EXAMPLE_PROJECT)
build_dir = os.path.join(TEST_FILES_PATH, 'out', 'oss_fuzz_latest')
self.assertTrue(os.path.exists(build_dir))
self.assertNotEqual(0, len(os.listdir(build_dir)))
self.assertTrue(run_success)
self.assertTrue(bug_found)
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_old_bug_found(self):
"""Tests run_fuzzers with a bug found in OSS-Fuzz before."""
with mock.patch.object(fuzz_target.FuzzTarget,
'is_reproducible',
side_effect=[True, True]):
run_success, bug_found = cifuzz.run_fuzzers(10, TEST_FILES_PATH,
EXAMPLE_PROJECT)
build_dir = os.path.join(TEST_FILES_PATH, 'out', 'oss_fuzz_latest')
self.assertTrue(os.path.exists(build_dir))
self.assertNotEqual(0, len(os.listdir(build_dir)))
self.assertTrue(run_success)
self.assertFalse(bug_found)
def test_invalid_build(self):
"""Tests run_fuzzers with an invalid ASAN build."""
with tempfile.TemporaryDirectory() as tmp_dir:
out_path = os.path.join(tmp_dir, 'out')
os.mkdir(out_path)
run_success, bug_found = cifuzz.run_fuzzers(10, tmp_dir, EXAMPLE_PROJECT)
self.assertFalse(run_success)
self.assertFalse(bug_found)
def test_invalid_fuzz_seconds(self):
"""Tests run_fuzzers with an invalid fuzz seconds."""
with tempfile.TemporaryDirectory() as tmp_dir:
out_path = os.path.join(tmp_dir, 'out')
os.mkdir(out_path)
run_success, bug_found = cifuzz.run_fuzzers(0, tmp_dir, EXAMPLE_PROJECT)
self.assertFalse(run_success)
self.assertFalse(bug_found)
def test_invalid_out_dir(self):
"""Tests run_fuzzers with an invalid out directory."""
run_success, bug_found = cifuzz.run_fuzzers(10, 'not/a/valid/path',
EXAMPLE_PROJECT)
self.assertFalse(run_success)
self.assertFalse(bug_found)
class ParseOutputTest(unittest.TestCase):
"""Tests parse_fuzzer_output."""
def test_parse_valid_output(self):
"""Checks that the parse fuzzer output can correctly parse output."""
test_output_path = os.path.join(TEST_FILES_PATH,
'example_crash_fuzzer_output.txt')
test_summary_path = os.path.join(TEST_FILES_PATH, 'bug_summary_example.txt')
with tempfile.TemporaryDirectory() as tmp_dir:
with open(test_output_path, 'rb') as test_fuzz_output:
cifuzz.parse_fuzzer_output(test_fuzz_output.read(), tmp_dir)
result_files = ['bug_summary.txt']
self.assertCountEqual(os.listdir(tmp_dir), result_files)
# Compare the bug summaries.
with open(os.path.join(tmp_dir, 'bug_summary.txt')) as bug_summary:
detected_summary = bug_summary.read()
with open(test_summary_path) as bug_summary:
real_summary = bug_summary.read()
self.assertEqual(detected_summary, real_summary)
def test_parse_invalid_output(self):
"""Checks that no files are created when an invalid input was given."""
with tempfile.TemporaryDirectory() as tmp_dir:
cifuzz.parse_fuzzer_output(b'not a valid output_string', tmp_dir)
self.assertEqual(len(os.listdir(tmp_dir)), 0)
class CheckFuzzerBuildTest(unittest.TestCase):
"""Tests the check_fuzzer_build function in the cifuzz module."""

View File

@ -0,0 +1,94 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for running fuzzers."""
import logging
import os
import shutil
import sys
import time
import fuzz_target
import stack_parser
# pylint: disable=wrong-import-position,import-error
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import utils
def run_fuzzers( # pylint: disable=too-many-arguments,too-many-locals
fuzz_seconds,
workspace,
project_name,
sanitizer='address'):
"""Runs all fuzzers for a specific OSS-Fuzz project.
Args:
fuzz_seconds: The total time allotted for fuzzing.
workspace: The location in a shared volume to store a git repo and build
artifacts.
project_name: The name of the relevant OSS-Fuzz project.
sanitizer: The sanitizer the fuzzers should be run with.
Returns:
(True if run was successful, True if bug was found).
"""
# Validate inputs.
if not os.path.exists(workspace):
logging.error('Invalid workspace: %s.', workspace)
return False, False
logging.info('Using %s sanitizer.', sanitizer)
out_dir = os.path.join(workspace, 'out')
artifacts_dir = os.path.join(out_dir, 'artifacts')
os.makedirs(artifacts_dir, exist_ok=True)
if not fuzz_seconds or fuzz_seconds < 1:
logging.error('Fuzz_seconds argument must be greater than 1, but was: %s.',
fuzz_seconds)
return False, False
# Get fuzzer information.
fuzzer_paths = utils.get_fuzz_targets(out_dir)
if not fuzzer_paths:
logging.error('No fuzzers were found in out directory: %s.', out_dir)
return False, False
# Run fuzzers for allotted time.
total_num_fuzzers = len(fuzzer_paths)
fuzzers_left_to_run = total_num_fuzzers
min_seconds_per_fuzzer = fuzz_seconds // total_num_fuzzers
for fuzzer_path in fuzzer_paths:
run_seconds = max(fuzz_seconds // fuzzers_left_to_run,
min_seconds_per_fuzzer)
target = fuzz_target.FuzzTarget(fuzzer_path,
run_seconds,
out_dir,
project_name,
sanitizer=sanitizer)
start_time = time.time()
testcase, stacktrace = target.fuzz()
fuzz_seconds -= (time.time() - start_time)
if not testcase or not stacktrace:
logging.info('Fuzzer %s, finished running.', target.target_name)
else:
utils.binary_print(b'Fuzzer %s, detected error:\n%s' %
(target.target_name.encode(), stacktrace))
shutil.move(testcase, os.path.join(artifacts_dir, 'test_case'))
stack_parser.parse_fuzzer_output(stacktrace, artifacts_dir)
return True, True
fuzzers_left_to_run -= 1
return True, False

View File

@ -16,7 +16,7 @@ import logging
import os
import sys
import cifuzz
import run_fuzzers
# pylint: disable=c-extension-no-member
# pylint gets confused because of the relative import of cifuzz.
@ -77,10 +77,10 @@ def main():
logging.error('This script needs to be run in the Github action context.')
return returncode
# Run the specified project's fuzzers from the build.
run_status, bug_found = cifuzz.run_fuzzers(fuzz_seconds,
workspace,
oss_fuzz_project_name,
sanitizer=sanitizer)
run_status, bug_found = run_fuzzers.run_fuzzers(fuzz_seconds,
workspace,
oss_fuzz_project_name,
sanitizer=sanitizer)
if not run_status:
logging.error('Error occurred while running in workspace %s.', workspace)
return returncode

View File

@ -0,0 +1,155 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for running fuzzers."""
import os
import sys
import tempfile
import unittest
from unittest import mock
# pylint: disable=wrong-import-position
INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(INFRA_DIR)
import fuzz_target
import run_fuzzers
import test_helpers
# NOTE: This integration test relies on
# https://github.com/google/oss-fuzz/tree/master/projects/example project.
EXAMPLE_PROJECT = 'example'
# Location of files used for testing.
TEST_FILES_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'test_files')
MEMORY_FUZZER_DIR = os.path.join(TEST_FILES_PATH, 'memory')
MEMORY_FUZZER = 'curl_fuzzer_memory'
UNDEFINED_FUZZER_DIR = os.path.join(TEST_FILES_PATH, 'undefined')
UNDEFINED_FUZZER = 'curl_fuzzer_undefined'
class RunFuzzerIntegrationTestMixin: # pylint: disable=too-few-public-methods,invalid-name
"""Mixin for integration test classes that runbuild_fuzzers on builds of a
specific sanitizer."""
# These must be defined by children.
FUZZER_DIR = None
FUZZER = None
def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer):
"""Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts
the run succeeded and that no bug was found."""
with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy:
run_success, bug_found = run_fuzzers.run_fuzzers(10,
fuzzer_dir_copy,
'curl',
sanitizer=sanitizer)
self.assertTrue(run_success)
self.assertFalse(bug_found)
class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
unittest.TestCase):
"""Integration test for build_fuzzers with an MSAN build."""
FUZZER_DIR = MEMORY_FUZZER_DIR
FUZZER = MEMORY_FUZZER
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_run_with_memory_sanitizer(self):
"""Tests run_fuzzers with a valid MSAN build."""
self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory')
class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
unittest.TestCase):
"""Integration test for build_fuzzers with an UBSAN build."""
FUZZER_DIR = UNDEFINED_FUZZER_DIR
FUZZER = UNDEFINED_FUZZER
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_run_with_undefined_sanitizer(self):
"""Tests run_fuzzers with a valid UBSAN build."""
self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined')
class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin,
unittest.TestCase):
"""Integration tests for build_fuzzers with an ASAN build."""
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_new_bug_found(self):
"""Tests run_fuzzers with a valid ASAN build."""
# Set the first return value to True, then the second to False to
# emulate a bug existing in the current PR but not on the downloaded
# OSS-Fuzz build.
with mock.patch.object(fuzz_target.FuzzTarget,
'is_reproducible',
side_effect=[True, False]):
run_success, bug_found = run_fuzzers.run_fuzzers(10, TEST_FILES_PATH,
EXAMPLE_PROJECT)
build_dir = os.path.join(TEST_FILES_PATH, 'out', 'oss_fuzz_latest')
self.assertTrue(os.path.exists(build_dir))
self.assertNotEqual(0, len(os.listdir(build_dir)))
self.assertTrue(run_success)
self.assertTrue(bug_found)
@unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
'INTEGRATION_TESTS=1 not set')
def test_old_bug_found(self):
"""Tests run_fuzzers with a bug found in OSS-Fuzz before."""
with mock.patch.object(fuzz_target.FuzzTarget,
'is_reproducible',
side_effect=[True, True]):
run_success, bug_found = run_fuzzers.run_fuzzers(10, TEST_FILES_PATH,
EXAMPLE_PROJECT)
build_dir = os.path.join(TEST_FILES_PATH, 'out', 'oss_fuzz_latest')
self.assertTrue(os.path.exists(build_dir))
self.assertNotEqual(0, len(os.listdir(build_dir)))
self.assertTrue(run_success)
self.assertFalse(bug_found)
def test_invalid_build(self):
"""Tests run_fuzzers with an invalid ASAN build."""
with tempfile.TemporaryDirectory() as tmp_dir:
out_path = os.path.join(tmp_dir, 'out')
os.mkdir(out_path)
run_success, bug_found = run_fuzzers.run_fuzzers(10, tmp_dir,
EXAMPLE_PROJECT)
self.assertFalse(run_success)
self.assertFalse(bug_found)
def test_invalid_fuzz_seconds(self):
"""Tests run_fuzzers with an invalid fuzz seconds."""
with tempfile.TemporaryDirectory() as tmp_dir:
out_path = os.path.join(tmp_dir, 'out')
os.mkdir(out_path)
run_success, bug_found = run_fuzzers.run_fuzzers(0, tmp_dir,
EXAMPLE_PROJECT)
self.assertFalse(run_success)
self.assertFalse(bug_found)
def test_invalid_out_dir(self):
"""Tests run_fuzzers with an invalid out directory."""
run_success, bug_found = run_fuzzers.run_fuzzers(10, 'not/a/valid/path',
EXAMPLE_PROJECT)
self.assertFalse(run_success)
self.assertFalse(bug_found)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,79 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for parsing stacks from fuzz targets."""
import os
# From clusterfuzz: src/python/crash_analysis/crash_analyzer.py
# Used to get the beginning of the stacktrace.
STACKTRACE_TOOL_MARKERS = [
b'AddressSanitizer',
b'ASAN:',
b'CFI: Most likely a control flow integrity violation;',
b'ERROR: libFuzzer',
b'KASAN:',
b'LeakSanitizer',
b'MemorySanitizer',
b'ThreadSanitizer',
b'UndefinedBehaviorSanitizer',
b'UndefinedSanitizer',
]
# From clusterfuzz: src/python/crash_analysis/crash_analyzer.py
# Used to get the end of the stacktrace.
STACKTRACE_END_MARKERS = [
b'ABORTING',
b'END MEMORY TOOL REPORT',
b'End of process memory map.',
b'END_KASAN_OUTPUT',
b'SUMMARY:',
b'Shadow byte and word',
b'[end of stack trace]',
b'\nExiting',
b'minidump has been written',
]
def parse_fuzzer_output(fuzzer_output, out_dir):
"""Parses the fuzzer output from a fuzz target binary.
Args:
fuzzer_output: A fuzz target binary output string to be parsed.
out_dir: The location to store the parsed output files.
"""
# Get index of key file points.
for marker in STACKTRACE_TOOL_MARKERS:
marker_index = fuzzer_output.find(marker)
if marker_index:
begin_stack = marker_index
break
end_stack = -1
for marker in STACKTRACE_END_MARKERS:
marker_index = fuzzer_output.find(marker)
if marker_index:
end_stack = marker_index + len(marker)
break
if begin_stack is None or end_stack is None:
return
summary_str = fuzzer_output[begin_stack:end_stack]
if not summary_str:
return
# Write sections of fuzzer output to specific files.
summary_file_path = os.path.join(out_dir, 'bug_summary.txt')
with open(summary_file_path, 'ab') as summary_handle:
summary_handle.write(summary_str)

View File

@ -0,0 +1,59 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for stack_parser."""
import os
import tempfile
import unittest
import stack_parser
# NOTE: This integration test relies on
# https://github.com/google/oss-fuzz/tree/master/projects/example project.
EXAMPLE_PROJECT = 'example'
# Location of files used for testing.
TEST_FILES_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'test_files')
class ParseOutputTest(unittest.TestCase):
"""Tests parse_fuzzer_output."""
def test_parse_valid_output(self):
"""Checks that the parse fuzzer output can correctly parse output."""
test_output_path = os.path.join(TEST_FILES_PATH,
'example_crash_fuzzer_output.txt')
test_summary_path = os.path.join(TEST_FILES_PATH, 'bug_summary_example.txt')
with tempfile.TemporaryDirectory() as tmp_dir:
with open(test_output_path, 'rb') as test_fuzz_output:
stack_parser.parse_fuzzer_output(test_fuzz_output.read(), tmp_dir)
result_files = ['bug_summary.txt']
self.assertCountEqual(os.listdir(tmp_dir), result_files)
# Compare the bug summaries.
with open(os.path.join(tmp_dir, 'bug_summary.txt')) as bug_summary:
detected_summary = bug_summary.read()
with open(test_summary_path) as bug_summary:
real_summary = bug_summary.read()
self.assertEqual(detected_summary, real_summary)
def test_parse_invalid_output(self):
"""Checks that no files are created when an invalid input was given."""
with tempfile.TemporaryDirectory() as tmp_dir:
stack_parser.parse_fuzzer_output(b'not a valid output_string', tmp_dir)
self.assertEqual(len(os.listdir(tmp_dir)), 0)
if __name__ == '__main__':
unittest.main()