Using kamqp

This commit is contained in:
Ask Solem 2011-11-10 15:17:18 +00:00
parent f27186987a
commit d368ffa983
8 changed files with 62 additions and 243 deletions

View File

@ -11,7 +11,6 @@ Exchange and Queue declarations.
from __future__ import absolute_import
from .abstract import MaybeChannelBound
from .syn import blocking as _SYN
TRANSIENT_DELIVERY_MODE = 1
PERSISTENT_DELIVERY_MODE = 2
@ -150,12 +149,12 @@ class Exchange(MaybeChannelBound):
response will not be waited for. Default is :const:`False`.
"""
return _SYN(self.channel.exchange_declare, exchange=self.name,
type=self.type,
durable=self.durable,
auto_delete=self.auto_delete,
arguments=self.arguments,
nowait=nowait)
return self.channel.exchange_declare(exchange=self.name,
type=self.type,
durable=self.durable,
auto_delete=self.auto_delete,
arguments=self.arguments,
nowait=nowait)
def Message(self, body, delivery_mode=None, priority=None,
content_type=None, content_encoding=None, properties=None,
@ -224,9 +223,9 @@ class Exchange(MaybeChannelBound):
response will not be waited for. Default is :const:`False`.
"""
return _SYN(self.channel.exchange_delete, exchange=self.name,
if_unused=if_unused,
nowait=nowait)
return self.channel.exchange_delete(exchange=self.name,
if_unused=if_unused,
nowait=nowait)
def __eq__(self, other):
if isinstance(other, Exchange):
@ -396,13 +395,13 @@ class Queue(MaybeChannelBound):
without modifying the server state.
"""
ret = _SYN(self.channel.queue_declare, queue=self.name,
passive=passive,
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete,
arguments=self.queue_arguments,
nowait=nowait)
ret = self.channel.queue_declare(queue=self.name,
passive=passive,
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete,
arguments=self.queue_arguments,
nowait=nowait)
if not self.name:
self.name = ret[0]
return ret
@ -413,11 +412,11 @@ class Queue(MaybeChannelBound):
:keyword nowait: Do not wait for a reply.
"""
return _SYN(self.channel.queue_bind, queue=self.name,
exchange=self.exchange.name,
routing_key=self.routing_key,
arguments=self.binding_arguments,
nowait=nowait)
return self.channel.queue_bind(queue=self.name,
exchange=self.exchange.name,
routing_key=self.routing_key,
arguments=self.binding_arguments,
nowait=nowait)
def get(self, no_ack=None):
"""Poll the server for a new message.
@ -434,14 +433,14 @@ class Queue(MaybeChannelBound):
is more important than performance.
"""
message = _SYN(self.channel.basic_get, queue=self.name, no_ack=no_ack)
message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
if message is not None:
return self.channel.message_to_python(message)
def purge(self, nowait=False):
"""Remove all messages from the queue."""
return _SYN(self.channel.queue_purge, queue=self.name,
nowait=nowait) or 0
return self.channel.queue_purge(queue=self.name,
nowait=nowait) or 0
def consume(self, consumer_tag='', callback=None, no_ack=None,
nowait=False):
@ -488,17 +487,17 @@ class Queue(MaybeChannelBound):
:keyword nowait: Do not wait for a reply.
"""
return _SYN(self.channel.queue_delete, queue=self.name,
if_unused=if_unused,
if_empty=if_empty,
nowait=nowait)
return self.channel.queue_delete(queue=self.name,
if_unused=if_unused,
if_empty=if_empty,
nowait=nowait)
def unbind(self):
"""Delete the binding on the server."""
return _SYN(self.channel.queue_unbind, queue=self.name,
exchange=self.exchange.name,
routing_key=self.routing_key,
arguments=self.binding_arguments)
return self.channel.queue_unbind(queue=self.name,
exchange=self.exchange.name,
routing_key=self.routing_key,
arguments=self.binding_arguments)
def __eq__(self, other):
if isinstance(other, Queue):

View File

@ -15,7 +15,6 @@ from itertools import count
from . import entity
from .compression import compress
from .serialization import encode
from .syn import blocking as _SYN
from .utils import maybe_list
__all__ = ["Exchange", "Queue", "Producer", "Consumer"]
@ -225,7 +224,7 @@ class Consumer(object):
:keyword on_decode_error: see :attr:`on_decode_error`.
"""
#: The connection channel to use.
#: The connection/channel to use for this consumer.
channel = None
#: A single :class:`~kombu.entity.Queue`, or a list of queues to
@ -410,9 +409,9 @@ class Consumer(object):
Currently not supported by RabbitMQ.
"""
return _SYN(self.channel.basic_qos, prefetch_size,
prefetch_count,
apply_global)
return self.channel.basic_qos(prefetch_size,
prefetch_count,
apply_global)
def recover(self, requeue=False):
"""Redeliver unacknowledged messages.
@ -426,7 +425,7 @@ class Consumer(object):
delivering it to an alternative subscriber.
"""
return _SYN(self.channel.basic_recover, requeue=requeue)
return self.channel.basic_recover(requeue=requeue)
def receive(self, body, message):
"""Method called when a message is received.
@ -472,6 +471,7 @@ class Consumer(object):
message = self.channel.message_to_python(raw_message)
decoded = message.payload
except Exception, exc:
raise
if not self.on_decode_error:
raise
self.on_decode_error(message, exc)

