diff --git a/examples/complete_receive.py b/examples/complete_receive.py index bf618e46..416fa2d6 100644 --- a/examples/complete_receive.py +++ b/examples/complete_receive.py @@ -2,8 +2,9 @@ Example of simple consumer that waits for a single message, acknowledges it and exits. """ +from __future__ import with_statement -from kombu import BrokerConnection, Exchange, Queue, Consumer +from kombu import Connection, Exchange, Queue from pprint import pformat #: By default messages sent to exchanges are persistent (delivery_mode=2), @@ -23,23 +24,12 @@ def handle_message(body, message): print(" delivery_info:\n%s" % (pretty(message.delivery_info), )) message.ack() -#: Create a connection and a channel. -#: If hostname, userid, password and virtual_host is not specified -#: the values below are the default, but listed here so it can -#: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") -channel = connection.channel() - -#: Create consumer using our callback and queue. -#: Second argument can also be a list to consume from -#: any number of queues. -consumer = Consumer(channel, queue, callbacks=[handle_message]) -consumer.consume() - -#: This waits for a single event. Note that this event may not -#: be a message, or a message that is to be delivered to the consumers -#: channel, but any event received on the connection. -connection.drain_events() +with Connection("amqp://guest:guest@localhost:5672//") as connection: + #: Create consumer using our callback and queue. + #: Second argument can also be a list to consume from + #: any number of queues. + with connection.Consumer(queue, callbacks=[handle_message]): + #: This waits for a single event. Note that this event may not + #: be a message, or a message that is to be delivered to the consumers + #: channel, but any event received on the connection. + connection.drain_events(timeout=10) diff --git a/examples/complete_receive_manual.py b/examples/complete_receive_manual.py new file mode 100644 index 00000000..bf618e46 --- /dev/null +++ b/examples/complete_receive_manual.py @@ -0,0 +1,45 @@ +""" +Example of simple consumer that waits for a single message, acknowledges it +and exits. +""" + +from kombu import BrokerConnection, Exchange, Queue, Consumer +from pprint import pformat + +#: By default messages sent to exchanges are persistent (delivery_mode=2), +#: and queues and exchanges are durable. +exchange = Exchange("kombu_demo", type="direct") +queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") + + +def pretty(obj): + return pformat(obj, indent=4) + + +#: This is the callback applied when a message is received. +def handle_message(body, message): + print("Received message: %r" % (body, )) + print(" properties:\n%s" % (pretty(message.properties), )) + print(" delivery_info:\n%s" % (pretty(message.delivery_info), )) + message.ack() + +#: Create a connection and a channel. +#: If hostname, userid, password and virtual_host is not specified +#: the values below are the default, but listed here so it can +#: be easily changed. +connection = BrokerConnection(hostname="localhost", + userid="guest", + password="guest", + virtual_host="/") +channel = connection.channel() + +#: Create consumer using our callback and queue. +#: Second argument can also be a list to consume from +#: any number of queues. +consumer = Consumer(channel, queue, callbacks=[handle_message]) +consumer.consume() + +#: This waits for a single event. Note that this event may not +#: be a message, or a message that is to be delivered to the consumers +#: channel, but any event received on the connection. +connection.drain_events() diff --git a/examples/complete_send.py b/examples/complete_send.py index 9e3b3252..faa6a1a1 100644 --- a/examples/complete_send.py +++ b/examples/complete_send.py @@ -5,8 +5,9 @@ Example producer that sends a single message and exits. You can use `complete_receive.py` to receive the message sent. """ +from __future__ import with_statement -from kombu import BrokerConnection, Exchange, Queue, Producer +from kombu import Connection, Exchange, Queue #: By default messages sent to exchanges are persistent (delivery_mode=2), #: and queues and exchanges are durable. @@ -14,22 +15,18 @@ exchange = Exchange("kombu_demo", type="direct") queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") -#: Create connection and channel. -#: If hostname, userid, password and virtual_host is not specified -#: the values below are the default, but listed here so it can -#: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") -channel = connection.channel() +with Connection("amqp://guest:guest@localhost:5672//") as connection: -#: Producers are used to publish messages. -#: Routing keys can also be specifed as an argument to `publish`. -producer = Producer(channel, exchange, routing_key="kombu_demo") + #: Producers are used to publish messages. + #: a default exchange and routing key can also be specifed + #: as arguments the Producer, but we rather specify this explicitly + #: at the publish call. + producer = connection.Producer() -#: Publish the message using the json serializer (which is the default), -#: and zlib compression. The kombu consumer will automatically detect -#: encoding, serializiation and compression used and decode accordingly. -producer.publish({"hello": "world"}, serializer="json", - compression="zlib") + #: Publish the message using the json serializer (which is the default), + #: and zlib compression. The kombu consumer will automatically detect + #: encoding, serializiation and compression used and decode accordingly. + producer.publish({"hello": "world"}, + exchange=exchange, + routing_key="kombu_demo", + serializer="json", compression="zlib") diff --git a/examples/complete_send_manual.py b/examples/complete_send_manual.py new file mode 100644 index 00000000..9e3b3252 --- /dev/null +++ b/examples/complete_send_manual.py @@ -0,0 +1,35 @@ +""" + +Example producer that sends a single message and exits. + +You can use `complete_receive.py` to receive the message sent. + +""" + +from kombu import BrokerConnection, Exchange, Queue, Producer + +#: By default messages sent to exchanges are persistent (delivery_mode=2), +#: and queues and exchanges are durable. +exchange = Exchange("kombu_demo", type="direct") +queue = Queue("kombu_demo", exchange, routing_key="kombu_demo") + + +#: Create connection and channel. +#: If hostname, userid, password and virtual_host is not specified +#: the values below are the default, but listed here so it can +#: be easily changed. +connection = BrokerConnection(hostname="localhost", + userid="guest", + password="guest", + virtual_host="/") +channel = connection.channel() + +#: Producers are used to publish messages. +#: Routing keys can also be specifed as an argument to `publish`. +producer = Producer(channel, exchange, routing_key="kombu_demo") + +#: Publish the message using the json serializer (which is the default), +#: and zlib compression. The kombu consumer will automatically detect +#: encoding, serializiation and compression used and decode accordingly. +producer.publish({"hello": "world"}, serializer="json", + compression="zlib") diff --git a/examples/simple_eventlet_receive.py b/examples/simple_eventlet_receive.py index 0b10d7a6..cce518cc 100644 --- a/examples/simple_eventlet_receive.py +++ b/examples/simple_eventlet_receive.py @@ -22,10 +22,7 @@ def wait_many(timeout=1): #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. - connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") + connection = BrokerConnection("amqp://guest:guest@localhost:5672//") #: SimpleQueue mimics the interface of the Python Queue module. #: First argument can either be a queue name or a kombu.Queue object. @@ -42,5 +39,8 @@ def wait_many(timeout=1): spawn(message.ack) print(message.payload) + queue.close() + connection.close() + spawn(wait_many).wait() diff --git a/examples/simple_eventlet_send.py b/examples/simple_eventlet_send.py index 325a99cc..3d0641e1 100644 --- a/examples/simple_eventlet_send.py +++ b/examples/simple_eventlet_send.py @@ -19,10 +19,7 @@ def send_many(n): #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. - connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") + connection = BrokerConnection("amqp://guest:guest@localhost:5672//") #: SimpleQueue mimics the interface of the Python Queue module. #: First argument can either be a queue name or a kombu.Queue object. @@ -38,5 +35,8 @@ def send_many(n): pool.spawn(send_message, i) pool.waitall() + queue.close() + connection.close() + if __name__ == "__main__": send_many(10) diff --git a/examples/simple_receive.py b/examples/simple_receive.py index 6be741c4..9d53aa35 100644 --- a/examples/simple_receive.py +++ b/examples/simple_receive.py @@ -2,27 +2,27 @@ Example receiving a message using the SimpleQueue interface. """ +from __future__ import with_statement + from kombu import BrokerConnection #: Create connection #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") +with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: + #: SimpleQueue mimics the interface of the Python Queue module. + #: First argument can either be a queue name or a kombu.Queue object. + #: If a name, then the queue will be declared with the name as the queue + #: name, exchange name and routing key. + with conn.SimpleQueue("kombu_demo") as queue: + message = queue.get(block=True, timeout=10) + message.ack() + print(message.payload) -#: SimpleQueue mimics the interface of the Python Queue module. -#: First argument can either be a queue name or a kombu.Queue object. -#: If a name, then the queue will be declared with the name as the queue -#: name, exchange name and routing key. -queue = connection.SimpleQueue("kombu_demo") -message = queue.get(block=True, timeout=10) -message.ack() -print(message.payload) - -#: Always remember to close connections and channels. -queue.close() -connection.close() +#### +#: If you don't use the with statement then you must aways +# remember to close objects after use: +# queue.close() +# connection.close() diff --git a/examples/simple_send.py b/examples/simple_send.py index d0f1382e..76cdb9b5 100644 --- a/examples/simple_send.py +++ b/examples/simple_send.py @@ -6,6 +6,7 @@ You can use `simple_receive.py` (or `complete_receive.py`) to receive the message sent. """ +from __future__ import with_statement from kombu import BrokerConnection @@ -13,19 +14,18 @@ from kombu import BrokerConnection #: If hostname, userid, password and virtual_host is not specified #: the values below are the default, but listed here so it can #: be easily changed. -connection = BrokerConnection(hostname="localhost", - userid="guest", - password="guest", - virtual_host="/") +with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn: + + #: SimpleQueue mimics the interface of the Python Queue module. + #: First argument can either be a queue name or a kombu.Queue object. + #: If a name, then the queue will be declared with the name as the queue + #: name, exchange name and routing key. + with conn.SimpleQueue("kombu_demo") as queue: + queue.put({"hello": "world"}, serializer="json", compression="zlib") -#: SimpleQueue mimics the interface of the Python Queue module. -#: First argument can either be a queue name or a kombu.Queue object. -#: If a name, then the queue will be declared with the name as the queue -#: name, exchange name and routing key. -queue = connection.SimpleQueue("kombu_demo") -queue.put({"hello": "world"}, serializer="json", compression="zlib") - -# Always remember to close channels and connections. -queue.close() -connection.close() +##### +# If you don't use the with statement, you must always +# remember to close objects. +# queue.close() +# connection.close()