Adding urllib3 requests coverage (#9793)

This change added a new fuzzer `fuzz_requests`. This fuzzes the
`.request` method in urllib3, which opens a lot more coverage.

<img width="653" alt="image"
src="https://user-images.githubusercontent.com/5122866/221026547-8097ef14-bc21-4e4e-a5ab-573048259576.png">

I have created a python `HTTPServer` that provides fuzzed responses to
all the different request types. And I try and drive different
combinations of requests including mixes of headers, bodies and request
types. This gives a nice improvement on current coverage levels [urllib3
fuzz inspector report
2023-02-23](https://storage.googleapis.com/oss-fuzz-introspector/urllib3/inspector-report/20230223/fuzz_report.html).
This commit is contained in:
SGills 2023-02-28 10:49:16 +00:00 committed by GitHub
parent edd3dc9d9a
commit 6d389f49db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 130 additions and 1 deletions

View File

@ -0,0 +1,129 @@
#!/usr/bin/python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atheris
from http.server import BaseHTTPRequestHandler, HTTPServer
import sys
import threading
import time
with atheris.instrument_imports():
import urllib3
timeout = urllib3.util.Timeout(connect=1.0, read=1.0)
urllib_pool = urllib3.PoolManager(timeout=timeout)
PORT = 8011
GLOBAL_RESPONSE_MESSAGE = ""
GLOBAL_RESPONSE_CODE = 0
GLOBAL_CONTENT_ENCODING = None
class handler(BaseHTTPRequestHandler):
def send_fuzzed_response(self):
self.send_response(GLOBAL_RESPONSE_CODE)
self.send_header("content-type", "text/html")
if GLOBAL_CONTENT_ENCODING:
self.send_header("content-encoding", GLOBAL_CONTENT_ENCODING)
self.end_headers()
self.wfile.write(bytes(GLOBAL_RESPONSE_MESSAGE, "utf-8"))
def do_GET(self):
self.send_fuzzed_response()
def do_POST(self):
self.send_fuzzed_response()
def do_PUT(self):
self.send_fuzzed_response()
def do_PATCH(self):
self.send_fuzzed_response()
def do_OPTIONS(self):
self.send_fuzzed_response()
def do_DELETE(self):
self.send_fuzzed_response()
def do_HEAD(self):
self.send_fuzzed_response()
# Supress HTTP log output
def log_request(self, code="-", size="-"):
return
def run_webserver():
with HTTPServer(("", PORT), handler) as server:
server.serve_forever()
REQUEST_METHODS = ["POST", "GET", "HEAD", "PUT", "DELETE", "OPTIONS", "PATCH"]
CONTENT_ENCODING_TYPES = [None, "gzip", "deflate"]
def TestOneInput(input_bytes):
global GLOBAL_RESPONSE_MESSAGE, GLOBAL_RESPONSE_CODE, GLOBAL_CONTENT_ENCODING
fdp = atheris.FuzzedDataProvider(input_bytes)
# Fuzz Http Response
GLOBAL_RESPONSE_MESSAGE = fdp.ConsumeUnicodeNoSurrogates(sys.maxsize)
GLOBAL_RESPONSE_CODE = fdp.ConsumeIntInRange(200, 599)
GLOBAL_CONTENT_ENCODING = fdp.PickValueInList(CONTENT_ENCODING_TYPES)
# Fuzz Http Request
requestType = fdp.PickValueInList(REQUEST_METHODS)
# Optionally provide request headers
requestHeaders = urllib3._collections.HTTPHeaderDict({})
for i in range(0, fdp.ConsumeIntInRange(0, 10)):
requestHeaders.add(
fdp.ConsumeString(sys.maxsize), fdp.ConsumeString(sys.maxsize)
)
requestHeaders = None if fdp.ConsumeBool() else requestHeaders
# Optionally generate form data for request
formData = {}
for i in range(0, fdp.ConsumeIntInRange(0, 100)):
formData[fdp.ConsumeString(sys.maxsize)] = fdp.ConsumeString(sys.maxsize)
formData = None if fdp.ConsumeBool() else formData
# Optionally generate request body
requestBody = None if fdp.ConsumeBool() else fdp.ConsumeString(sys.maxsize)
r = urllib_pool.request(
requestType,
f"http://localhost:{PORT}/",
headers=requestHeaders,
fields=formData,
body=requestBody
)
r.status
r.data
r.headers
if __name__ == "__main__":
x = threading.Thread(target=run_webserver, daemon=True)
x.start()
time.sleep(0.5) # Short delay to start test server
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()

View File

@ -34,7 +34,7 @@ def TestOneInput(data):
return
def main():
atheris.Setup(sys.argv, TestOneInput, enable_python_coverage=True)
atheris.Setup(sys.argv, TestOneInput)
atheris.Fuzz()
if __name__ == "__main__":