View File

@ -93,6 +93,11 @@ class ConsumerMixin(LogMixin):
Also keyword arguments to ``consume`` are forwarded
to this handler.
* :meth:`on_consume_end`
Handler called after the consumers are cancelled.
Takes arguments ``(connection, channel)``.
* :meth:`on_iteration`
Handler called for every iteration while draining
@ -132,6 +137,9 @@ class ConsumerMixin(LogMixin):
def on_consume_ready(self, connection, channel, consumers, **kwargs):
pass
def on_consume_end(self, connection, channel):
pass
def on_iteration(self):
pass
@ -168,7 +176,6 @@ class ConsumerMixin(LogMixin):
for i in limit and xrange(limit) or count():
if self.should_stop:
break
self.debug("DRAIN EVENTS")
self.on_iteration()
try:
connection.drain_events(timeout=safety_interval)
@ -212,6 +219,7 @@ class ConsumerMixin(LogMixin):
with self._consume_from(*self.get_consumers(cls, channel)) as c:
yield conn, channel, c
self.debug("Consumers cancelled")
self.on_consume_end(connection, channel)
self.debug("Connection closed")
@contextmanager

View File

@ -2,8 +2,6 @@
kombu.syn
=========
Thread synchronization.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
@ -12,57 +10,11 @@ from __future__ import absolute_import
import sys
__all__ = ["blocking", "select_blocking_method", "detect_environment"]
#: current blocking method
__sync_current = None
__all__ = ["blocking", "detect_environment"]
def blocking(fun, *args, **kwargs):
"""Make sure function is called by blocking and waiting for the result,
even if we're currently in a monkey patched eventlet/gevent
environment."""
if __sync_current is None:
select_blocking_method(detect_environment())
return __sync_current(fun, *args, **kwargs)
def select_blocking_method(type):
"""Select blocking method, where `type` is one of default
gevent or eventlet."""
global __sync_current
__sync_current = {"eventlet": _sync_eventlet,
"gevent": _sync_gevent,
"default": _sync_default}[type]()
def _sync_default():
"""Create blocking primitive."""
def __blocking__(fun, *args, **kwargs):
return fun(*args, **kwargs)
return __blocking__
def _sync_eventlet():
"""Create Eventlet blocking primitive."""
from eventlet import spawn
def __eblocking__(fun, *args, **kwargs):
return spawn(fun, *args, **kwargs).wait()
return __eblocking__
def _sync_gevent():
"""Create gevent blocking primitive."""
from gevent import Greenlet
def __gblocking__(fun, *args, **kwargs):
return Greenlet.spawn(fun, *args, **kwargs).get()
return __gblocking__
return fun(*args, **kwargs)
def detect_environment():

View File

