mirror of https://github.com/celery/kombu.git
546 lines
20 KiB
Python
546 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import socket
|
|
from contextlib import closing
|
|
from time import sleep
|
|
|
|
import pytest
|
|
|
|
import kombu
|
|
|
|
|
|
class BasicFunctionality:
|
|
|
|
def test_connect(self, connection):
|
|
assert connection.connect()
|
|
assert connection.connection
|
|
connection.close()
|
|
assert connection.connection is None
|
|
assert connection.connect()
|
|
assert connection.connection
|
|
connection.close()
|
|
|
|
def test_failed_connect(self, invalid_connection):
|
|
# method raises transport exception
|
|
with pytest.raises(Exception):
|
|
invalid_connection.connect()
|
|
|
|
def test_failed_connection(self, invalid_connection):
|
|
# method raises transport exception
|
|
with pytest.raises(Exception):
|
|
invalid_connection.connection
|
|
|
|
def test_failed_channel(self, invalid_connection):
|
|
# method raises transport exception
|
|
with pytest.raises(Exception):
|
|
invalid_connection.channel()
|
|
|
|
def test_failed_default_channel(self, invalid_connection):
|
|
invalid_connection.transport_options = {'max_retries': 1}
|
|
# method raises transport exception
|
|
with pytest.raises(Exception):
|
|
invalid_connection.default_channel
|
|
|
|
def test_default_channel_autoconnect(self, connection):
|
|
connection.connect()
|
|
connection.close()
|
|
assert connection.connection is None
|
|
assert connection.default_channel
|
|
assert connection.connection
|
|
connection.close()
|
|
|
|
def test_channel(self, connection):
|
|
chan = connection.channel()
|
|
assert chan
|
|
assert connection.connection
|
|
|
|
def test_default_channel(self, connection):
|
|
chan = connection.default_channel
|
|
assert chan
|
|
assert connection.connection
|
|
|
|
def test_publish_consume(self, connection):
|
|
test_queue = kombu.Queue('test', routing_key='test')
|
|
|
|
def callback(body, message):
|
|
assert body == {'hello': 'world'}
|
|
assert message.content_type == 'application/x-python-serialize'
|
|
message.delivery_info['routing_key'] == 'test'
|
|
message.delivery_info['exchange'] == ''
|
|
message.ack()
|
|
assert message.payload == body
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
producer = kombu.Producer(channel)
|
|
producer.publish(
|
|
{'hello': 'world'},
|
|
retry=True,
|
|
exchange=test_queue.exchange,
|
|
routing_key=test_queue.routing_key,
|
|
declare=[test_queue],
|
|
serializer='pickle'
|
|
)
|
|
|
|
consumer = kombu.Consumer(
|
|
conn, [test_queue], accept=['pickle']
|
|
)
|
|
consumer.register_callback(callback)
|
|
with consumer:
|
|
conn.drain_events(timeout=1)
|
|
|
|
def test_consume_empty_queue(self, connection):
|
|
|
|
def callback(body, message):
|
|
assert False, 'Callback should not be called'
|
|
|
|
test_queue = kombu.Queue('test_empty', routing_key='test_empty')
|
|
with connection as conn:
|
|
with conn.channel():
|
|
consumer = kombu.Consumer(
|
|
conn, [test_queue], accept=['pickle']
|
|
)
|
|
consumer.register_callback(callback)
|
|
with consumer:
|
|
with pytest.raises(socket.timeout):
|
|
conn.drain_events(timeout=1)
|
|
|
|
def test_simple_queue_publish_consume(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleQueue('simple_queue_test')) as queue:
|
|
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
|
|
message = queue.get(timeout=1)
|
|
assert message.payload == {'Hello': 'World'}
|
|
assert message.content_type == 'application/json'
|
|
assert message.content_encoding == 'utf-8'
|
|
assert message.headers == {'k1': 'v1'}
|
|
message.ack()
|
|
|
|
def test_simple_buffer_publish_consume(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleBuffer('simple_buffer_test')) as buf:
|
|
buf.put({'Hello': 'World'}, headers={'k1': 'v1'})
|
|
message = buf.get(timeout=1)
|
|
assert message.payload == {'Hello': 'World'}
|
|
assert message.content_type == 'application/json'
|
|
assert message.content_encoding == 'utf-8'
|
|
assert message.headers == {'k1': 'v1'}
|
|
message.ack()
|
|
|
|
|
|
class BaseExchangeTypes:
|
|
|
|
def _callback(self, body, message):
|
|
message.ack()
|
|
assert body == {'hello': 'world'}
|
|
assert message.content_type == 'application/x-python-serialize'
|
|
message.delivery_info['routing_key'] == 'test'
|
|
message.delivery_info['exchange'] == ''
|
|
assert message.payload == body
|
|
|
|
def _create_consumer(self, connection, queue):
|
|
consumer = kombu.Consumer(
|
|
connection, [queue], accept=['pickle']
|
|
)
|
|
consumer.register_callback(self._callback)
|
|
return consumer
|
|
|
|
def _consume_from(self, connection, consumer):
|
|
with consumer:
|
|
connection.drain_events(timeout=1)
|
|
|
|
def _consume(self, connection, queue):
|
|
with self._create_consumer(connection, queue):
|
|
connection.drain_events(timeout=1)
|
|
|
|
def _publish(self, channel, exchange, queues=None, routing_key=None):
|
|
producer = kombu.Producer(channel, exchange=exchange)
|
|
if routing_key:
|
|
producer.publish(
|
|
{'hello': 'world'},
|
|
declare=list(queues) if queues else None,
|
|
serializer='pickle',
|
|
routing_key=routing_key
|
|
)
|
|
else:
|
|
producer.publish(
|
|
{'hello': 'world'},
|
|
declare=list(queues) if queues else None,
|
|
serializer='pickle'
|
|
)
|
|
|
|
def test_direct(self, connection):
|
|
ex = kombu.Exchange('test_direct', type='direct')
|
|
test_queue = kombu.Queue('direct1', exchange=ex)
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
self._publish(channel, ex, [test_queue])
|
|
self._consume(conn, test_queue)
|
|
|
|
def test_direct_routing_keys(self, connection):
|
|
ex = kombu.Exchange('test_rk_direct', type='direct')
|
|
test_queue1 = kombu.Queue('rk_direct1', exchange=ex, routing_key='d1')
|
|
test_queue2 = kombu.Queue('rk_direct2', exchange=ex, routing_key='d2')
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
self._publish(channel, ex, [test_queue1, test_queue2], 'd1')
|
|
self._consume(conn, test_queue1)
|
|
# direct2 queue should not have data
|
|
with pytest.raises(socket.timeout):
|
|
self._consume(conn, test_queue2)
|
|
# test that publishing using key which is not used results in
|
|
# discarted message.
|
|
self._publish(channel, ex, [test_queue1, test_queue2], 'd3')
|
|
with pytest.raises(socket.timeout):
|
|
self._consume(conn, test_queue1)
|
|
with pytest.raises(socket.timeout):
|
|
self._consume(conn, test_queue2)
|
|
|
|
def test_fanout(self, connection):
|
|
ex = kombu.Exchange('test_fanout', type='fanout')
|
|
test_queue1 = kombu.Queue('fanout1', exchange=ex)
|
|
test_queue2 = kombu.Queue('fanout2', exchange=ex)
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
self._publish(channel, ex, [test_queue1, test_queue2])
|
|
|
|
self._consume(conn, test_queue1)
|
|
self._consume(conn, test_queue2)
|
|
|
|
def test_topic(self, connection):
|
|
ex = kombu.Exchange('test_topic', type='topic')
|
|
test_queue1 = kombu.Queue('topic1', exchange=ex, routing_key='t.*')
|
|
test_queue2 = kombu.Queue('topic2', exchange=ex, routing_key='t.*')
|
|
test_queue3 = kombu.Queue('topic3', exchange=ex, routing_key='t')
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
self._publish(
|
|
channel, ex, [test_queue1, test_queue2, test_queue3],
|
|
routing_key='t.1'
|
|
)
|
|
self._consume(conn, test_queue1)
|
|
self._consume(conn, test_queue2)
|
|
with pytest.raises(socket.timeout):
|
|
# topic3 queue should not have data
|
|
self._consume(conn, test_queue3)
|
|
|
|
def test_publish_empty_exchange(self, connection):
|
|
ex = kombu.Exchange('test_empty_exchange', type='topic')
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
self._publish(
|
|
channel, ex,
|
|
routing_key='t.1'
|
|
)
|
|
|
|
|
|
class BaseTimeToLive:
|
|
def test_publish_consume(self, connection):
|
|
test_queue = kombu.Queue('ttl_test', routing_key='ttl_test')
|
|
|
|
def callback(body, message):
|
|
assert False, 'Callback should not be called'
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
producer = kombu.Producer(channel)
|
|
producer.publish(
|
|
{'hello': 'world'},
|
|
retry=True,
|
|
exchange=test_queue.exchange,
|
|
routing_key=test_queue.routing_key,
|
|
declare=[test_queue],
|
|
serializer='pickle',
|
|
expiration=2
|
|
)
|
|
|
|
consumer = kombu.Consumer(
|
|
conn, [test_queue], accept=['pickle']
|
|
)
|
|
consumer.register_callback(callback)
|
|
sleep(3)
|
|
with consumer:
|
|
with pytest.raises(socket.timeout):
|
|
conn.drain_events(timeout=1)
|
|
|
|
def test_simple_queue_publish_consume(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleQueue('ttl_simple_queue_test')) as queue:
|
|
queue.put(
|
|
{'Hello': 'World'}, headers={'k1': 'v1'}, expiration=2
|
|
)
|
|
sleep(3)
|
|
with pytest.raises(queue.Empty):
|
|
queue.get(timeout=1)
|
|
|
|
def test_simple_buffer_publish_consume(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleBuffer('ttl_simple_buffer_test')) as buf:
|
|
buf.put({'Hello': 'World'}, headers={'k1': 'v1'}, expiration=2)
|
|
sleep(3)
|
|
with pytest.raises(buf.Empty):
|
|
buf.get(timeout=1)
|
|
|
|
|
|
class BasePriority:
|
|
|
|
PRIORITY_ORDER = 'asc'
|
|
|
|
def test_publish_consume(self, connection):
|
|
|
|
# py-amqp transport has higher numbers higher priority
|
|
# redis transport has lower numbers higher priority
|
|
if self.PRIORITY_ORDER == 'asc':
|
|
prio_high = 6
|
|
prio_low = 3
|
|
else:
|
|
prio_high = 3
|
|
prio_low = 6
|
|
|
|
test_queue = kombu.Queue(
|
|
'priority_test', routing_key='priority_test', max_priority=10
|
|
)
|
|
|
|
received_messages = []
|
|
|
|
def callback(body, message):
|
|
received_messages.append(body)
|
|
message.ack()
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
producer = kombu.Producer(channel)
|
|
for msg, prio in [
|
|
[{'msg': 'first'}, prio_low],
|
|
[{'msg': 'second'}, prio_high],
|
|
[{'msg': 'third'}, prio_low],
|
|
]:
|
|
producer.publish(
|
|
msg,
|
|
retry=True,
|
|
exchange=test_queue.exchange,
|
|
routing_key=test_queue.routing_key,
|
|
declare=[test_queue],
|
|
serializer='pickle',
|
|
priority=prio
|
|
)
|
|
# Sleep to make sure that queue sorted based on priority
|
|
sleep(0.5)
|
|
consumer = kombu.Consumer(
|
|
conn, [test_queue], accept=['pickle']
|
|
)
|
|
consumer.register_callback(callback)
|
|
with consumer:
|
|
conn.drain_events(timeout=1)
|
|
# Second message must be received first
|
|
assert received_messages[0] == {'msg': 'second'}
|
|
assert received_messages[1] == {'msg': 'first'}
|
|
assert received_messages[2] == {'msg': 'third'}
|
|
|
|
def test_publish_requeue_consume(self, connection):
|
|
# py-amqp transport has higher numbers higher priority
|
|
# redis transport has lower numbers higher priority
|
|
if self.PRIORITY_ORDER == 'asc':
|
|
prio_max = 9
|
|
prio_high = 6
|
|
prio_low = 3
|
|
else:
|
|
prio_max = 0
|
|
prio_high = 3
|
|
prio_low = 6
|
|
|
|
test_queue = kombu.Queue(
|
|
'priority_requeue_test',
|
|
routing_key='priority_requeue_test', max_priority=10
|
|
)
|
|
|
|
received_messages = []
|
|
received_message_bodies = []
|
|
|
|
def callback(body, message):
|
|
received_messages.append(message)
|
|
received_message_bodies.append(body)
|
|
# don't ack the message so it can be requeued
|
|
|
|
with connection as conn:
|
|
with conn.channel() as channel:
|
|
producer = kombu.Producer(channel)
|
|
for msg, prio in [
|
|
[{'msg': 'first'}, prio_low],
|
|
[{'msg': 'second'}, prio_high],
|
|
[{'msg': 'third'}, prio_low],
|
|
]:
|
|
producer.publish(
|
|
msg,
|
|
retry=True,
|
|
exchange=test_queue.exchange,
|
|
routing_key=test_queue.routing_key,
|
|
declare=[test_queue],
|
|
serializer='pickle',
|
|
priority=prio
|
|
)
|
|
# Sleep to make sure that queue sorted based on priority
|
|
sleep(0.5)
|
|
consumer = kombu.Consumer(
|
|
conn, [test_queue], accept=['pickle']
|
|
)
|
|
consumer.register_callback(callback)
|
|
with consumer:
|
|
# drain_events() returns just on number in
|
|
# Virtual transports
|
|
conn.drain_events(timeout=1)
|
|
|
|
# requeue the messages
|
|
for msg in received_messages:
|
|
msg.requeue()
|
|
received_messages.clear()
|
|
received_message_bodies.clear()
|
|
|
|
# add a fourth max priority message
|
|
producer.publish(
|
|
{'msg': 'fourth'},
|
|
retry=True,
|
|
exchange=test_queue.exchange,
|
|
routing_key=test_queue.routing_key,
|
|
declare=[test_queue],
|
|
serializer='pickle',
|
|
priority=prio_max
|
|
)
|
|
# Sleep to make sure that queue sorted based on priority
|
|
sleep(0.5)
|
|
|
|
with consumer:
|
|
conn.drain_events(timeout=1)
|
|
|
|
# Fourth message must be received first
|
|
assert received_message_bodies[0] == {'msg': 'fourth'}
|
|
assert received_message_bodies[1] == {'msg': 'second'}
|
|
assert received_message_bodies[2] == {'msg': 'first'}
|
|
assert received_message_bodies[3] == {'msg': 'third'}
|
|
|
|
def test_simple_queue_publish_consume(self, connection):
|
|
if self.PRIORITY_ORDER == 'asc':
|
|
prio_high = 7
|
|
prio_low = 1
|
|
else:
|
|
prio_high = 1
|
|
prio_low = 7
|
|
with connection as conn:
|
|
with closing(
|
|
conn.SimpleQueue(
|
|
'priority_simple_queue_test',
|
|
queue_opts={'max_priority': 10}
|
|
)
|
|
) as queue:
|
|
for msg, prio in [
|
|
[{'msg': 'first'}, prio_low],
|
|
[{'msg': 'second'}, prio_high],
|
|
[{'msg': 'third'}, prio_low],
|
|
]:
|
|
queue.put(
|
|
msg, headers={'k1': 'v1'}, priority=prio
|
|
)
|
|
# Sleep to make sure that queue sorted based on priority
|
|
sleep(0.5)
|
|
# Second message must be received first
|
|
for data in [
|
|
{'msg': 'second'}, {'msg': 'first'}, {'msg': 'third'},
|
|
]:
|
|
msg = queue.get(timeout=1)
|
|
msg.ack()
|
|
assert msg.payload == data
|
|
|
|
def test_simple_buffer_publish_consume(self, connection):
|
|
if self.PRIORITY_ORDER == 'asc':
|
|
prio_high = 6
|
|
prio_low = 2
|
|
else:
|
|
prio_high = 2
|
|
prio_low = 6
|
|
with connection as conn:
|
|
with closing(
|
|
conn.SimpleBuffer(
|
|
'priority_simple_buffer_test',
|
|
queue_opts={'max_priority': 10}
|
|
)
|
|
) as buf:
|
|
for msg, prio in [
|
|
[{'msg': 'first'}, prio_low],
|
|
[{'msg': 'second'}, prio_high],
|
|
[{'msg': 'third'}, prio_low],
|
|
]:
|
|
buf.put(
|
|
msg, headers={'k1': 'v1'}, priority=prio
|
|
)
|
|
# Sleep to make sure that queue sorted based on priority
|
|
sleep(0.5)
|
|
# Second message must be received first
|
|
for data in [
|
|
{'msg': 'second'}, {'msg': 'first'}, {'msg': 'third'},
|
|
]:
|
|
msg = buf.get(timeout=1)
|
|
msg.ack()
|
|
assert msg.payload == data
|
|
|
|
|
|
class BaseMessage:
|
|
|
|
def test_ack(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleQueue('test_ack')) as queue:
|
|
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
|
|
message = queue.get_nowait()
|
|
message.ack()
|
|
with pytest.raises(queue.Empty):
|
|
queue.get_nowait()
|
|
|
|
def test_reject_no_requeue(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleQueue('test_reject_no_requeue')) as queue:
|
|
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
|
|
message = queue.get_nowait()
|
|
message.reject(requeue=False)
|
|
with pytest.raises(queue.Empty):
|
|
queue.get_nowait()
|
|
|
|
def test_reject_requeue(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleQueue('test_reject_requeue')) as queue:
|
|
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
|
|
message = queue.get_nowait()
|
|
message.reject(requeue=True)
|
|
message2 = queue.get_nowait()
|
|
assert message.body == message2.body
|
|
message2.ack()
|
|
|
|
def test_requeue(self, connection):
|
|
with connection as conn:
|
|
with closing(conn.SimpleQueue('test_requeue')) as queue:
|
|
queue.put({'Hello': 'World'}, headers={'k1': 'v1'})
|
|
message = queue.get_nowait()
|
|
message.requeue()
|
|
message2 = queue.get_nowait()
|
|
assert message.body == message2.body
|
|
message2.ack()
|
|
|
|
|
|
class BaseFailover(BasicFunctionality):
|
|
|
|
def test_connect(self, failover_connection):
|
|
super().test_connect(failover_connection)
|
|
|
|
def test_publish_consume(self, failover_connection):
|
|
super().test_publish_consume(failover_connection)
|
|
|
|
def test_consume_empty_queue(self, failover_connection):
|
|
super().test_consume_empty_queue(failover_connection)
|
|
|
|
def test_simple_buffer_publish_consume(self, failover_connection):
|
|
super().test_simple_buffer_publish_consume(
|
|
failover_connection
|
|
)
|