diff --git a/kombu/transport/virtual.py b/kombu/transport/virtual.py index 20d51120..7996d9e9 100644 --- a/kombu/transport/virtual.py +++ b/kombu/transport/virtual.py @@ -2,6 +2,7 @@ import atexit import pickle import sys import tempfile +import re from time import sleep from itertools import count, cycle @@ -162,16 +163,27 @@ class Channel(object): def _lookup(self, exchange, routing_key, default="ae.undeliver"): try: - return _exchanges[exchange]["table"][routing_key] + type = _exchanges[exchange].get('type', 'direct') + if type == 'direct': + # routing key is a direct match + return [q[2] for q in _exchanges[exchange]["table"] if q[0] == routing_key] + elif type == 'fanout': + # routing key not used, all queues receive the message + return [q[2] for q in _exchanges[exchange]["table"]] + elif type == 'topic': + # pattern matching for the queue + return [q[2] for q in _exchanges[exchange]["table"] if q[1].match(routing_key)] + else: + raise KeyError('Unknown exchange type') except KeyError: self._new_queue(default) - return default + return [default] def _restore(self, message): delivery_info = message.delivery_info - self._put(self._lookup(delivery_info["exchange"], - delivery_info["routing_key"]), - message) + for q in self._lookup(delivery_info["exchange"], + delivery_info["routing_key"]): + self._put(q, message) def _poll(self, queues): return FairCycle(self._get, queues, QueueEmpty).get() @@ -188,16 +200,16 @@ class Channel(object): return [_consumers[tag] for tag in self._consumers] def exchange_declare(self, exchange, type="direct", durable=False, - auto_delete=False, arguments=None): + auto_delete=False, arguments=None, nowait=False): if exchange not in _exchanges: _exchanges[exchange] = {"type": type, "durable": durable, "auto_delete": auto_delete, "arguments": arguments or {}, - "table": {}} + "table": []} def exchange_delete(self, exchange, if_unused=False): - for rkey, queue in _exchanges[exchange]["table"].items(): + for rkey, m, queue in _exchanges[exchange]["table"]: self._purge(queue) _exchanges.pop(exchange, None) @@ -210,9 +222,14 @@ class Channel(object): return self._delete(queue) - def queue_bind(self, queue, exchange, routing_key, arguments=None): - table = _exchanges[exchange].setdefault("table", {}) - table[routing_key] = queue + def queue_bind(self, queue, exchange, routing_key, arguments=None, nowait=False): + table = _exchanges[exchange].setdefault("table", []) + m = None + type = _exchanges[exchange].get('type', 'direct') + if type == 'topic': + re_key = "^%s$" % routing_key.replace('.', '\.').replace('#', '[^\.]').replace('*','.*?') + m = re.compile(re_key, re.U) + table.append([routing_key, m, queue]) def queue_purge(self, queue, **kwargs): return self._purge(queue, **kwargs) @@ -251,7 +268,8 @@ class Channel(object): message["properties"]["delivery_info"]["exchange"] = exchange message["properties"]["delivery_info"]["routing_key"] = routing_key message["properties"]["delivery_tag"] = self._next_delivery_tag() - self._put(self._lookup(exchange, routing_key), message) + for q in self._lookup(exchange, routing_key): + self._put(q, message) def basic_cancel(self, consumer_tag): queue = _consumers.pop(consumer_tag, None)