From 9b3874af3146a58c8f4cd2aebbfd79e6c43b89d9 Mon Sep 17 00:00:00 2001 From: DavidKorczynski Date: Wed, 14 Sep 2022 11:17:56 +0100 Subject: [PATCH] kafka: fix consumer (#8491) Fixes: https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=51338 --- projects/kafka/Dockerfile | 2 +- projects/kafka/fuzz_consumer.py | 39 ++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/projects/kafka/Dockerfile b/projects/kafka/Dockerfile index dcb29f8d9..66d59a658 100644 --- a/projects/kafka/Dockerfile +++ b/projects/kafka/Dockerfile @@ -16,7 +16,7 @@ FROM gcr.io/oss-fuzz-base/base-builder-python -RUN apt-get update -y && apt-get install wget +RUN apt-get update -y && apt-get install -y wget software-properties-common RUN wget -qO - https://packages.confluent.io/deb/7.0/archive.key | apt-key add - RUN add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main" RUN apt-get update -y && apt-get install librdkafka-dev software-properties-common lsb-release gcc make python3-dev libsasl2-modules-gssapi-mit krb5-user -y diff --git a/projects/kafka/fuzz_consumer.py b/projects/kafka/fuzz_consumer.py index 130355c1d..efbb57a16 100644 --- a/projects/kafka/fuzz_consumer.py +++ b/projects/kafka/fuzz_consumer.py @@ -15,8 +15,8 @@ import atheris import sys -with atheris.instrument_imports(): - from confluent_kafka import Consumer, KafkaException +from confluent_kafka import Consumer, KafkaException, TopicPartition + def TestInput(data): fdp = atheris.FuzzedDataProvider(data) @@ -24,20 +24,36 @@ def TestInput(data): def dummy_callback(err, partitions): pass - c = Consumer({ - 'group.id': fdp.ConsumeString(10), - 'socket.timeout.ms': fdp.ConsumeIntInRange(10,2000), - 'session.timeout.ms': fdp.ConsumeIntInRange(10,2000), - 'on_commit': dummy_callback}) + try: + c = Consumer({ + 'group.id': fdp.ConsumeString(10), + 'socket.timeout.ms': fdp.ConsumeIntInRange(10,2000), + 'session.timeout.ms': fdp.ConsumeIntInRange(10,2000), + 'on_commit': dummy_callback}) + except Exception as e: + # If the consumer fails, we just retry + return try: - c.subscribe([fdp.ConsumeString(10)], on_assign=dummy_callback, on_revoke=dummy_callback) + c.subscribe( + [fdp.ConsumeString(10)], + on_assign=dummy_callback, + on_revoke=dummy_callback + ) c.unsubscribe() msg = c.poll(timeout=0.001) - msglist = c.consume(num_messages=fdp.ConsumeIntInRange(1,10), timeout=0.001) + msglist = c.consume( + num_messages=fdp.ConsumeIntInRange(1,10), + timeout=0.001 + ) - partitions = list(map(lambda part: TopicPartition(fdp.ConsumeString(10), part), range(0, 100, 3))) + partitions = list( + map( + lambda part: TopicPartition(fdp.ConsumeString(10), part), + range(0, 100, 3) + ) + ) c.assign(partitions) c.unassign() @@ -51,9 +67,12 @@ def TestInput(data): c.close() + def main(): + atheris.instrument_all() atheris.Setup(sys.argv, TestInput, enable_python_coverage=True) atheris.Fuzz() + if __name__ == "__main__": main()