kombu/t/integration/test_kafka.py

70 lines
1.4 KiB
Python

from __future__ import annotations
import pytest
import kombu
from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
BasicFunctionality)
def get_connection(hostname, port):
return kombu.Connection(
f'confluentkafka://{hostname}:{port}',
)
def get_failover_connection(hostname, port):
return kombu.Connection(
f'confluentkafka://localhost:12345;confluentkafka://{hostname}:{port}',
connect_timeout=10,
)
@pytest.fixture()
def invalid_connection():
return kombu.Connection('confluentkafka://localhost:12345')
@pytest.fixture()
def connection():
return get_connection(
hostname='localhost',
port='9092'
)
@pytest.fixture()
def failover_connection():
return get_failover_connection(
hostname='localhost',
port='9092'
)
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaBasicFunctionality(BasicFunctionality):
pass
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaBaseExchangeTypes(BaseExchangeTypes):
@pytest.mark.skip('fanout is not implemented')
def test_fanout(self, connection):
pass
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaFailover(BaseFailover):
pass
@pytest.mark.env('kafka')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_KafkaMessage(BaseMessage):
pass