qpid: ensure supported exchange types are declared (#1034)

This is a follow-up to ddba8aeaf0. Prior to that commit,
"implements" was missing entirely for qpid. The earlier commit
added it, but only declared that the transport was async-capable.

Turns out it's important also to declare the supported exchange
types. In particular, if the transport doesn't declare support
for the 'fanout' exchange type, pidbox wouldn't be used[1] and so
it would still not be possible to revoke a task with terminate=True
when using the qpid transport.

[1] https://github.com/celery/celery/commit/2f58c35340f648
This commit is contained in:
Rohan McGovern 2019-04-04 17:56:43 +10:00 committed by Omer Katz
parent fcc4c620bf
commit 57f464895d
2 changed files with 7 additions and 0 deletions

View File

@ -1408,6 +1408,7 @@ class Transport(base.Transport):
# This Transport does support the Celery asynchronous event model.
implements = virtual.Transport.implements.extend(
asynchronous=True,
exchange_type=frozenset(['direct', 'topic', 'fanout']),
)
# The driver type and name for identification purposes.

View File

@ -1667,6 +1667,12 @@ class test_Transport_class_attributes(object):
assert Transport.driver_type == 'qpid'
assert Transport.driver_name == 'qpid'
def test_verify_implements_exchange_types(self):
assert 'fanout' in Transport.implements.exchange_type
assert 'direct' in Transport.implements.exchange_type
assert 'topic' in Transport.implements.exchange_type
assert 'frobnitz' not in Transport.implements.exchange_type
def test_transport_verify_recoverable_connection_errors(self):
connection_errors = Transport.recoverable_connection_errors
assert ConnectionError in connection_errors