Adds ConsumerProducerMixin (Issue #530)

This commit is contained in:
Ask Solem 2015-10-21 18:17:50 -07:00
parent db9e2f563a
commit f8d2a45175
2 changed files with 63 additions and 26 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
from kombu import Connection, Producer, Queue
from kombu.mixins import ConsumerMixin
from kombu.mixins import ConsumerProducerMixin
rpc_queue = Queue('rpc_queue')
@ -15,8 +15,7 @@ def fib(n):
return fib(n-1) + fib(n-2)
class Worker(ConsumerMixin):
_producer_connection = None
class Worker(ConsumerProducerMixin):
def __init__(self, connection):
self.connection = connection
@ -34,29 +33,15 @@ class Worker(ConsumerMixin):
print(' [.] fib({0})'.format(n))
result = fib(n)
with Producer(self.producer_connection) as producer:
producer.publish(
{'result': result},
exchange='', routing_key=message.properties['reply_to'],
correlation_id=message.properties['correlation_id'],
serializer='json',
)
self.producer.publish(
{'result': result},
exchange='', routing_key=message.properties['reply_to'],
correlation_id=message.properties['correlation_id'],
serializer='json',
retry=True,
)
message.ack()
def on_consume_end(self, connection, channel):
if self._producer_connection is not None:
self._producer_connection.close()
self._producer_connection = None
@property
def producer_connection(self):
if self._producer_connection is None:
conn = self.connection.clone()
conn.ensure_connection(self.on_connection_error,
self.connect_max_retries)
self._producer_connection = conn
return self._producer_connection
def start_worker(broker_url):
connection = Connection(broker_url)
@ -66,4 +51,7 @@ def start_worker(broker_url):
if __name__ == '__main__':
start_worker('pyamqp://')
try:
start_worker('pyamqp://')
except KeyboardInterrupt:
pass

View File

@ -17,7 +17,7 @@ from time import sleep
from .common import ignore_errors
from .five import range
from .messaging import Consumer
from .messaging import Consumer, Producer
from .log import get_logger
from .utils import cached_property, nested
from .utils.encoding import safe_repr
@ -255,3 +255,52 @@ class ConsumerMixin(object):
@cached_property
def channel_errors(self):
return self.connection.channel_errors
class ConsumerProducerMixin(ConsumerMixin):
"""Version of ConsumerMixin having separate connection for also
publishing messages.
Example:
.. code-block:: python
class Worker(ConsumerProducerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=Queue('foo'),
on_message=self.handle_message,
accept='application/json',
prefetch_count=10)]
def handle_message(self, message):
self.producer.publish(
{'message': 'hello to you'},
exchange='',
routing_key=message.properties['reply_to'],
correlation_id=message.properties['correlation_id'],
retry=True,
)
"""
_producer_connection = None
def on_consume_end(self, connection, channel):
if self._producer_connection is not None:
self._producer_connection.close()
self._producer_connection = None
@property
def producer(self):
return Producer(self.producer_connection)
@property
def producer_connection(self):
if self._producer_connection is None:
conn = self.connection.clone()
conn.ensure_connection(self.on_connection_error,
self.connect_max_retries)
self._producer_connection = conn
return self._producer_connection