diff --git a/kombu/common.py b/kombu/common.py index 9dc06bb0..74cbc370 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -1,5 +1,44 @@ +from collections import defaultdict + from kombu import entity -from kombu.utils import gen_unique_id as uuid +from kombu.utils import uuid + +declared_entities = defaultdict(lambda: set()) + + +def maybe_declare(entity, channel): + declared = declared_entities[channel.connection.client] + if not entity.is_bound: + entity = entity(channel) + if entity not in declared: + entity.declare() + declared.add(entity) + return True + return False + + +class Broadcast(entity.Queue): + """Convenience class used to define broadcast queues. + + Every queue instance will have a unique name, + and both the queue and exchange is configued with auto deletion. + + :keyword name: This is used as the name of the exchange. + :keyword queue: By default a unique id is used for the queue + name for every consumer. You can specify a custom queue + name here. + :keyword \*\*kwargs: See :class:`~kombu.entity.Queue` for a list + of additional keyword arguments supported. + + """ + + def __init__(self, name=None, queue=None, **kwargs): + kwargs.setdefault("exchange", entity.Exchange(name, type="fanout", + auto_delete=True)) + kwargs.setdefault("auto_delete", True) + kwargs.setdefault("alias", name) + return super(Broadcast, self).__init__( + name=queue or "bcast.%s" % (uuid(), ), **kwargs) def entry_to_queue(queue, **options):