From 7f68477c20f395310de3e8af8a2ed7dcab56e285 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 22 Jul 2010 16:10:26 +0200 Subject: [PATCH] Pika backend is working --- kombu/backends/pypika.py | 77 +++++++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 25 deletions(-) diff --git a/kombu/backends/pypika.py b/kombu/backends/pypika.py index 4b045096..5422660f 100644 --- a/kombu/backends/pypika.py +++ b/kombu/backends/pypika.py @@ -2,10 +2,14 @@ import weakref import functools import itertools -import pika +from pika import asyncore_adapter +from pika import blocking_adapter from pika import channel +from pika import connection +from pika import exceptions +from pika.spec import Basic, BasicProperties -from carrot.backends.base import BaseMessage, BaseBackend +from kombu.backends.base import BaseMessage, BaseBackend DEFAULT_PORT = 5672 @@ -13,9 +17,6 @@ DEFAULT_PORT = 5672 class Message(BaseMessage): def __init__(self, channel, amqp_message, **kwargs): - self.channel = channel - self._amqp_message = amqp_message - channel_id, method, header, body = amqp_message kwargs.update({"body": body, @@ -34,15 +35,22 @@ class Message(BaseMessage): class Channel(channel.Channel): Message = Message + def basic_get(self, queue, no_ack): + m = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack) + if isinstance(m, Basic.GetEmpty): + return + return m + def basic_publish(self, message, exchange, routing_key, mandatory=False, immediate=False): message_data, properties = message - return channel.Channel.basic_publish(self, exchange, - routing_key, - message_data, - properties, - mandatory, - immediate) + return channel.Channel.basic_publish(self, + exchange, + routing_key, + message_data, + properties, + mandatory, + immediate) def basic_consume(self, queue, no_ack=False, consumer_tag=None, callback=None, nowait=False): @@ -58,25 +66,28 @@ class Channel(channel.Channel): content_type=None, content_encoding=None, headers=None, properties=None): """Encapsulate data into a AMQP message.""" - properties = pika.BasicProperties(priority=priority, - content_type=content_type, - content_encoding=content_encoding, - headers=headers, - **properties) + properties = BasicProperties(priority=priority, + content_type=content_type, + content_encoding=content_encoding, + headers=headers, + **properties) return message_data, properties def message_to_python(self, raw_message): """Convert encoded message body back to a Python value.""" return self.Message(channel=self, amqp_message=raw_message) + def basic_ack(self, delivery_tag): + return channel.Channel.basic_ack(self, delivery_tag) -class BlockingConnection(pika.BlockingConnection): + +class BlockingConnection(blocking_adapter.BlockingConnection): def channel(self): return Channel(channel.ChannelHandler(self)) -class AsyncoreConnection(pika.AsyncoreConnection): +class AsyncoreConnection(asyncore_adapter.AsyncoreConnection): def channel(self): return Channel(channel.ChannelHandler(self)) @@ -85,6 +96,22 @@ class AsyncoreConnection(pika.AsyncoreConnection): class SyncBackend(BaseBackend): default_port = DEFAULT_PORT + connection_errors = (exceptions.ConnectionClosed, + exceptions.ChannelClosed, + exceptions.LoginError, + exceptions.NoFreeChannels, + exceptions.DuplicateConsumerTag, + exceptions.UnknownConsumerTag, + exceptions.RecursiveOperationDetected, + exceptions.ContentTransmissionForbidden, + exceptions.ProtocolSyntaxError) + + channel_errors = (exceptions.ChannelClosed, + exceptions.DuplicateConsumerTag, + exceptions.UnknownConsumerTag, + exceptions.ProtocolSyntaxError) + + Message = Message Connection = BlockingConnection @@ -110,13 +137,13 @@ class SyncBackend(BaseBackend): if not conninfo.port: conninfo.port = self.default_port - credentials = pika.PlainCredentials(conninfo.userid, - conninfo.password) - return self.Connection(pika.ConnectionParameters( - conninfo.hostname, - port=conninfo.port, - virtual_host=conninfo.virtual_host, - credentials=credentials)) + credentials = connection.PlainCredentials(conninfo.userid, + conninfo.password) + return self.Connection(connection.ConnectionParameters( + conninfo.hostname, + port=conninfo.port, + virtual_host=conninfo.virtual_host, + credentials=credentials)) def close_connection(self, connection): """Close the AMQP broker connection."""