@ -2,7 +2,7 @@
kombu.transport.amqplib
=======================
amqplib transport.
kamqp transport.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
@ -12,139 +12,16 @@ from __future__ import absolute_import
import socket
try:
from ssl import SSLError
except ImportError:
class SSLError(Exception): # noqa
pass
from amqplib import client_0_8 as amqp
from amqplib.client_0_8 import transport
from amqplib.client_0_8.channel import Channel as _Channel
from amqplib.client_0_8.exceptions import AMQPConnectionException
from amqplib.client_0_8.exceptions import AMQPChannelException
from kamqp import client_0_8 as amqp
from kamqp.client_0_8.channel import Channel as _Channel
from kamqp.client_0_8.exceptions import AMQPConnectionError
from kamqp.client_0_8.exceptions import AMQPChannelError
from . import base
from ..utils.encoding import str_to_bytes
DEFAULT_PORT = 5672
# amqplib's handshake mistakenly identifies as protocol version 1191,
# this breaks in RabbitMQ tip, which no longer falls back to
# 0-8 for unknown ids.
transport.AMQP_PROTOCOL_HEADER = str_to_bytes("AMQP\x01\x01\x08\x00")
class Connection(amqp.Connection): # pragma: no cover
def _dispatch_basic_return(self, channel, args, msg):
reply_code = args.read_short()
reply_text = args.read_shortstr()
exchange = args.read_shortstr()
routing_key = args.read_shortstr()
exc = AMQPChannelException(reply_code, reply_text, (50, 60))
if channel.events["basic_return"]:
for callback in channel.events["basic_return"]:
callback(exc, exchange, routing_key, msg)
else:
raise exc
def __init__(self, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
self._method_override = {(60, 50): self._dispatch_basic_return}
def drain_events(self, allowed_methods=None, timeout=None):
"""Wait for an event on any channel."""
return self.wait_multi(self.channels.values(), timeout=timeout)
def wait_multi(self, channels, allowed_methods=None, timeout=None):
"""Wait for an event on a channel."""
chanmap = dict((chan.channel_id, chan) for chan in channels)
chanid, method_sig, args, content = self._wait_multiple(
chanmap.keys(), allowed_methods, timeout=timeout)
channel = chanmap[chanid]
if content \
and channel.auto_decode \
and hasattr(content, 'content_encoding'):
try:
content.body = content.body.decode(content.content_encoding)
except Exception:
pass
amqp_method = self._method_override.get(method_sig) or \
channel._METHOD_MAP.get(method_sig, None)
if amqp_method is None:
raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
if content is None:
return amqp_method(channel, args)
else:
return amqp_method(channel, args, content)
def read_timeout(self, timeout=None):
if timeout is None:
return self.method_reader.read_method()
sock = self.transport.sock
prev = sock.gettimeout()
sock.settimeout(timeout)
try:
try:
return self.method_reader.read_method()
except SSLError, exc:
# http://bugs.python.org/issue10272
if "timed out" in str(exc):
raise socket.timeout()
raise
finally:
sock.settimeout(prev)
def _wait_multiple(self, channel_ids, allowed_methods, timeout=None):
for channel_id in channel_ids:
method_queue = self.channels[channel_id].method_queue
for queued_method in method_queue:
method_sig = queued_method[0]
if (allowed_methods is None) \
or (method_sig in allowed_methods) \
or (method_sig == (20, 40)):
method_queue.remove(queued_method)
method_sig, args, content = queued_method
return channel_id, method_sig, args, content
# Nothing queued, need to wait for a method from the peer
read_timeout = self.read_timeout
channels = self.channels
wait = self.wait
while 1:
channel, method_sig, args, content = read_timeout(timeout)
if (channel in channel_ids) \
and ((allowed_methods is None) \
or (method_sig in allowed_methods) \
or (method_sig == (20, 40))):
return channel, method_sig, args, content
# Not the channel and/or method we were looking for. Queue
# this method for later
channels[channel].method_queue.append((method_sig, args, content))
#
# If we just queued up a method for channel 0 (the Connection
# itself) it's probably a close method in reaction to some
# error, so deal with it right away.
#
if channel == 0:
wait()
def channel(self, channel_id=None):
try:
return self.channels[channel_id]
except KeyError:
return Channel(self, channel_id)
class Message(base.Message):
"""A message received by the broker.
@ -178,16 +55,11 @@ class Message(base.Message):
class Channel(_Channel, base.StdChannel):
Message = Message
events = {"basic_return": []}
def __init__(self, *args, **kwargs):
self.no_ack_consumers = set()
super(Channel, self).__init__(*args, **kwargs)
def prepare_message(self, message_data, priority=None,
content_type=None, content_encoding=None, headers=None,
properties=None):
"""Encapsulate data into a AMQP message."""
"""Encapsulate data into an AMQP message."""
return amqp.Message(message_data, priority=priority,
content_type=content_type,
content_encoding=content_encoding,
@ -198,21 +70,9 @@ class Channel(_Channel, base.StdChannel):
"""Convert encoded message body back to a Python value."""
return self.Message(self, raw_message)
def close(self):
try:
super(Channel, self).close()
finally:
self.connection = None
def basic_consume(self, *args, **kwargs):
consumer_tag = super(Channel, self).basic_consume(*args, **kwargs)
if kwargs["no_ack"]:
self.no_ack_consumers.add(consumer_tag)
return consumer_tag
def basic_cancel(self, consumer_tag, **kwargs):
self.no_ack_consumers.discard(consumer_tag)
return super(Channel, self).basic_cancel(consumer_tag, **kwargs)
class Connection(amqp.Connection):
Channel = Channel
class Transport(base.Transport):
@ -222,12 +82,12 @@ class Transport(base.Transport):
# it's very annoying that amqplib sometimes raises AttributeError
# if the connection is lost, but nothing we can do about that here.
connection_errors = (AMQPConnectionException,
connection_errors = (AMQPConnectionError,
socket.error,
IOError,
OSError,
AttributeError)
channel_errors = (AMQPChannelException, )
channel_errors = (AMQPChannelError, )
def __init__(self, client, **kwargs):
self.client = client

View File

@ -1,2 +1,2 @@
anyjson>=0.3.1
amqplib>=1.0
kamqp

View File

@ -25,4 +25,4 @@ upload-dir = docs/.build/html
[bdist_rpm]
requires = anyjson >= 0.3.1
amqplib >= 1.0
kamqp

View File

@ -114,7 +114,7 @@ setup(
test_suite="nose.collector",
install_requires=[
'anyjson>=0.3.1',
'amqplib>=1.0',
'kamqp',
],
tests_require=tests_require,
classifiers=[