Kafka support (#1506)

* Add kafka support

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix checks

* Fix checks

* Update kombu/transport/confluentkafka.py

Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>

* Add customization ability

* Add docs

* Fix PyPy integration tests

* Fix pydocstyle

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Cleanup

* Disable PyPy tests

* Fix tests

* Add docs

Co-authored-by: ilbo0518 <ilia.borsuk@netcracker.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Asif Saif Uddin <auvipy@gmail.com>
CountRedClaw 2022-04-21 21:40:41 +04:00 committed by GitHub
parent 14d395aa85
commit 33e8e5a781
9 changed files with 518 additions and 3 deletions


@@ -57,6 +57,9 @@ jobs:
run: tox -v -e ${{ env.PYTHON_VERSION }}-linux-integration-py-redis -- -v
- name: Run MongoDB integration tests
run: tox -v -e ${{ env.PYTHON_VERSION }}-linux-integration-py-mongodb -- -v
- name: Run Kafka integration tests
if: ${{ env.PYTHON_VERSION != 'pypy3.7' && env.PYTHON_VERSION != 'pypy3.8' }}
run: tox -v -e ${{ env.PYTHON_VERSION }}-linux-integration-py-kafka -- -v
#################### Linters and checkers ####################
lint:


@@ -0,0 +1,31 @@
==============================================================
confluent-kafka Transport - ``kombu.transport.confluentkafka``
==============================================================
.. currentmodule:: kombu.transport.confluentkafka
.. automodule:: kombu.transport.confluentkafka
.. contents::
:local:
Transport
---------
.. autoclass:: Transport
:members:
:undoc-members:
Channel
-------
.. autoclass:: Channel
:members:
:undoc-members:
Message
-------
.. autoclass:: Message
:members:
:undoc-members:


@@ -23,6 +23,7 @@ TRANSPORT_ALIASES = {
'amqps': 'kombu.transport.pyamqp:SSLTransport',
'pyamqp': 'kombu.transport.pyamqp:Transport',
'librabbitmq': 'kombu.transport.librabbitmq:Transport',
'confluentkafka': 'kombu.transport.confluentkafka:Transport',
'memory': 'kombu.transport.memory:Transport',
'redis': 'kombu.transport.redis:Transport',
'rediss': 'kombu.transport.redis:Transport',


@@ -0,0 +1,379 @@
"""confluent-kafka transport module for Kombu.
Kafka transport using the confluent-kafka library.
**References**
- http://docs.confluent.io/current/clients/confluent-kafka-python
**Limitations**
The confluent-kafka transport does not support the PyPy environment.
Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No
Connection String
=================
The connection string has the following format:
.. code-block::
confluentkafka://[USER:PASSWORD@]KAFKA_ADDRESS[:PORT]
Transport Options
=================
* ``connection_wait_time_seconds`` - Time in seconds to wait for connection
to succeed. Default ``5``
* ``wait_time_seconds`` - Time in seconds to wait to receive messages.
Default ``5``
* ``security_protocol`` - Protocol used to communicate with broker.
Visit https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for
an explanation of valid values. Default ``plaintext``
* ``sasl_mechanism`` - SASL mechanism to use for authentication.
Visit https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for
an explanation of valid values.
* ``num_partitions`` - Number of partitions to create. Default ``1``
* ``replication_factor`` - Replication factor of partitions. Default ``1``
* ``topic_config`` - Topic configuration. Must be a dict whose key-value
pairs correspond with the topic-level configuration options listed in
http://kafka.apache.org/documentation.html#topicconfigs.
* ``kafka_common_config`` - Configuration applied to the producer, consumer
and admin client. Must be a dict whose key-value pairs correspond with the
properties listed in
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_producer_config`` - Producer configuration. Must be a dict whose
key-value pairs correspond with the properties listed in
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_consumer_config`` - Consumer configuration. Must be a dict whose
key-value pairs correspond with the properties listed in
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
* ``kafka_admin_config`` - Admin client configuration. Must be a dict whose
key-value pairs correspond with the properties listed in
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
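Example
=======
A minimal sketch of a connection configured with some of these options
(the broker address and option values are illustrative, not defaults):
.. code-block:: python
    from kombu import Connection
    connection = Connection(
        'confluentkafka://localhost:9092',
        transport_options={
            'num_partitions': 3,
            'replication_factor': 1,
            'topic_config': {'retention.ms': 3600000},
        },
    )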
"""
from __future__ import annotations
from queue import Empty
from kombu.transport import virtual
from kombu.utils import cached_property
from kombu.utils.encoding import str_to_bytes
from kombu.utils.json import dumps, loads
try:
import confluent_kafka
from confluent_kafka import Consumer, Producer, TopicPartition
from confluent_kafka.admin import AdminClient, NewTopic
KAFKA_CONNECTION_ERRORS = ()
KAFKA_CHANNEL_ERRORS = ()
except ImportError:
confluent_kafka = None
KAFKA_CONNECTION_ERRORS = KAFKA_CHANNEL_ERRORS = ()
from kombu.log import get_logger
logger = get_logger(__name__)
DEFAULT_PORT = 9092
class NoBrokersAvailable(confluent_kafka.KafkaException):
"""Kafka broker is not available exception."""
retriable = True
class Message(virtual.Message):
"""Message object."""
def __init__(self, payload, channel=None, **kwargs):
self.topic = payload.get('topic')
super().__init__(payload, channel=channel, **kwargs)
class QoS(virtual.QoS):
"""Quality of Service guarantees."""
_not_yet_acked = {}
def can_consume(self):
"""Return true if the channel can be consumed from.
:returns: True, if this QoS object can accept a message.
:rtype: bool
"""
return (not self.prefetch_count or
        len(self._not_yet_acked) < self.prefetch_count)
def can_consume_max_estimate(self):
if self.prefetch_count:
return self.prefetch_count - len(self._not_yet_acked)
else:
return 1
def append(self, message, delivery_tag):
self._not_yet_acked[delivery_tag] = message
def get(self, delivery_tag):
return self._not_yet_acked[delivery_tag]
def ack(self, delivery_tag):
if delivery_tag not in self._not_yet_acked:
return
message = self._not_yet_acked.pop(delivery_tag)
consumer = self.channel._get_consumer(message.topic)
consumer.commit()
def reject(self, delivery_tag, requeue=False):
"""Reject a message by delivery tag.
If requeue is True, then the last consumed message is reverted so
it'll be refetched on the next attempt.
If False, that message is consumed and ignored.
"""
if requeue:
message = self._not_yet_acked.pop(delivery_tag)
consumer = self.channel._get_consumer(message.topic)
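# Seek each assigned partition back to its last committed offset so
# the rejected message is fetched again on the next poll.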
for assignment in consumer.assignment():
topic_partition = TopicPartition(message.topic,
assignment.partition)
[committed_offset] = consumer.committed([topic_partition])
consumer.seek(committed_offset)
else:
self.ack(delivery_tag)
def restore_unacked_once(self, stderr=None):
pass
class Channel(virtual.Channel):
"""Kafka Channel."""
QoS = QoS
Message = Message
default_wait_time_seconds = 5
default_connection_wait_time_seconds = 5
_client = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._kafka_consumers = {}
self._kafka_producers = {}
self._client = self._open()
def sanitize_queue_name(self, queue):
"""Need to sanitize the name, celery sometimes pushes in @ signs."""
return str(queue).replace('@', '')
def _get_producer(self, queue):
"""Create/get a producer instance for the given topic/queue."""
queue = self.sanitize_queue_name(queue)
producer = self._kafka_producers.get(queue, None)
if producer is None:
producer = Producer({
**self.common_config,
**(self.options.get('kafka_producer_config') or {}),
})
self._kafka_producers[queue] = producer
return producer
def _get_consumer(self, queue):
"""Create/get a consumer instance for the given topic/queue."""
queue = self.sanitize_queue_name(queue)
consumer = self._kafka_consumers.get(queue, None)
if consumer is None:
consumer = Consumer({
'group.id': f'{queue}-consumer-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
**self.common_config,
**(self.options.get('kafka_consumer_config') or {}),
})
consumer.subscribe([queue])
self._kafka_consumers[queue] = consumer
return consumer
def _put(self, queue, message, **kwargs):
"""Put a message on the topic/queue."""
queue = self.sanitize_queue_name(queue)
producer = self._get_producer(queue)
producer.produce(queue, str_to_bytes(dumps(message)))
producer.flush()
def _get(self, queue, **kwargs):
"""Get a message from the topic/queue."""
queue = self.sanitize_queue_name(queue)
consumer = self._get_consumer(queue)
message = None
try:
message = consumer.poll(self.wait_time_seconds)
except StopIteration:
pass
if not message:
raise Empty()
error = message.error()
if error:
logger.error(error)
raise Empty()
return {**loads(message.value()), 'topic': message.topic()}
def _delete(self, queue, *args, **kwargs):
"""Delete a queue/topic."""
queue = self.sanitize_queue_name(queue)
self._kafka_consumers[queue].close()
self._kafka_consumers.pop(queue)
self.client.delete_topics([queue])
def _size(self, queue):
"""Get the number of pending messages in the topic/queue."""
queue = self.sanitize_queue_name(queue)
consumer = self._kafka_consumers.get(queue, None)
if consumer is None:
return 0
size = 0
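# The pending count per partition is the high-watermark (end) offset
# minus the last committed offset; sum these across assigned partitions.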
for assignment in consumer.assignment():
topic_partition = TopicPartition(queue, assignment.partition)
(_, end_offset) = consumer.get_watermark_offsets(topic_partition)
[committed_offset] = consumer.committed([topic_partition])
size += end_offset - committed_offset.offset
return size
def _new_queue(self, queue, **kwargs):
"""Create a new topic if it does not exist."""
queue = self.sanitize_queue_name(queue)
if queue in self.client.list_topics().topics:
return
topic = NewTopic(
queue,
num_partitions=self.options.get('num_partitions', 1),
replication_factor=self.options.get('replication_factor', 1),
config=self.options.get('topic_config', {})
)
self.client.create_topics(new_topics=[topic])
def _has_queue(self, queue, **kwargs):
"""Check if a topic already exists."""
queue = self.sanitize_queue_name(queue)
return queue in self.client.list_topics().topics
def _open(self):
client = AdminClient({
**self.common_config,
**(self.options.get('kafka_admin_config') or {}),
})
try:
# listing topics seems to be the only way to verify the connection
client.list_topics(timeout=self.wait_time_seconds)
except confluent_kafka.KafkaException as e:
raise NoBrokersAvailable(e)
return client
@property
def client(self):
if self._client is None:
self._client = self._open()
return self._client
@property
def options(self):
return self.connection.client.transport_options
@property
def conninfo(self):
return self.connection.client
@cached_property
def wait_time_seconds(self):
return self.options.get(
'wait_time_seconds', self.default_wait_time_seconds
)
@cached_property
def connection_wait_time_seconds(self):
return self.options.get(
'connection_wait_time_seconds',
self.default_connection_wait_time_seconds,
)
@cached_property
def common_config(self):
conninfo = self.connection.client
config = {
'bootstrap.servers':
f'{conninfo.hostname}:{int(conninfo.port) or DEFAULT_PORT}',
}
security_protocol = self.options.get('security_protocol', 'plaintext')
if security_protocol.lower() != 'plaintext':
config.update({
'security.protocol': security_protocol,
'sasl.username': conninfo.userid,
'sasl.password': conninfo.password,
'sasl.mechanism': self.options.get('sasl_mechanism'),
})
config.update(self.options.get('kafka_common_config') or {})
return config
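# Illustrative only: SASL could be enabled via transport options such as
# transport_options={'security_protocol': 'sasl_ssl',
#                    'sasl_mechanism': 'PLAIN'},
# with sasl.username/sasl.password taken from the URL's userid/password.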
def close(self):
super().close()
self._kafka_producers = {}
for consumer in self._kafka_consumers.values():
consumer.close()
self._kafka_consumers = {}
class Transport(virtual.Transport):
"""Kafka Transport."""
def as_uri(self, uri: str, include_password=False, mask='**') -> str:
pass
Channel = Channel
default_port = DEFAULT_PORT
driver_type = 'kafka'
driver_name = 'confluentkafka'
recoverable_connection_errors = (
NoBrokersAvailable,
)
def __init__(self, client, **kwargs):
if confluent_kafka is None:
raise ImportError('The confluent-kafka library is not installed')
super().__init__(client, **kwargs)
def driver_version(self):
return confluent_kafka.__version__
def establish_connection(self):
return super().establish_connection()
def close_connection(self, connection):
return super().close_connection(connection)


@@ -0,0 +1 @@
confluent-kafka


@@ -11,4 +11,4 @@ codecov
-r extras/zookeeper.txt
-r extras/brotli.txt
-r extras/zstd.txt
-r extras/sqlalchemy.txt
-r extras/sqlalchemy.txt


@@ -11,4 +11,4 @@ codecov
-r extras/zookeeper.txt
-r extras/brotli.txt
-r extras/zstd.txt
-r extras/sqlalchemy.txt
-r extras/sqlalchemy.txt


@@ -0,0 +1,69 @@
from __future__ import annotations
import pytest
import kombu
from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
BasicFunctionality)
def get_connection(hostname, port):
return kombu.Connection(
f'confluentkafka://{hostname}:{port}',
)
def get_failover_connection(hostname, port):
return kombu.Connection(
f'confluentkafka://localhost:12345;confluentkafka://{hostname}:{port}',
connect_timeout=10,
)
@pytest.fixture()
def invalid_connection():
return kombu.Connection('confluentkafka://localhost:12345')
@pytest.fixture()
def connection():
return get_connection(
hostname='localhost',
port='9092'
)
@pytest.fixture()
def failover_connection():
return get_failover_connection(
hostname='localhost',
port='9092'
)
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaBasicFunctionality(BasicFunctionality):
pass
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaBaseExchangeTypes(BaseExchangeTypes):
@pytest.mark.skip('fanout is not implemented')
def test_fanout(self, connection):
pass
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaFailover(BaseFailover):
pass
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaMessage(BaseMessage):
pass
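# A sketch for running this suite locally, assuming the dockerised
# zookeeper/kafka pair from tox.ini is up and listening on localhost:9092:
#   tox -v -e 3.9-linux-integration-py-kafka -- -v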

tox.ini

@@ -4,8 +4,8 @@ envlist =
{pypy3.7,pypy3.8,3.7,3.8,3.9,3.10}-linux-integration-py-amqp
{pypy3.7,pypy3.8,3.7,3.8,3.9,3.10}-linux-integration-redis
{pypy3.7,pypy3.8,3.7,3.8,3.9,3.10}-linux-integration-mongodb
{3.7,3.8,3.9,3.10}-linux-integration-kafka
flake8
flakeplus
apicheck
pydocstyle
@@ -21,6 +21,7 @@ deps=
apicheck,pypy3.7,pypy3.8,3.7,3.8,3.9,3.10: -r{toxinidir}/requirements/default.txt
apicheck,pypy3.7,pypy3.8,3.7,3.8,3.9,3.10: -r{toxinidir}/requirements/test.txt
apicheck,pypy3.7,pypy3.8,3.7-linux,3.8-linux,3.9-linux,3.10-linux: -r{toxinidir}/requirements/test-ci.txt
apicheck,3.7-linux,3.8-linux,3.9-linux,3.10-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt
3.7-windows,3.8-windows,3.9-windows,3.10-windows: -r{toxinidir}/requirements/test-ci-windows.txt
apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt
flake8,flakeplus,pydocstyle,mypy: -r{toxinidir}/requirements/pkgutils.txt
@@ -30,6 +31,7 @@ commands =
integration-py-amqp: py.test -xv -E py-amqp t/integration {posargs:-n2}
integration-redis: py.test -xv -E redis t/integration {posargs:-n2}
integration-mongodb: py.test -xv -E mongodb t/integration {posargs:-n2}
integration-kafka: py.test -xv -E kafka t/integration {posargs:-n2}
basepython =
pypy3.7: pypy3.7
@@ -45,6 +47,8 @@ docker =
integration-py-amqp: rabbitmq
integration-redis: redis
integration-mongodb: mongodb
integration-kafka: zookeeper
integration-kafka: kafka
dockerenv =
PYAMQP_INTEGRATION_INSTANCE=1
@@ -77,6 +81,33 @@ healthcheck_timeout = 10
healthcheck_retries = 30
healthcheck_start_period = 5
[docker:zookeeper]
image = bitnami/zookeeper:latest
ports = 2181:2181/tcp
healthcheck_interval = 10
healthcheck_timeout = 10
healthcheck_retries = 30
healthcheck_start_period = 5
environment = ALLOW_ANONYMOUS_LOGIN=yes
[docker:kafka]
image = bitnami/kafka:latest
ports =
9092:9092/tcp
healthcheck_cmd = /bin/bash -c 'kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092'
healthcheck_interval = 10
healthcheck_timeout = 10
healthcheck_retries = 30
healthcheck_start_period = 5
links =
zookeeper:zookeeper
environment =
KAFKA_BROKER_ID=1
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
ALLOW_PLAINTEXT_LISTENER=yes
[testenv:apicheck]
commands = pip install -U -r{toxinidir}/requirements/dev.txt
sphinx-build -j2 -b apicheck -d {envtmpdir}/doctrees docs docs/_build/apicheck