mirror of https://github.com/celery/kombu.git
70 lines
1.4 KiB
Python
70 lines
1.4 KiB
Python
|
from __future__ import annotations
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
import kombu
|
||
|
|
||
|
from .common import (BaseExchangeTypes, BaseFailover, BaseMessage,
|
||
|
BasicFunctionality)
|
||
|
|
||
|
|
||
|
def get_connection(hostname, port):
|
||
|
return kombu.Connection(
|
||
|
f'confluentkafka://{hostname}:{port}',
|
||
|
)
|
||
|
|
||
|
|
||
|
def get_failover_connection(hostname, port):
|
||
|
return kombu.Connection(
|
||
|
f'confluentkafka://localhost:12345;confluentkafka://{hostname}:{port}',
|
||
|
connect_timeout=10,
|
||
|
)
|
||
|
|
||
|
|
||
|
@pytest.fixture()
|
||
|
def invalid_connection():
|
||
|
return kombu.Connection('confluentkafka://localhost:12345')
|
||
|
|
||
|
|
||
|
@pytest.fixture()
|
||
|
def connection():
|
||
|
return get_connection(
|
||
|
hostname='localhost',
|
||
|
port='9092'
|
||
|
)
|
||
|
|
||
|
|
||
|
@pytest.fixture()
|
||
|
def failover_connection():
|
||
|
return get_failover_connection(
|
||
|
hostname='localhost',
|
||
|
port='9092'
|
||
|
)
|
||
|
|
||
|
|
||
|
@pytest.mark.env('kafka')
|
||
|
@pytest.mark.flaky(reruns=5, reruns_delay=2)
|
||
|
class test_KafkaBasicFunctionality(BasicFunctionality):
|
||
|
pass
|
||
|
|
||
|
|
||
|
@pytest.mark.env('kafka')
|
||
|
@pytest.mark.flaky(reruns=5, reruns_delay=2)
|
||
|
class test_KafkaBaseExchangeTypes(BaseExchangeTypes):
|
||
|
|
||
|
@pytest.mark.skip('fanout is not implemented')
|
||
|
def test_fanout(self, connection):
|
||
|
pass
|
||
|
|
||
|
|
||
|
@pytest.mark.env('kafka')
|
||
|
@pytest.mark.flaky(reruns=5, reruns_delay=2)
|
||
|
class test_KafkaFailover(BaseFailover):
|
||
|
pass
|
||
|
|
||
|
|
||
|
@pytest.mark.env('kafka')
|
||
|
@pytest.mark.flaky(reruns=5, reruns_delay=2)
|
||
|
class test_KafkaMessage(BaseMessage):
|
||
|
pass
|