Improve examples

This commit is contained in:
Ask Solem 2011-09-07 15:21:38 +01:00
parent 204b245fb4
commit a9ee9a16db
8 changed files with 144 additions and 77 deletions

View File

@ -2,8 +2,9 @@
Example of simple consumer that waits for a single message, acknowledges it
and exits.
"""
from __future__ import with_statement
from kombu import BrokerConnection, Exchange, Queue, Consumer
from kombu import Connection, Exchange, Queue
from pprint import pformat
#: By default messages sent to exchanges are persistent (delivery_mode=2),
@ -23,23 +24,12 @@ def handle_message(body, message):
print(" delivery_info:\n%s" % (pretty(message.delivery_info), ))
message.ack()
#: Create a connection and a channel.
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
channel = connection.channel()
#: Create consumer using our callback and queue.
#: Second argument can also be a list to consume from
#: any number of queues.
consumer = Consumer(channel, queue, callbacks=[handle_message])
consumer.consume()
#: This waits for a single event. Note that this event may not
#: be a message, or a message that is to be delivered to the consumers
#: channel, but any event received on the connection.
connection.drain_events()
with Connection("amqp://guest:guest@localhost:5672//") as connection:
#: Create consumer using our callback and queue.
#: Second argument can also be a list to consume from
#: any number of queues.
with connection.Consumer(queue, callbacks=[handle_message]):
#: This waits for a single event. Note that this event may not
#: be a message, or a message that is to be delivered to the consumers
#: channel, but any event received on the connection.
connection.drain_events(timeout=10)

View File

@ -0,0 +1,45 @@
"""
Example of simple consumer that waits for a single message, acknowledges it
and exits.
"""
from kombu import BrokerConnection, Exchange, Queue, Consumer
from pprint import pformat
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
exchange = Exchange("kombu_demo", type="direct")
queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
def pretty(obj):
return pformat(obj, indent=4)
#: This is the callback applied when a message is received.
def handle_message(body, message):
print("Received message: %r" % (body, ))
print(" properties:\n%s" % (pretty(message.properties), ))
print(" delivery_info:\n%s" % (pretty(message.delivery_info), ))
message.ack()
#: Create a connection and a channel.
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
channel = connection.channel()
#: Create consumer using our callback and queue.
#: Second argument can also be a list to consume from
#: any number of queues.
consumer = Consumer(channel, queue, callbacks=[handle_message])
consumer.consume()
#: This waits for a single event. Note that this event may not
#: be a message, or a message that is to be delivered to the consumers
#: channel, but any event received on the connection.
connection.drain_events()

View File

@ -5,8 +5,9 @@ Example producer that sends a single message and exits.
You can use `complete_receive.py` to receive the message sent.
"""
from __future__ import with_statement
from kombu import BrokerConnection, Exchange, Queue, Producer
from kombu import Connection, Exchange, Queue
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
@ -14,22 +15,18 @@ exchange = Exchange("kombu_demo", type="direct")
queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
#: Create connection and channel.
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
channel = connection.channel()
with Connection("amqp://guest:guest@localhost:5672//") as connection:
#: Producers are used to publish messages.
#: Routing keys can also be specifed as an argument to `publish`.
producer = Producer(channel, exchange, routing_key="kombu_demo")
#: Producers are used to publish messages.
#: a default exchange and routing key can also be specifed
#: as arguments the Producer, but we rather specify this explicitly
#: at the publish call.
producer = connection.Producer()
#: Publish the message using the json serializer (which is the default),
#: and zlib compression. The kombu consumer will automatically detect
#: encoding, serializiation and compression used and decode accordingly.
producer.publish({"hello": "world"}, serializer="json",
compression="zlib")
#: Publish the message using the json serializer (which is the default),
#: and zlib compression. The kombu consumer will automatically detect
#: encoding, serializiation and compression used and decode accordingly.
producer.publish({"hello": "world"},
exchange=exchange,
routing_key="kombu_demo",
serializer="json", compression="zlib")

View File

@ -0,0 +1,35 @@
"""
Example producer that sends a single message and exits.
You can use `complete_receive.py` to receive the message sent.
"""
from kombu import BrokerConnection, Exchange, Queue, Producer
#: By default messages sent to exchanges are persistent (delivery_mode=2),
#: and queues and exchanges are durable.
exchange = Exchange("kombu_demo", type="direct")
queue = Queue("kombu_demo", exchange, routing_key="kombu_demo")
#: Create connection and channel.
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
channel = connection.channel()
#: Producers are used to publish messages.
#: Routing keys can also be specifed as an argument to `publish`.
producer = Producer(channel, exchange, routing_key="kombu_demo")
#: Publish the message using the json serializer (which is the default),
#: and zlib compression. The kombu consumer will automatically detect
#: encoding, serializiation and compression used and decode accordingly.
producer.publish({"hello": "world"}, serializer="json",
compression="zlib")

View File

@ -22,10 +22,7 @@ def wait_many(timeout=1):
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
@ -42,5 +39,8 @@ def wait_many(timeout=1):
spawn(message.ack)
print(message.payload)
queue.close()
connection.close()
spawn(wait_many).wait()

View File

@ -19,10 +19,7 @@ def send_many(n):
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
connection = BrokerConnection("amqp://guest:guest@localhost:5672//")
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
@ -38,5 +35,8 @@ def send_many(n):
pool.spawn(send_message, i)
pool.waitall()
queue.close()
connection.close()
if __name__ == "__main__":
send_many(10)

View File

@ -2,27 +2,27 @@
Example receiving a message using the SimpleQueue interface.
"""
from __future__ import with_statement
from kombu import BrokerConnection
#: Create connection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
with conn.SimpleQueue("kombu_demo") as queue:
message = queue.get(block=True, timeout=10)
message.ack()
print(message.payload)
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
queue = connection.SimpleQueue("kombu_demo")
message = queue.get(block=True, timeout=10)
message.ack()
print(message.payload)
#: Always remember to close connections and channels.
queue.close()
connection.close()
####
#: If you don't use the with statement then you must aways
# remember to close objects after use:
# queue.close()
# connection.close()

View File

@ -6,6 +6,7 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the
message sent.
"""
from __future__ import with_statement
from kombu import BrokerConnection
@ -13,19 +14,18 @@ from kombu import BrokerConnection
#: If hostname, userid, password and virtual_host is not specified
#: the values below are the default, but listed here so it can
#: be easily changed.
connection = BrokerConnection(hostname="localhost",
userid="guest",
password="guest",
virtual_host="/")
with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
with conn.SimpleQueue("kombu_demo") as queue:
queue.put({"hello": "world"}, serializer="json", compression="zlib")
#: SimpleQueue mimics the interface of the Python Queue module.
#: First argument can either be a queue name or a kombu.Queue object.
#: If a name, then the queue will be declared with the name as the queue
#: name, exchange name and routing key.
queue = connection.SimpleQueue("kombu_demo")
queue.put({"hello": "world"}, serializer="json", compression="zlib")
# Always remember to close channels and connections.
queue.close()
connection.close()
#####
# If you don't use the with statement, you must always
# remember to close objects.
# queue.close()
# connection.close()