kombu/examples/simple_task_queue/worker.py

39 lines
1.1 KiB
Python
Raw Normal View History

2011-09-12 09:25:42 +00:00
from __future__ import with_statement
from kombu.mixins import ConsumerMixin
2011-09-12 09:47:22 +00:00
from kombu.utils import kwdict, reprcall
2011-09-12 09:25:42 +00:00
from queues import task_queues
2011-09-12 09:47:22 +00:00
2011-09-12 09:25:42 +00:00
class Worker(ConsumerMixin):
    """Consume task messages from ``task_queues`` and execute them.

    Each message body is read as a mapping with the keys ``"fun"``
    (a callable), ``"args"`` (positional arguments) and ``"kwargs"``
    (keyword arguments).
    """

    def __init__(self, connection):
        """Store the broker *connection* on the instance.

        NOTE(review): presumably ``ConsumerMixin`` reads
        ``self.connection`` when running -- confirm against kombu docs.
        """
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return the consumers this worker should run.

        A single consumer is created with the *Consumer* factory, bound
        to ``task_queues`` and dispatching to :meth:`process_task`.
        *channel* is accepted but not used here.
        """
        return [Consumer(queues=task_queues,
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """Run the task described by *body*, then acknowledge *message*.

        Any ``Exception`` raised by the task is logged rather than
        propagated, so the message is acknowledged whether or not the
        task succeeded.
        """
        fun = body["fun"]
        args = body["args"]
        kwargs = body["kwargs"]
        self.info("Got task: %s", reprcall(fun.__name__, args, kwargs))
        try:
            # kwdict presumably normalises keyword names so they can be
            # splatted with ** -- verify against kombu.utils.
            fun(*args, **kwdict(kwargs))
        except Exception as exc:
            # "except X as e" is valid on Python 2.6+ and 3.x; the old
            # "except X, e" form is a SyntaxError on Python 3.
            self.error("task raised exception: %r", exc)
        message.ack()
if __name__ == "__main__":
    # Imports are kept local to the script entry point, as in the
    # original, so importing this module does not pull them in.
    from kombu import BrokerConnection
    from kombu.utils.debug import setup_logging

    setup_logging(loglevel="INFO")

    # The connection is used as a context manager around the worker run.
    with BrokerConnection("amqp://guest:guest@localhost:5672//") as connection:
        worker = Worker(connection)
        try:
            worker.run()
        except KeyboardInterrupt:
            # Ctrl-C: print a farewell instead of a traceback.
            print("bye bye")