From f8ee383cc3b3f1f9166627e81a64af4939e4de10 Mon Sep 17 00:00:00 2001 From: Shane Caraveo Date: Mon, 6 Sep 2010 13:56:09 -0700 Subject: [PATCH] add amqp style routing for virtual channels, allows memory backend to behave like amqp --- example/topic.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 example/topic.py diff --git a/example/topic.py b/example/topic.py new file mode 100644 index 00000000..d8e775c0 --- /dev/null +++ b/example/topic.py @@ -0,0 +1,70 @@ +from kombu.connection import BrokerConnection +from kombu.messaging import Exchange, Queue, Consumer, Producer + +# configuration, normally in an ini file +exchange_name = "test.shane" +exchange_type = "topic" +exchange_durable = True +message_serializer = "json" +queue_name = "test.q" + +# 1. setup the connection to the exchange +# hostname,userid,password,virtual_host not used with memory backend +cons_conn = BrokerConnection(hostname="localhost", + userid="guest", + password="guest", + virtual_host="/", + transport="memory") +cons_chan = cons_conn.channel() +cons_exch = Exchange(exchange_name, type=exchange_type, durable=exchange_durable) + +pub_conn = BrokerConnection(hostname="localhost", + userid="guest", + password="guest", + virtual_host="/", + transport="memory") +pub_chan = pub_conn.channel() +pub_exch = Exchange(exchange_name, type=exchange_type, durable=exchange_durable) + +# 2. setup the consumer, the consumer declares/creates the queue, if you +# publish to a queue before there is a consumer it will fail unless the queue +# was first created and is durable +class AConsumer: + def __init__(self, queue_name, key): + self.queue = Queue(queue_name, exchange=cons_exch, routing_key=key) + self.consumer = Consumer(cons_chan, [self.queue]) + self.consumer.consume() + + def mq_callback(message_data, message): + print("%s: %r: %r" % (key, message.delivery_info, message_data,)) + #message.ack() + self.consumer.register_callback(mq_callback) + +c1 = AConsumer("test_1","test.1") +c2 = AConsumer("testing","test.ing") +# consumers can use simple pattern matching when defining a queue +c3 = AConsumer("test_all","test.*") + +# 3. publish something to consume +# publishers always send to a specific route, the mq will route to the queues +producer = Producer(pub_chan, exchange=pub_exch, serializer=message_serializer) +producer.publish({"name": "Shane Caraveo", "username": "mixedpuppy"}, routing_key="test.1") +producer.publish({"name": "Micky Mouse", "username": "donaldduck"}, routing_key="test.ing") +producer.publish({"name": "Anonymous", "username": "whoami"}, routing_key="test.foobar") + +def have_messages(): + return sum([q.qsize() for q in cons_chan.queues.values()]) + +# 5. run the event loop +while have_messages(): + try: + cons_conn.drain_events() + except KeyboardInterrupt: + print + print "quitting" + break + except Exception, e: + import traceback + print traceback.format_exc() + break +