Merge branch 'hvnsweeting/master'

This commit is contained in:
Ask Solem 2012-12-10 15:41:04 +00:00
commit 40ee6a1933
2 changed files with 56 additions and 20 deletions

View File

@ -0,0 +1,36 @@
.. _guide-hello-world:
==================
Hello world uses simple interface
==================
Below example uses simple interface to send helloworld message through
message broker (rabbitmq) and print received message
:file:`simple_publisher.py`:
.. code-block:: python
from __future__ import with_statement
from kombu import Connection
import datetime
with Connection('amqp://guest:guest@localhost:5672//') as conn:
simple_queue = conn.SimpleQueue('simple_queue')
message = 'helloword, sent at %s' % datetime.datetime.today()
simple_queue.put(message)
print('Sent: %s' % message)
simple_queue.close()
:file:`simple_consumer.py`:
.. code-block:: python
from __future__ import with_statement
from kombu import Connection
with Connection('amqp://guest:guest@localhost:5672//') as conn:
simple_queue = conn.SimpleQueue('simple_queue')
message = simple_queue.get(block=True, timeout=1)
print("Received: %s" % message.payload)
message.ack()
simple_queue.close()

View File

@ -50,23 +50,22 @@ Here is an example using the :class:`~kombu.simple.SimpleQueue` class
to produce and consume logging messages:
.. code-block:: python
from __future__ import with_statement
from socket import gethostname
import socket
import datetime
from time import time
from kombu import Connection
class Logger(object):
def __init__(self, connection, queue_name='log_queue',
serializer='json', compression=None):
self.queue = connection.SimpleQueue(self.queue_name)
self.queue = connection.SimpleQueue(queue_name)
self.serializer = serializer
self.compression = compression
def log(self, message, level='INFO', context={}):
self.queue.put({'message': message,
'level': level,
@ -75,40 +74,41 @@ to produce and consume logging messages:
'timestamp': time()},
serializer=self.serializer,
compression=self.compression)
def process(self, callback, n=1, timeout=1):
for i in xrange(n):
log_message = self.queue.get(block=True, timeout=1)
entry = log_message.payload # deserialized data.
callback(entry)
log_message.ack() # remove message from queue
def close(self):
self.queue.close()
if __name__ == '__main__':
from contextlib import closing
with Connection('amqp://guest:guest@localhost:5672//') as conn:
with closing(Logger(connection)) as logger:
with closing(Logger(conn)) as logger:
# Send message
logger.log('Error happened while encoding video',
level='ERROR',
context={'filename': 'cutekitten.mpg'})
# Consume and process message
# This is the callback called when a log message is
# received.
def dump_entry(entry):
date = datetime.fromtimestamp(entry['timestamp'])
date = datetime.datetime.fromtimestamp(entry['timestamp'])
print('[%s %s %s] %s %r' % (date,
entry['hostname'],
entry['level'],
entry['message'],
entry['context']))
# Process a single message using the callback above.
logger.process(dump_entry, n=1)