diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 5d118055..74976426 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -57,6 +57,9 @@ jobs:
         run: tox -v -e ${{ env.PYTHON_VERSION }}-linux-integration-py-redis -- -v
       - name: Run MongoDB integration tests
         run: tox -v -e ${{ env.PYTHON_VERSION }}-linux-integration-py-mongodb -- -v
+      - name: Run Kafka integration tests
+        if: ${{ env.PYTHON_VERSION != 'pypy3.7' && env.PYTHON_VERSION != 'pypy3.8' }}
+        run: tox -v -e ${{ env.PYTHON_VERSION }}-linux-integration-py-kafka -- -v
 
   #################### Linters and checkers ####################
   lint:
diff --git a/docs/reference/kombu.transport.confluentkafka.rst b/docs/reference/kombu.transport.confluentkafka.rst
new file mode 100644
index 00000000..3b171a28
--- /dev/null
+++ b/docs/reference/kombu.transport.confluentkafka.rst
@@ -0,0 +1,31 @@
+=================================================================
+ confluent-kafka Transport - ``kombu.transport.confluentkafka``
+=================================================================
+
+.. currentmodule:: kombu.transport.confluentkafka
+
+.. automodule:: kombu.transport.confluentkafka
+
+    .. contents::
+        :local:
+
+    Transport
+    ---------
+
+    .. autoclass:: Transport
+        :members:
+        :undoc-members:
+
+    Channel
+    -------
+
+    .. autoclass:: Channel
+        :members:
+        :undoc-members:
+
+    Message
+    -------
+
+    .. autoclass:: Message
+        :members:
+        :undoc-members:
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index e5631b48..8a217691 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -23,6 +23,7 @@ TRANSPORT_ALIASES = {
     'amqps': 'kombu.transport.pyamqp:SSLTransport',
     'pyamqp': 'kombu.transport.pyamqp:Transport',
     'librabbitmq': 'kombu.transport.librabbitmq:Transport',
+    'confluentkafka': 'kombu.transport.confluentkafka:Transport',
     'memory': 'kombu.transport.memory:Transport',
     'redis': 'kombu.transport.redis:Transport',
     'rediss': 'kombu.transport.redis:Transport',
diff --git a/kombu/transport/confluentkafka.py b/kombu/transport/confluentkafka.py
new file mode 100644
index 00000000..5332a310
--- /dev/null
+++ b/kombu/transport/confluentkafka.py
@@ -0,0 +1,379 @@
+"""confluent-kafka transport module for Kombu.
+
+Kafka transport using the confluent-kafka library.
+
+**References**
+
+- http://docs.confluent.io/current/clients/confluent-kafka-python
+
+**Limitations**
+
+The confluent-kafka transport does not support the PyPy environment.
+
+Features
+========
+* Type: Virtual
+* Supports Direct: Yes
+* Supports Topic: Yes
+* Supports Fanout: No
+* Supports Priority: No
+* Supports TTL: No
+
+Connection String
+=================
+The connection string has the following format:
+
+.. code-block::
+
+    confluentkafka://[USER:PASSWORD@]KAFKA_ADDRESS[:PORT]
+
+Transport Options
+=================
+* ``connection_wait_time_seconds`` - Time in seconds to wait for the
+  connection to succeed. Default ``5``
+* ``wait_time_seconds`` - Time in seconds to wait to receive messages.
+  Default ``5``
+* ``security_protocol`` - Protocol used to communicate with the broker.
+  Visit https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+  for an explanation of valid values. Default ``plaintext``
+* ``sasl_mechanism`` - SASL mechanism to use for authentication.
+  Visit https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+  for an explanation of valid values.
+* ``num_partitions`` - Number of partitions to create. Default ``1``
+* ``replication_factor`` - Replication factor of partitions. Default ``1``
+* ``topic_config`` - Topic configuration. Must be a dict whose key-value
+  pairs correspond to the topic configuration options listed at
+  http://kafka.apache.org/documentation.html#topicconfigs.
+* ``kafka_common_config`` - Configuration applied to the producer, consumer
+  and admin client. Must be a dict whose key-value pairs correspond to the
+  configuration properties listed at
+  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
+* ``kafka_producer_config`` - Producer configuration. Must be a dict whose
+  key-value pairs correspond to the configuration properties listed at
+  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
+* ``kafka_consumer_config`` - Consumer configuration. Must be a dict whose
+  key-value pairs correspond to the configuration properties listed at
+  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
+* ``kafka_admin_config`` - Admin client configuration. Must be a dict whose
+  key-value pairs correspond to the configuration properties listed at
+  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
+"""
+
+from __future__ import annotations
+
+from queue import Empty
+
+from kombu.transport import virtual
+from kombu.utils import cached_property
+from kombu.utils.encoding import str_to_bytes
+from kombu.utils.json import dumps, loads
+
+try:
+    import confluent_kafka
+    from confluent_kafka import Consumer, Producer, TopicPartition
+    from confluent_kafka.admin import AdminClient, NewTopic
+
+    KAFKA_CONNECTION_ERRORS = ()
+    KAFKA_CHANNEL_ERRORS = ()
+
+except ImportError:
+    confluent_kafka = None
+    KAFKA_CONNECTION_ERRORS = KAFKA_CHANNEL_ERRORS = ()
+
+from kombu.log import get_logger
+
+logger = get_logger(__name__)
+
+DEFAULT_PORT = 9092
+
+
+class NoBrokersAvailable(confluent_kafka.KafkaException):
+    """Kafka broker is not available exception."""
+
+    retriable = True
+
+
+class Message(virtual.Message):
+    """Message object."""
+
+    def __init__(self, payload, channel=None, **kwargs):
+        self.topic = payload.get('topic')
+        super().__init__(payload, channel=channel, **kwargs)
+
+
+class QoS(virtual.QoS):
+    """Quality of Service guarantees."""
+
+    _not_yet_acked = {}
+
+    def can_consume(self):
+        """Return true if the channel can be consumed from.
+
+        :returns: True, if this QoS object can accept a message.
+        :rtype: bool
+        """
+        return (not self.prefetch_count or
+                len(self._not_yet_acked) < self.prefetch_count)
+
+    def can_consume_max_estimate(self):
+        if self.prefetch_count:
+            return self.prefetch_count - len(self._not_yet_acked)
+        else:
+            return 1
+
+    def append(self, message, delivery_tag):
+        self._not_yet_acked[delivery_tag] = message
+
+    def get(self, delivery_tag):
+        return self._not_yet_acked[delivery_tag]
+
+    def ack(self, delivery_tag):
+        if delivery_tag not in self._not_yet_acked:
+            return
+        message = self._not_yet_acked.pop(delivery_tag)
+        consumer = self.channel._get_consumer(message.topic)
+        consumer.commit()
+
+    def reject(self, delivery_tag, requeue=False):
+        """Reject a message by delivery tag.
+
+        If requeue is True, then the last consumed message is reverted so
+        it will be refetched on the next attempt.
+        If False, that message is consumed and ignored.
+        """
+        if requeue:
+            message = self._not_yet_acked.pop(delivery_tag)
+            consumer = self.channel._get_consumer(message.topic)
+            for assignment in consumer.assignment():
+                topic_partition = TopicPartition(message.topic,
+                                                 assignment.partition)
+                [committed_offset] = consumer.committed([topic_partition])
+                consumer.seek(committed_offset)
+        else:
+            self.ack(delivery_tag)
+
+    def restore_unacked_once(self, stderr=None):
+        pass
+
+
+class Channel(virtual.Channel):
+    """Kafka Channel."""
+
+    QoS = QoS
+    Message = Message
+
+    default_wait_time_seconds = 5
+    default_connection_wait_time_seconds = 5
+    _client = None
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._kafka_consumers = {}
+        self._kafka_producers = {}
+
+        self._client = self._open()
+
+    def sanitize_queue_name(self, queue):
+        """Need to sanitize the name, Celery sometimes pushes in @ signs."""
+        return str(queue).replace('@', '')
+
+    def _get_producer(self, queue):
+        """Create/get a producer instance for the given topic/queue."""
+        queue = self.sanitize_queue_name(queue)
+        producer = self._kafka_producers.get(queue, None)
+        if producer is None:
+            producer = Producer({
+                **self.common_config,
+                **(self.options.get('kafka_producer_config') or {}),
+            })
+            self._kafka_producers[queue] = producer
+
+        return producer
+
+    def _get_consumer(self, queue):
+        """Create/get a consumer instance for the given topic/queue."""
+        queue = self.sanitize_queue_name(queue)
+        consumer = self._kafka_consumers.get(queue, None)
+        if consumer is None:
+            consumer = Consumer({
+                'group.id': f'{queue}-consumer-group',
+                'auto.offset.reset': 'earliest',
+                'enable.auto.commit': False,
+                **self.common_config,
+                **(self.options.get('kafka_consumer_config') or {}),
+            })
+            consumer.subscribe([queue])
+            self._kafka_consumers[queue] = consumer
+
+        return consumer
+
+    def _put(self, queue, message, **kwargs):
+        """Put a message on the topic/queue."""
+        queue = self.sanitize_queue_name(queue)
+        producer = self._get_producer(queue)
+        producer.produce(queue, str_to_bytes(dumps(message)))
+        producer.flush()
+
+    def _get(self, queue, **kwargs):
+        """Get a message from the topic/queue."""
+        queue = self.sanitize_queue_name(queue)
+        consumer = self._get_consumer(queue)
+        message = None
+
+        try:
+            message = consumer.poll(self.wait_time_seconds)
+        except StopIteration:
+            pass
+
+        if not message:
+            raise Empty()
+
+        error = message.error()
+        if error:
+            logger.error(error)
+            raise Empty()
+
+        return {**loads(message.value()), 'topic': message.topic()}
+
+    def _delete(self, queue, *args, **kwargs):
+        """Delete a queue/topic."""
+        queue = self.sanitize_queue_name(queue)
+        self._kafka_consumers[queue].close()
+        self._kafka_consumers.pop(queue)
+        self.client.delete_topics([queue])
+
+    def _size(self, queue):
+        """Get the number of pending messages in the topic/queue."""
+        queue = self.sanitize_queue_name(queue)
+
+        consumer = self._kafka_consumers.get(queue, None)
+        if consumer is None:
+            return 0
+
+        size = 0
+        for assignment in consumer.assignment():
+            topic_partition = TopicPartition(queue, assignment.partition)
+            (_, end_offset) = consumer.get_watermark_offsets(topic_partition)
+            [committed_offset] = consumer.committed([topic_partition])
+            size += end_offset - committed_offset.offset
+        return size
+
+    def _new_queue(self, queue, **kwargs):
+        """Create a new topic if it does not exist."""
+        queue = self.sanitize_queue_name(queue)
+        if queue in self.client.list_topics().topics:
+            return
+
+        topic = NewTopic(
+            queue,
+            num_partitions=self.options.get('num_partitions', 1),
+            replication_factor=self.options.get('replication_factor', 1),
+            config=self.options.get('topic_config', {})
+        )
+        self.client.create_topics(new_topics=[topic])
+
+    def _has_queue(self, queue, **kwargs):
+        """Check if a topic already exists."""
+        queue = self.sanitize_queue_name(queue)
+        return queue in self.client.list_topics().topics
+
+    def _open(self):
+        client = AdminClient({
+            **self.common_config,
+            **(self.options.get('kafka_admin_config') or {}),
+        })
+
+        try:
+            # seems to be the only way to check connection
+            client.list_topics(timeout=self.wait_time_seconds)
+        except confluent_kafka.KafkaException as e:
+            raise NoBrokersAvailable(e)
+
+        return client
+
+    @property
+    def client(self):
+        if self._client is None:
+            self._client = self._open()
+        return self._client
+
+    @property
+    def options(self):
+        return self.connection.client.transport_options
+
+    @property
+    def conninfo(self):
+        return self.connection.client
+
+    @cached_property
+    def wait_time_seconds(self):
+        return self.options.get(
+            'wait_time_seconds', self.default_wait_time_seconds
+        )
+
+    @cached_property
+    def connection_wait_time_seconds(self):
+        return self.options.get(
+            'connection_wait_time_seconds',
+            self.default_connection_wait_time_seconds,
+        )
+
+    @cached_property
+    def common_config(self):
+        conninfo = self.connection.client
+        config = {
+            'bootstrap.servers':
+                f'{conninfo.hostname}:{int(conninfo.port) or DEFAULT_PORT}',
+        }
+        security_protocol = self.options.get('security_protocol', 'plaintext')
+        if security_protocol.lower() != 'plaintext':
+            config.update({
+                'security.protocol': security_protocol,
+                'sasl.username': conninfo.userid,
+                'sasl.password': conninfo.password,
+                'sasl.mechanism': self.options.get('sasl_mechanism'),
+            })
+
+        config.update(self.options.get('kafka_common_config') or {})
+        return config
+
+    def close(self):
+        super().close()
+        self._kafka_producers = {}
+
+        for consumer in self._kafka_consumers.values():
+            consumer.close()
+
+        self._kafka_consumers = {}
+
+
+class Transport(virtual.Transport):
+    """Kafka Transport."""
+
+    def as_uri(self, uri: str, include_password=False, mask='**') -> str:
+        pass
+
+    Channel = Channel
+
+    default_port = DEFAULT_PORT
+
+    driver_type = 'kafka'
+    driver_name = 'confluentkafka'
+
+    recoverable_connection_errors = (
+        NoBrokersAvailable,
+    )
+
+    def __init__(self, client, **kwargs):
+        if confluent_kafka is None:
+            raise ImportError('The confluent-kafka library is not installed')
+        super().__init__(client, **kwargs)
+
+    def driver_version(self):
+        return confluent_kafka.__version__
+
+    def establish_connection(self):
+        return super().establish_connection()
+
+    def close_connection(self, connection):
+        return super().close_connection(connection)
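As a quick illustration of the connection string and transport options documented in the module docstring above, here is a minimal usage sketch; the broker address, queue name and option values are placeholders rather than part of this patch:

.. code-block:: python

    from kombu import Connection

    # Placeholder broker address and options; adjust for your environment.
    with Connection(
        'confluentkafka://localhost:9092',
        transport_options={
            'num_partitions': 1,
            'replication_factor': 1,
            'wait_time_seconds': 5,
        },
    ) as conn:
        queue = conn.SimpleQueue('demo-queue')  # topic is created on demand
        queue.put({'hello': 'world'})
        message = queue.get(block=True, timeout=10)
        message.ack()
        queue.close()

SimpleQueue relies on a direct exchange, which this transport supports; fanout exchanges are not implemented.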
diff --git a/requirements/extras/confluentkafka.txt b/requirements/extras/confluentkafka.txt
new file mode 100644
index 00000000..52bd32e9
--- /dev/null
+++ b/requirements/extras/confluentkafka.txt
@@ -0,0 +1 @@
+confluent-kafka
\ No newline at end of file
diff --git a/requirements/test-ci-windows.txt b/requirements/test-ci-windows.txt
index 264b39ca..c4764a4b 100644
--- a/requirements/test-ci-windows.txt
+++ b/requirements/test-ci-windows.txt
@@ -11,4 +11,4 @@ codecov
 -r extras/zookeeper.txt
 -r extras/brotli.txt
 -r extras/zstd.txt
--r extras/sqlalchemy.txt
+-r extras/sqlalchemy.txt
\ No newline at end of file
diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt
index 4c07caa3..6e461a92 100644
--- a/requirements/test-ci.txt
+++ b/requirements/test-ci.txt
@@ -11,4 +11,4 @@ codecov
 -r extras/zookeeper.txt
 -r extras/brotli.txt
 -r extras/zstd.txt
--r extras/sqlalchemy.txt
+-r extras/sqlalchemy.txt
\ No newline at end of file
diff --git a/t/integration/test_kafka.py b/t/integration/test_kafka.py
new file mode 100644
index 00000000..2303d887
--- /dev/null
+++ b/t/integration/test_kafka.py
@@ -0,0 +1,69 @@
+from __future__ import annotations
+
+import pytest
+
+import kombu
+
+from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
+                     BasicFunctionality)
+
+
+def get_connection(hostname, port):
+    return kombu.Connection(
+        f'confluentkafka://{hostname}:{port}',
+    )
+
+
+def get_failover_connection(hostname, port):
+    return kombu.Connection(
+        f'confluentkafka://localhost:12345;confluentkafka://{hostname}:{port}',
+        connect_timeout=10,
+    )
+
+
+@pytest.fixture()
+def invalid_connection():
+    return kombu.Connection('confluentkafka://localhost:12345')
+
+
+@pytest.fixture()
+def connection():
+    return get_connection(
+        hostname='localhost',
+        port='9092'
+    )
+
+
+@pytest.fixture()
+def failover_connection():
+    return get_failover_connection(
+        hostname='localhost',
+        port='9092'
+    )
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaBasicFunctionality(BasicFunctionality):
+    pass
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaBaseExchangeTypes(BaseExchangeTypes):
+
+    @pytest.mark.skip('fanout is not implemented')
+    def test_fanout(self, connection):
+        pass
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaFailover(BaseFailover):
+    pass
+
+
+@pytest.mark.env('kafka')
+@pytest.mark.flaky(reruns=5, reruns_delay=2)
+class test_KafkaMessage(BaseMessage):
+    pass
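The integration tests above exercise failover by listing several broker URLs separated by semicolons, with the first one unreachable. A short sketch of the same pattern outside the test suite follows; the hostnames, ports and retry count are illustrative only:

.. code-block:: python

    from kombu import Connection
    from kombu.exceptions import OperationalError

    # The first URL is intentionally dead; kombu falls back to the next one.
    conn = Connection(
        'confluentkafka://localhost:12345;confluentkafka://localhost:9092',
        connect_timeout=10,
    )
    try:
        conn.ensure_connection(max_retries=3)
    except OperationalError:
        print('no broker in the failover list is reachable')
    else:
        print('connected')
    finally:
        conn.release()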
diff --git a/tox.ini b/tox.ini
index 99988014..b465f45d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,8 +4,8 @@ envlist =
     {pypy3.7,pypy3.8,3.7,3.8,3.9,3.10}-linux-integration-py-amqp
     {pypy3.7,pypy3.8,3.7,3.8,3.9,3.10}-linux-integration-redis
     {pypy3.7,pypy3.8,3.7,3.8,3.9,3.10}-linux-integration-mongodb
+    {3.7,3.8,3.9,3.10}-linux-integration-kafka
     flake8
-    flakeplus
     apicheck
     pydocstyle
@@ -21,6 +21,7 @@ deps=
     apicheck,pypy3.7,pypy3.8,3.7,3.8,3.9,3.10: -r{toxinidir}/requirements/default.txt
     apicheck,pypy3.7,pypy3.8,3.7,3.8,3.9,3.10: -r{toxinidir}/requirements/test.txt
     apicheck,pypy3.7,pypy3.8,3.7-linux,3.8-linux,3.9-linux,3.10-linux: -r{toxinidir}/requirements/test-ci.txt
+    apicheck,3.7-linux,3.8-linux,3.9-linux,3.10-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt
     3.7-windows,3.8-windows,3.9-windows,3.10-windows: -r{toxinidir}/requirements/test-ci-windows.txt
     apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt
     flake8,flakeplus,pydocstyle,mypy: -r{toxinidir}/requirements/pkgutils.txt
@@ -30,6 +31,7 @@ commands =
     integration-py-amqp: py.test -xv -E py-amqp t/integration {posargs:-n2}
     integration-redis: py.test -xv -E redis t/integration {posargs:-n2}
    integration-mongodb: py.test -xv -E mongodb t/integration {posargs:-n2}
+    integration-kafka: py.test -xv -E kafka t/integration {posargs:-n2}
 
 basepython =
     pypy3.7: pypy3.7
@@ -45,6 +47,8 @@ docker =
     integration-py-amqp: rabbitmq
     integration-redis: redis
     integration-mongodb: mongodb
+    integration-kafka: zookeeper
+    integration-kafka: kafka
 
 dockerenv =
     PYAMQP_INTEGRATION_INSTANCE=1
@@ -77,6 +81,33 @@ healthcheck_timeout = 10
 healthcheck_retries = 30
 healthcheck_start_period = 5
+
+[docker:zookeeper]
+image = bitnami/zookeeper:latest
+ports = 2181:2181/tcp
+healthcheck_interval = 10
+healthcheck_timeout = 10
+healthcheck_retries = 30
+healthcheck_start_period = 5
+environment = ALLOW_ANONYMOUS_LOGIN=yes
+
+[docker:kafka]
+image = bitnami/kafka:latest
+ports =
+    9092:9092/tcp
+healthcheck_cmd = /bin/bash -c 'kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092'
+healthcheck_interval = 10
+healthcheck_timeout = 10
+healthcheck_retries = 30
+healthcheck_start_period = 5
+links =
+    zookeeper:zookeeper
+environment =
+    KAFKA_BROKER_ID=1
+    KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
+    KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
+    KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+    ALLOW_PLAINTEXT_LISTENER=yes
 
 [testenv:apicheck]
 commands =
     pip install -U -r{toxinidir}/requirements/dev.txt
     sphinx-build -j2 -b apicheck -d {envtmpdir}/doctrees docs docs/_build/apicheck
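The tox configuration above health-checks the Kafka container with ``kafka-topics.sh``; the transport's ``_open()`` does the equivalent from Python by listing topics through an ``AdminClient``. A hedged sketch of that connectivity probe, handy before running the integration tests locally (the bootstrap address simply mirrors the docker configuration above):

.. code-block:: python

    from confluent_kafka import KafkaException
    from confluent_kafka.admin import AdminClient

    # Same idea as Channel._open(): listing topics is the simplest probe.
    client = AdminClient({'bootstrap.servers': '127.0.0.1:9092'})
    try:
        metadata = client.list_topics(timeout=5)
    except KafkaException as exc:
        print(f'broker not reachable: {exc}')
    else:
        print('broker reachable, topics:', sorted(metadata.topics))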