From f8ee383cc3b3f1f9166627e81a64af4939e4de10 Mon Sep 17 00:00:00 2001 From: Shane Caraveo Date: Mon, 6 Sep 2010 13:56:09 -0700 Subject: [PATCH 1/2] add amqp style routing for virtual channels, allows memory backend to behave like amqp --- example/topic.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 example/topic.py diff --git a/example/topic.py b/example/topic.py new file mode 100644 index 00000000..d8e775c0 --- /dev/null +++ b/example/topic.py @@ -0,0 +1,70 @@ +from kombu.connection import BrokerConnection +from kombu.messaging import Exchange, Queue, Consumer, Producer + +# configuration, normally in an ini file +exchange_name = "test.shane" +exchange_type = "topic" +exchange_durable = True +message_serializer = "json" +queue_name = "test.q" + +# 1. setup the connection to the exchange +# hostname,userid,password,virtual_host not used with memory backend +cons_conn = BrokerConnection(hostname="localhost", + userid="guest", + password="guest", + virtual_host="/", + transport="memory") +cons_chan = cons_conn.channel() +cons_exch = Exchange(exchange_name, type=exchange_type, durable=exchange_durable) + +pub_conn = BrokerConnection(hostname="localhost", + userid="guest", + password="guest", + virtual_host="/", + transport="memory") +pub_chan = pub_conn.channel() +pub_exch = Exchange(exchange_name, type=exchange_type, durable=exchange_durable) + +# 2. setup the consumer, the consumer declares/creates the queue, if you +# publish to a queue before there is a consumer it will fail unless the queue +# was first created and is durable +class AConsumer: + def __init__(self, queue_name, key): + self.queue = Queue(queue_name, exchange=cons_exch, routing_key=key) + self.consumer = Consumer(cons_chan, [self.queue]) + self.consumer.consume() + + def mq_callback(message_data, message): + print("%s: %r: %r" % (key, message.delivery_info, message_data,)) + #message.ack() + self.consumer.register_callback(mq_callback) + +c1 = AConsumer("test_1","test.1") +c2 = AConsumer("testing","test.ing") +# consumers can use simple pattern matching when defining a queue +c3 = AConsumer("test_all","test.*") + +# 3. publish something to consume +# publishers always send to a specific route, the mq will route to the queues +producer = Producer(pub_chan, exchange=pub_exch, serializer=message_serializer) +producer.publish({"name": "Shane Caraveo", "username": "mixedpuppy"}, routing_key="test.1") +producer.publish({"name": "Micky Mouse", "username": "donaldduck"}, routing_key="test.ing") +producer.publish({"name": "Anonymous", "username": "whoami"}, routing_key="test.foobar") + +def have_messages(): + return sum([q.qsize() for q in cons_chan.queues.values()]) + +# 5. run the event loop +while have_messages(): + try: + cons_conn.drain_events() + except KeyboardInterrupt: + print + print "quitting" + break + except Exception, e: + import traceback + print traceback.format_exc() + break + From 04b9a6f2fb6854fadbb4c29880866135354fdeef Mon Sep 17 00:00:00 2001 From: Shane Caraveo Date: Mon, 6 Sep 2010 15:41:02 -0700 Subject: [PATCH 2/2] amqp style routing for virtual exchanges (e.g. memory backend) forgot to git add this part of the change --- kombu/transport/virtual.py | 42 +++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/kombu/transport/virtual.py b/kombu/transport/virtual.py index 20d51120..7996d9e9 100644 --- a/kombu/transport/virtual.py +++ b/kombu/transport/virtual.py @@ -2,6 +2,7 @@ import atexit import pickle import sys import tempfile +import re from time import sleep from itertools import count, cycle @@ -162,16 +163,27 @@ class Channel(object): def _lookup(self, exchange, routing_key, default="ae.undeliver"): try: - return _exchanges[exchange]["table"][routing_key] + type = _exchanges[exchange].get('type', 'direct') + if type == 'direct': + # routing key is a direct match + return [q[2] for q in _exchanges[exchange]["table"] if q[0] == routing_key] + elif type == 'fanout': + # routing key not used, all queues receive the message + return [q[2] for q in _exchanges[exchange]["table"]] + elif type == 'topic': + # pattern matching for the queue + return [q[2] for q in _exchanges[exchange]["table"] if q[1].match(routing_key)] + else: + raise KeyError('Unknown exchange type') except KeyError: self._new_queue(default) - return default + return [default] def _restore(self, message): delivery_info = message.delivery_info - self._put(self._lookup(delivery_info["exchange"], - delivery_info["routing_key"]), - message) + for q in self._lookup(delivery_info["exchange"], + delivery_info["routing_key"]): + self._put(q, message) def _poll(self, queues): return FairCycle(self._get, queues, QueueEmpty).get() @@ -188,16 +200,16 @@ class Channel(object): return [_consumers[tag] for tag in self._consumers] def exchange_declare(self, exchange, type="direct", durable=False, - auto_delete=False, arguments=None): + auto_delete=False, arguments=None, nowait=False): if exchange not in _exchanges: _exchanges[exchange] = {"type": type, "durable": durable, "auto_delete": auto_delete, "arguments": arguments or {}, - "table": {}} + "table": []} def exchange_delete(self, exchange, if_unused=False): - for rkey, queue in _exchanges[exchange]["table"].items(): + for rkey, m, queue in _exchanges[exchange]["table"]: self._purge(queue) _exchanges.pop(exchange, None) @@ -210,9 +222,14 @@ class Channel(object): return self._delete(queue) - def queue_bind(self, queue, exchange, routing_key, arguments=None): - table = _exchanges[exchange].setdefault("table", {}) - table[routing_key] = queue + def queue_bind(self, queue, exchange, routing_key, arguments=None, nowait=False): + table = _exchanges[exchange].setdefault("table", []) + m = None + type = _exchanges[exchange].get('type', 'direct') + if type == 'topic': + re_key = "^%s$" % routing_key.replace('.', '\.').replace('#', '[^\.]').replace('*','.*?') + m = re.compile(re_key, re.U) + table.append([routing_key, m, queue]) def queue_purge(self, queue, **kwargs): return self._purge(queue, **kwargs) @@ -251,7 +268,8 @@ class Channel(object): message["properties"]["delivery_info"]["exchange"] = exchange message["properties"]["delivery_info"]["routing_key"] = routing_key message["properties"]["delivery_tag"] = self._next_delivery_tag() - self._put(self._lookup(exchange, routing_key), message) + for q in self._lookup(exchange, routing_key): + self._put(q, message) def basic_cancel(self, consumer_tag): queue = _consumers.pop(consumer_tag, None)