Pika backend is working

This commit is contained in:
Ask Solem 2010-07-22 16:10:26 +02:00
parent d380f7cea8
commit 7f68477c20
1 changed files with 52 additions and 25 deletions

View File

@ -2,10 +2,14 @@ import weakref
import functools import functools
import itertools import itertools
import pika from pika import asyncore_adapter
from pika import blocking_adapter
from pika import channel from pika import channel
from pika import connection
from pika import exceptions
from pika.spec import Basic, BasicProperties
from carrot.backends.base import BaseMessage, BaseBackend from kombu.backends.base import BaseMessage, BaseBackend
DEFAULT_PORT = 5672 DEFAULT_PORT = 5672
@ -13,9 +17,6 @@ DEFAULT_PORT = 5672
class Message(BaseMessage): class Message(BaseMessage):
def __init__(self, channel, amqp_message, **kwargs): def __init__(self, channel, amqp_message, **kwargs):
self.channel = channel
self._amqp_message = amqp_message
channel_id, method, header, body = amqp_message channel_id, method, header, body = amqp_message
kwargs.update({"body": body, kwargs.update({"body": body,
@ -34,15 +35,22 @@ class Message(BaseMessage):
class Channel(channel.Channel): class Channel(channel.Channel):
Message = Message Message = Message
def basic_get(self, queue, no_ack):
m = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack)
if isinstance(m, Basic.GetEmpty):
return
return m
def basic_publish(self, message, exchange, routing_key, mandatory=False, def basic_publish(self, message, exchange, routing_key, mandatory=False,
immediate=False): immediate=False):
message_data, properties = message message_data, properties = message
return channel.Channel.basic_publish(self, exchange, return channel.Channel.basic_publish(self,
routing_key, exchange,
message_data, routing_key,
properties, message_data,
mandatory, properties,
immediate) mandatory,
immediate)
def basic_consume(self, queue, no_ack=False, consumer_tag=None, def basic_consume(self, queue, no_ack=False, consumer_tag=None,
callback=None, nowait=False): callback=None, nowait=False):
@ -58,25 +66,28 @@ class Channel(channel.Channel):
content_type=None, content_encoding=None, headers=None, content_type=None, content_encoding=None, headers=None,
properties=None): properties=None):
"""Encapsulate data into a AMQP message.""" """Encapsulate data into a AMQP message."""
properties = pika.BasicProperties(priority=priority, properties = BasicProperties(priority=priority,
content_type=content_type, content_type=content_type,
content_encoding=content_encoding, content_encoding=content_encoding,
headers=headers, headers=headers,
**properties) **properties)
return message_data, properties return message_data, properties
def message_to_python(self, raw_message): def message_to_python(self, raw_message):
"""Convert encoded message body back to a Python value.""" """Convert encoded message body back to a Python value."""
return self.Message(channel=self, amqp_message=raw_message) return self.Message(channel=self, amqp_message=raw_message)
def basic_ack(self, delivery_tag):
return channel.Channel.basic_ack(self, delivery_tag)
class BlockingConnection(pika.BlockingConnection):
class BlockingConnection(blocking_adapter.BlockingConnection):
def channel(self): def channel(self):
return Channel(channel.ChannelHandler(self)) return Channel(channel.ChannelHandler(self))
class AsyncoreConnection(pika.AsyncoreConnection): class AsyncoreConnection(asyncore_adapter.AsyncoreConnection):
def channel(self): def channel(self):
return Channel(channel.ChannelHandler(self)) return Channel(channel.ChannelHandler(self))
@ -85,6 +96,22 @@ class AsyncoreConnection(pika.AsyncoreConnection):
class SyncBackend(BaseBackend): class SyncBackend(BaseBackend):
default_port = DEFAULT_PORT default_port = DEFAULT_PORT
connection_errors = (exceptions.ConnectionClosed,
exceptions.ChannelClosed,
exceptions.LoginError,
exceptions.NoFreeChannels,
exceptions.DuplicateConsumerTag,
exceptions.UnknownConsumerTag,
exceptions.RecursiveOperationDetected,
exceptions.ContentTransmissionForbidden,
exceptions.ProtocolSyntaxError)
channel_errors = (exceptions.ChannelClosed,
exceptions.DuplicateConsumerTag,
exceptions.UnknownConsumerTag,
exceptions.ProtocolSyntaxError)
Message = Message Message = Message
Connection = BlockingConnection Connection = BlockingConnection
@ -110,13 +137,13 @@ class SyncBackend(BaseBackend):
if not conninfo.port: if not conninfo.port:
conninfo.port = self.default_port conninfo.port = self.default_port
credentials = pika.PlainCredentials(conninfo.userid, credentials = connection.PlainCredentials(conninfo.userid,
conninfo.password) conninfo.password)
return self.Connection(pika.ConnectionParameters( return self.Connection(connection.ConnectionParameters(
conninfo.hostname, conninfo.hostname,
port=conninfo.port, port=conninfo.port,
virtual_host=conninfo.virtual_host, virtual_host=conninfo.virtual_host,
credentials=credentials)) credentials=credentials))
def close_connection(self, connection): def close_connection(self, connection):
"""Close the AMQP broker connection.""" """Close the AMQP broker connection."""