Implement kombu version of the RabbitMQ rpc-tut-6 example. Closes #530

This commit is contained in:
Ask Solem 2015-10-21 18:09:30 -07:00
parent f1899f46c5
commit db9e2f563a
2 changed files with 115 additions and 0 deletions

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
from kombu import Connection, Producer, Consumer, Queue
from kombu.utils import uuid
class FibonacciRpcClient(object):
def __init__(self, connection):
self.connection = connection
self.callback_queue = Queue(uuid(), exclusive=True, auto_delete=True)
def on_response(self, message):
if message.properties['correlation_id'] == self.correlation_id:
self.response = message.payload['result']
def call(self, n):
self.response = None
self.correlation_id = uuid()
with Producer(self.connection) as producer:
producer.publish(
{'n': n},
exchange='',
routing_key='rpc_queue',
declare=[self.callback_queue],
reply_to=self.callback_queue,
correlation_id=self.correlation_id,
)
with Consumer(self.connection,
on_message=self.on_response,
queues=[self.callback_queue], no_ack=True):
while self.response is None:
self.connection.drain_events()
return self.response
def main(broker_url):
connection = Connection(broker_url)
fibonacci_rpc = FibonacciRpcClient(connection)
print(' [x] Requesting fib(30)')
response = fibonacci_rpc.call(30)
print(' [.] Got {0!r}'.format(response))
if __name__ == '__main__':
main('pyamqp://')

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python
from kombu import Connection, Producer, Queue
from kombu.mixins import ConsumerMixin
rpc_queue = Queue('rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
class Worker(ConsumerMixin):
_producer_connection = None
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(
queues=[rpc_queue],
on_message=self.on_request,
accept={'application/json'},
prefetch_count=1,
)]
def on_request(self, message):
n = message.payload['n']
print(' [.] fib({0})'.format(n))
result = fib(n)
with Producer(self.producer_connection) as producer:
producer.publish(
{'result': result},
exchange='', routing_key=message.properties['reply_to'],
correlation_id=message.properties['correlation_id'],
serializer='json',
)
message.ack()
def on_consume_end(self, connection, channel):
if self._producer_connection is not None:
self._producer_connection.close()
self._producer_connection = None
@property
def producer_connection(self):
if self._producer_connection is None:
conn = self.connection.clone()
conn.ensure_connection(self.on_connection_error,
self.connect_max_retries)
self._producer_connection = conn
return self._producer_connection
def start_worker(broker_url):
connection = Connection(broker_url)
print " [x] Awaiting RPC requests"
worker = Worker(connection)
worker.run()
if __name__ == '__main__':
start_worker('pyamqp://')