msal: refine fuzzers (#7958)

Fixes https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=48789
This commit is contained in:
DavidKorczynski 2022-07-06 15:16:37 +01:00 committed by GitHub
parent 5d111cce14
commit 7c0746c361
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 12 deletions

View File

@ -18,5 +18,4 @@ RUN pip3 install --upgrade pip
RUN git clone https://github.com/AzureAD/microsoft-authentication-library-for-python msal
WORKDIR msal
#COPY build.sh task_fuzz.py parse_fuzz.py $SRC/
COPY build.sh fuzz_*.py $SRC/

View File

@ -1,5 +1,4 @@
#!/usr/bin/python3
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -15,11 +14,48 @@
# limitations under the License.
import atheris
import sys
import requests
with atheris.instrument_imports():
from msal import PublicClientApplication
from msal.application import extract_certs
from msal.authority import AuthorityBuilder
# FuzzHttpClient inspired by MinimalHttpClient from msal unit tests
class FuzzHttpClient:
"""HTTP client returning data seeded by the fuzzer and no real connections"""
def __init__(self, fdp, verify=True, proxies=None, timeout=None):
# We keep these variables from the unit test implementation
# in case some of the MSAL code uses it.
self.session = requests.Session()
self.session.verify = verify
self.session.proxies = proxies
self.timeout = timeout
self.fdp = fdp
def post(self, url, params=None, data=None, headers=None, **kwargs):
return FuzzResponse(fdp = self.fdp)
def get(self, url, params=None, headers=None, **kwargs):
return MinimalResponse(fdp = self.fdp)
def close(self):
self.session.close()
class FuzzResponse(object):
def __init__(self, fdp, requests_resp=None, status_code=None, text=None):
# Over-approximate responses by creating a random Response object
self._raw_resp = requests.Response()
self.fdp = fdp
self._raw_resp.status_code = self.fdp.ConsumeIntInRange(100, 599)
self.text = self.fdp.ConsumeString(500)
self.status_code = self._raw_resp.status_code
def raise_for_status(self):
if self._raw_resp is not None:
self._raw_resp.raise_for_status()
def is_expected(error_list,error_msg):
for error in error_list:
if error in error_msg:
@ -29,13 +65,14 @@ def is_expected(error_list,error_msg):
def TestInput(input_bytes):
if len(input_bytes)<32:
return
fdp = atheris.FuzzedDataProvider(input_bytes)
authority = AuthorityBuilder(fdp.ConsumeString(50),fdp.ConsumeString(50))
try:
app = PublicClientApplication(client_id=fdp.ConsumeString(32),authority=authority)
app = PublicClientApplication(
client_id=fdp.ConsumeString(32),
authority=authority,
http_client=FuzzHttpClient(fdp) # Use fake Fuzz HTTP client
)
app.get_accounts()
except (ValueError,KeyError) as e:
error_list = [

View File

@ -53,9 +53,7 @@ def TestInput(input_bytes):
return
fdp = atheris.FuzzedDataProvider(input_bytes)
cache = TokenCache()
client_id = fdp.ConsumeString(32)
try:
token = build_token(
@ -67,10 +65,15 @@ def TestInput(input_bytes):
"client_id": client_id,
"scope": ["s2", "s1", "s3"],
"token_endpoint": "https://%s"%fdp.ConsumeString(20),
"response": build_response(token_type=fdp.ConsumeString(5),
uid=fdp.ConsumeString(5), utid=fdp.ConsumeString(5),
expires_in=3600, access_token=fdp.ConsumeString(10),
id_token=token, refresh_token=fdp.ConsumeString(10)),
"response": build_response(
token_type=fdp.ConsumeString(5),
uid=fdp.ConsumeString(5),
utid=fdp.ConsumeString(5),
expires_in=3600,
access_token=fdp.ConsumeString(10),
id_token=token,
refresh_token=fdp.ConsumeString(10)
),
}, now=1000)
except ValueError as e:
error_list = [