mirror of https://github.com/google/oss-fuzz.git
g-cloud-logging: refine fuzzing set up (#8256)
Add mocking to resources fuzzer and simplify others.
This commit is contained in:
parent
5260d875b2
commit
6929137715
|
@ -17,6 +17,7 @@
|
|||
FROM gcr.io/oss-fuzz-base/base-builder-python
|
||||
|
||||
RUN git clone https://github.com/googleapis/python-logging gcloud-logging
|
||||
RUN pip3 install --upgrade pip mock
|
||||
WORKDIR gcloud-logging
|
||||
|
||||
COPY build.sh fuzz_*.py $SRC/
|
||||
|
|
|
@ -15,10 +15,10 @@
|
|||
|
||||
import atheris
|
||||
import sys
|
||||
with atheris.instrument_imports():
|
||||
import google.cloud.logging_v2.entries as entries
|
||||
from google.cloud.logging_v2.client import Client
|
||||
from google.cloud.logging_v2.resource import Resource
|
||||
|
||||
import google.cloud.logging_v2.entries as entries
|
||||
from google.cloud.logging_v2.client import Client
|
||||
from google.cloud.logging_v2.resource import Resource
|
||||
|
||||
def create_dummy_log_entry(fdp):
|
||||
return entries.LogEntry(
|
||||
|
@ -72,6 +72,7 @@ def TestInput(data):
|
|||
raise e
|
||||
|
||||
def main():
|
||||
atheris.instrument_all()
|
||||
atheris.Setup(sys.argv, TestInput, enable_python_coverage=True)
|
||||
atheris.Fuzz()
|
||||
|
||||
|
|
|
@ -15,24 +15,26 @@
|
|||
|
||||
import atheris
|
||||
import sys
|
||||
with atheris.instrument_imports():
|
||||
import google.cloud.logging_v2._helpers as helpers
|
||||
import google.cloud.logging_v2.handlers._helpers as handlers_helpers
|
||||
|
||||
import google.cloud.logging_v2._helpers as helpers
|
||||
import google.cloud.logging_v2.handlers._helpers as handlers_helpers
|
||||
|
||||
|
||||
def TestInput(data):
|
||||
fdp = atheris.FuzzedDataProvider(data)
|
||||
|
||||
helpers.retrieve_metadata_server(fdp.ConsumeString(100))
|
||||
helpers._normalize_severity(fdp.ConsumeInt(100))
|
||||
helpers._add_defaults_to_filter(fdp.ConsumeString(100))
|
||||
|
||||
handlers_helpers.get_request_data_from_flask()
|
||||
handlers_helpers.get_request_data_from_django()
|
||||
handlers_helpers._parse_trace_parent(fdp.ConsumeString(100))
|
||||
handlers_helpers._parse_xcloud_trace(fdp.ConsumeString(100))
|
||||
handlers_helpers.get_request_data()
|
||||
op = fdp.ConsumeIntInRange(0, 4)
|
||||
if op == 0:
|
||||
helpers._normalize_severity(fdp.ConsumeInt(sys.maxsize))
|
||||
elif op == 1:
|
||||
helpers._add_defaults_to_filter(fdp.ConsumeUnicodeNoSurrogates(40))
|
||||
elif op == 2:
|
||||
handlers_helpers._parse_trace_parent(fdp.ConsumeUnicodeNoSurrogates(300))
|
||||
else:
|
||||
handlers_helpers._parse_xcloud_trace(fdp.ConsumeUnicodeNoSurrogates(300))
|
||||
|
||||
def main():
|
||||
atheris.instrument_all()
|
||||
atheris.Setup(sys.argv, TestInput, enable_python_coverage=True)
|
||||
atheris.Fuzz()
|
||||
|
||||
|
|
|
@ -15,19 +15,35 @@
|
|||
|
||||
import atheris
|
||||
import sys
|
||||
import mock
|
||||
|
||||
with atheris.instrument_imports():
|
||||
import google.cloud.logging_v2.handlers._monitored_resources as resources
|
||||
from google.cloud.logging_v2.handlers import _monitored_resources
|
||||
|
||||
|
||||
global_fdp = None
|
||||
def mock_retrieve_metadata_server(endpoint):
|
||||
"""Mock for retrieve_metadata_server"""
|
||||
if global_fdp is None:
|
||||
return None
|
||||
if global_fdp.ConsumeIntInRange(1, 10) < 3:
|
||||
return None
|
||||
return global_fdp.ConsumeUnicodeNoSurrogates(30)
|
||||
|
||||
def TestInput(data):
|
||||
fdp = atheris.FuzzedDataProvider(data)
|
||||
global global_fdp
|
||||
global_fdp = atheris.FuzzedDataProvider(data)
|
||||
|
||||
# Mock the metadata server to avoid connections. The
|
||||
# retrieve_metadata_server will return fuzzer-seeded data.
|
||||
patch = mock.patch(
|
||||
"google.cloud.logging_v2.handlers._monitored_resources.retrieve_metadata_server",
|
||||
wraps=mock_retrieve_metadata_server,
|
||||
)
|
||||
# TODO: randomise relevant environment variables.
|
||||
with patch:
|
||||
_monitored_resources.detect_resource()
|
||||
|
||||
resources._create_functions_resource()
|
||||
resources._create_kubernetes_resource()
|
||||
resources._create_compute_resource()
|
||||
resources._create_cloud_run_resource()
|
||||
resources._create_app_engine_resource()
|
||||
resources._create_global_resource(fdp.ConsumeString(100))
|
||||
resources.detect_resource(fdp.ConsumeString(100))
|
||||
|
||||
def main():
|
||||
atheris.Setup(sys.argv, TestInput, enable_python_coverage=True)
|
||||
|
|
Loading…
Reference in New Issue