kafka: fix consumer (#8491)

Fixes: https://bugs.chromium.org/p/oss-fuzz/issues/detail?id=51338
This commit is contained in:
DavidKorczynski 2022-09-14 11:17:56 +01:00 committed by GitHub
parent 5fb1c5ecf8
commit 9b3874af31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 11 deletions

View File

@ -16,7 +16,7 @@
FROM gcr.io/oss-fuzz-base/base-builder-python
RUN apt-get update -y && apt-get install wget
RUN apt-get update -y && apt-get install -y wget software-properties-common
RUN wget -qO - https://packages.confluent.io/deb/7.0/archive.key | apt-key add -
RUN add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
RUN apt-get update -y && apt-get install librdkafka-dev software-properties-common lsb-release gcc make python3-dev libsasl2-modules-gssapi-mit krb5-user -y

View File

@ -15,8 +15,8 @@
import atheris
import sys
with atheris.instrument_imports():
from confluent_kafka import Consumer, KafkaException
from confluent_kafka import Consumer, KafkaException, TopicPartition
def TestInput(data):
fdp = atheris.FuzzedDataProvider(data)
@ -24,20 +24,36 @@ def TestInput(data):
def dummy_callback(err, partitions):
pass
c = Consumer({
'group.id': fdp.ConsumeString(10),
'socket.timeout.ms': fdp.ConsumeIntInRange(10,2000),
'session.timeout.ms': fdp.ConsumeIntInRange(10,2000),
'on_commit': dummy_callback})
try:
c = Consumer({
'group.id': fdp.ConsumeString(10),
'socket.timeout.ms': fdp.ConsumeIntInRange(10,2000),
'session.timeout.ms': fdp.ConsumeIntInRange(10,2000),
'on_commit': dummy_callback})
except Exception as e:
# If the consumer fails, we just retry
return
try:
c.subscribe([fdp.ConsumeString(10)], on_assign=dummy_callback, on_revoke=dummy_callback)
c.subscribe(
[fdp.ConsumeString(10)],
on_assign=dummy_callback,
on_revoke=dummy_callback
)
c.unsubscribe()
msg = c.poll(timeout=0.001)
msglist = c.consume(num_messages=fdp.ConsumeIntInRange(1,10), timeout=0.001)
msglist = c.consume(
num_messages=fdp.ConsumeIntInRange(1,10),
timeout=0.001
)
partitions = list(map(lambda part: TopicPartition(fdp.ConsumeString(10), part), range(0, 100, 3)))
partitions = list(
map(
lambda part: TopicPartition(fdp.ConsumeString(10), part),
range(0, 100, 3)
)
)
c.assign(partitions)
c.unassign()
@ -51,9 +67,12 @@ def TestInput(data):
c.close()
def main():
atheris.instrument_all()
atheris.Setup(sys.argv, TestInput, enable_python_coverage=True)
atheris.Fuzz()
if __name__ == "__main__":
main()