diff --git a/kombu/entity.py b/kombu/entity.py index 34161c9b..1e1f769b 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -11,7 +11,6 @@ Exchange and Queue declarations. from __future__ import absolute_import from .abstract import MaybeChannelBound -from .syn import blocking as _SYN TRANSIENT_DELIVERY_MODE = 1 PERSISTENT_DELIVERY_MODE = 2 @@ -150,12 +149,12 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ - return _SYN(self.channel.exchange_declare, exchange=self.name, - type=self.type, - durable=self.durable, - auto_delete=self.auto_delete, - arguments=self.arguments, - nowait=nowait) + return self.channel.exchange_declare(exchange=self.name, + type=self.type, + durable=self.durable, + auto_delete=self.auto_delete, + arguments=self.arguments, + nowait=nowait) def Message(self, body, delivery_mode=None, priority=None, content_type=None, content_encoding=None, properties=None, @@ -224,9 +223,9 @@ class Exchange(MaybeChannelBound): response will not be waited for. Default is :const:`False`. """ - return _SYN(self.channel.exchange_delete, exchange=self.name, - if_unused=if_unused, - nowait=nowait) + return self.channel.exchange_delete(exchange=self.name, + if_unused=if_unused, + nowait=nowait) def __eq__(self, other): if isinstance(other, Exchange): @@ -396,13 +395,13 @@ class Queue(MaybeChannelBound): without modifying the server state. """ - ret = _SYN(self.channel.queue_declare, queue=self.name, - passive=passive, - durable=self.durable, - exclusive=self.exclusive, - auto_delete=self.auto_delete, - arguments=self.queue_arguments, - nowait=nowait) + ret = self.channel.queue_declare(queue=self.name, + passive=passive, + durable=self.durable, + exclusive=self.exclusive, + auto_delete=self.auto_delete, + arguments=self.queue_arguments, + nowait=nowait) if not self.name: self.name = ret[0] return ret @@ -413,11 +412,11 @@ class Queue(MaybeChannelBound): :keyword nowait: Do not wait for a reply. """ - return _SYN(self.channel.queue_bind, queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments, - nowait=nowait) + return self.channel.queue_bind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key, + arguments=self.binding_arguments, + nowait=nowait) def get(self, no_ack=None): """Poll the server for a new message. @@ -434,14 +433,14 @@ class Queue(MaybeChannelBound): is more important than performance. """ - message = _SYN(self.channel.basic_get, queue=self.name, no_ack=no_ack) + message = self.channel.basic_get(queue=self.name, no_ack=no_ack) if message is not None: return self.channel.message_to_python(message) def purge(self, nowait=False): """Remove all messages from the queue.""" - return _SYN(self.channel.queue_purge, queue=self.name, - nowait=nowait) or 0 + return self.channel.queue_purge(queue=self.name, + nowait=nowait) or 0 def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False): @@ -488,17 +487,17 @@ class Queue(MaybeChannelBound): :keyword nowait: Do not wait for a reply. """ - return _SYN(self.channel.queue_delete, queue=self.name, - if_unused=if_unused, - if_empty=if_empty, - nowait=nowait) + return self.channel.queue_delete(queue=self.name, + if_unused=if_unused, + if_empty=if_empty, + nowait=nowait) def unbind(self): """Delete the binding on the server.""" - return _SYN(self.channel.queue_unbind, queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments) + return self.channel.queue_unbind(queue=self.name, + exchange=self.exchange.name, + routing_key=self.routing_key, + arguments=self.binding_arguments) def __eq__(self, other): if isinstance(other, Queue): diff --git a/kombu/messaging.py b/kombu/messaging.py index 5a8e003f..48df49ef 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -15,7 +15,6 @@ from itertools import count from . import entity from .compression import compress from .serialization import encode -from .syn import blocking as _SYN from .utils import maybe_list __all__ = ["Exchange", "Queue", "Producer", "Consumer"] @@ -225,7 +224,7 @@ class Consumer(object): :keyword on_decode_error: see :attr:`on_decode_error`. """ - #: The connection channel to use. + #: The connection/channel to use for this consumer. channel = None #: A single :class:`~kombu.entity.Queue`, or a list of queues to @@ -410,9 +409,9 @@ class Consumer(object): Currently not supported by RabbitMQ. """ - return _SYN(self.channel.basic_qos, prefetch_size, - prefetch_count, - apply_global) + return self.channel.basic_qos(prefetch_size, + prefetch_count, + apply_global) def recover(self, requeue=False): """Redeliver unacknowledged messages. @@ -426,7 +425,7 @@ class Consumer(object): delivering it to an alternative subscriber. """ - return _SYN(self.channel.basic_recover, requeue=requeue) + return self.channel.basic_recover(requeue=requeue) def receive(self, body, message): """Method called when a message is received. @@ -472,6 +471,7 @@ class Consumer(object): message = self.channel.message_to_python(raw_message) decoded = message.payload except Exception, exc: + raise if not self.on_decode_error: raise self.on_decode_error(message, exc) diff --git a/kombu/mixins.py b/kombu/mixins.py index 3fae5bdc..4126a5f3 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -93,6 +93,11 @@ class ConsumerMixin(LogMixin): Also keyword arguments to ``consume`` are forwarded to this handler. + * :meth:`on_consume_end` + + Handler called after the consumers are cancelled. + Takes arguments ``(connection, channel)``. + * :meth:`on_iteration` Handler called for every iteration while draining @@ -132,6 +137,9 @@ class ConsumerMixin(LogMixin): def on_consume_ready(self, connection, channel, consumers, **kwargs): pass + def on_consume_end(self, connection, channel): + pass + def on_iteration(self): pass @@ -168,7 +176,6 @@ class ConsumerMixin(LogMixin): for i in limit and xrange(limit) or count(): if self.should_stop: break - self.debug("DRAIN EVENTS") self.on_iteration() try: connection.drain_events(timeout=safety_interval) @@ -212,6 +219,7 @@ class ConsumerMixin(LogMixin): with self._consume_from(*self.get_consumers(cls, channel)) as c: yield conn, channel, c self.debug("Consumers cancelled") + self.on_consume_end(connection, channel) self.debug("Connection closed") @contextmanager diff --git a/kombu/syn.py b/kombu/syn.py index 8d37674c..f75c0203 100644 --- a/kombu/syn.py +++ b/kombu/syn.py @@ -2,8 +2,6 @@ kombu.syn ========= -Thread synchronization. - :copyright: (c) 2009 - 2011 by Ask Solem. :license: BSD, see LICENSE for more details. @@ -12,57 +10,11 @@ from __future__ import absolute_import import sys -__all__ = ["blocking", "select_blocking_method", "detect_environment"] - -#: current blocking method -__sync_current = None +__all__ = ["blocking", "detect_environment"] def blocking(fun, *args, **kwargs): - """Make sure function is called by blocking and waiting for the result, - even if we're currently in a monkey patched eventlet/gevent - environment.""" - if __sync_current is None: - select_blocking_method(detect_environment()) - return __sync_current(fun, *args, **kwargs) - - -def select_blocking_method(type): - """Select blocking method, where `type` is one of default - gevent or eventlet.""" - global __sync_current - __sync_current = {"eventlet": _sync_eventlet, - "gevent": _sync_gevent, - "default": _sync_default}[type]() - - -def _sync_default(): - """Create blocking primitive.""" - - def __blocking__(fun, *args, **kwargs): - return fun(*args, **kwargs) - - return __blocking__ - - -def _sync_eventlet(): - """Create Eventlet blocking primitive.""" - from eventlet import spawn - - def __eblocking__(fun, *args, **kwargs): - return spawn(fun, *args, **kwargs).wait() - - return __eblocking__ - - -def _sync_gevent(): - """Create gevent blocking primitive.""" - from gevent import Greenlet - - def __gblocking__(fun, *args, **kwargs): - return Greenlet.spawn(fun, *args, **kwargs).get() - - return __gblocking__ + return fun(*args, **kwargs) def detect_environment(): diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 6a5d43cf..46cc1325 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -2,7 +2,7 @@ kombu.transport.amqplib ======================= -amqplib transport. +kamqp transport. :copyright: (c) 2009 - 2011 by Ask Solem. :license: BSD, see LICENSE for more details. @@ -12,139 +12,16 @@ from __future__ import absolute_import import socket -try: - from ssl import SSLError -except ImportError: - class SSLError(Exception): # noqa - pass - -from amqplib import client_0_8 as amqp -from amqplib.client_0_8 import transport -from amqplib.client_0_8.channel import Channel as _Channel -from amqplib.client_0_8.exceptions import AMQPConnectionException -from amqplib.client_0_8.exceptions import AMQPChannelException +from kamqp import client_0_8 as amqp +from kamqp.client_0_8.channel import Channel as _Channel +from kamqp.client_0_8.exceptions import AMQPConnectionError +from kamqp.client_0_8.exceptions import AMQPChannelError from . import base from ..utils.encoding import str_to_bytes DEFAULT_PORT = 5672 -# amqplib's handshake mistakenly identifies as protocol version 1191, -# this breaks in RabbitMQ tip, which no longer falls back to -# 0-8 for unknown ids. -transport.AMQP_PROTOCOL_HEADER = str_to_bytes("AMQP\x01\x01\x08\x00") - - -class Connection(amqp.Connection): # pragma: no cover - - def _dispatch_basic_return(self, channel, args, msg): - reply_code = args.read_short() - reply_text = args.read_shortstr() - exchange = args.read_shortstr() - routing_key = args.read_shortstr() - - exc = AMQPChannelException(reply_code, reply_text, (50, 60)) - if channel.events["basic_return"]: - for callback in channel.events["basic_return"]: - callback(exc, exchange, routing_key, msg) - else: - raise exc - - def __init__(self, *args, **kwargs): - super(Connection, self).__init__(*args, **kwargs) - self._method_override = {(60, 50): self._dispatch_basic_return} - - def drain_events(self, allowed_methods=None, timeout=None): - """Wait for an event on any channel.""" - return self.wait_multi(self.channels.values(), timeout=timeout) - - def wait_multi(self, channels, allowed_methods=None, timeout=None): - """Wait for an event on a channel.""" - chanmap = dict((chan.channel_id, chan) for chan in channels) - chanid, method_sig, args, content = self._wait_multiple( - chanmap.keys(), allowed_methods, timeout=timeout) - - channel = chanmap[chanid] - - if content \ - and channel.auto_decode \ - and hasattr(content, 'content_encoding'): - try: - content.body = content.body.decode(content.content_encoding) - except Exception: - pass - - amqp_method = self._method_override.get(method_sig) or \ - channel._METHOD_MAP.get(method_sig, None) - - if amqp_method is None: - raise Exception('Unknown AMQP method (%d, %d)' % method_sig) - - if content is None: - return amqp_method(channel, args) - else: - return amqp_method(channel, args, content) - - def read_timeout(self, timeout=None): - if timeout is None: - return self.method_reader.read_method() - sock = self.transport.sock - prev = sock.gettimeout() - sock.settimeout(timeout) - try: - try: - return self.method_reader.read_method() - except SSLError, exc: - # http://bugs.python.org/issue10272 - if "timed out" in str(exc): - raise socket.timeout() - raise - finally: - sock.settimeout(prev) - - def _wait_multiple(self, channel_ids, allowed_methods, timeout=None): - for channel_id in channel_ids: - method_queue = self.channels[channel_id].method_queue - for queued_method in method_queue: - method_sig = queued_method[0] - if (allowed_methods is None) \ - or (method_sig in allowed_methods) \ - or (method_sig == (20, 40)): - method_queue.remove(queued_method) - method_sig, args, content = queued_method - return channel_id, method_sig, args, content - - # Nothing queued, need to wait for a method from the peer - read_timeout = self.read_timeout - channels = self.channels - wait = self.wait - while 1: - channel, method_sig, args, content = read_timeout(timeout) - - if (channel in channel_ids) \ - and ((allowed_methods is None) \ - or (method_sig in allowed_methods) \ - or (method_sig == (20, 40))): - return channel, method_sig, args, content - - # Not the channel and/or method we were looking for. Queue - # this method for later - channels[channel].method_queue.append((method_sig, args, content)) - - # - # If we just queued up a method for channel 0 (the Connection - # itself) it's probably a close method in reaction to some - # error, so deal with it right away. - # - if channel == 0: - wait() - - def channel(self, channel_id=None): - try: - return self.channels[channel_id] - except KeyError: - return Channel(self, channel_id) - class Message(base.Message): """A message received by the broker. @@ -178,16 +55,11 @@ class Message(base.Message): class Channel(_Channel, base.StdChannel): Message = Message - events = {"basic_return": []} - - def __init__(self, *args, **kwargs): - self.no_ack_consumers = set() - super(Channel, self).__init__(*args, **kwargs) def prepare_message(self, message_data, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): - """Encapsulate data into a AMQP message.""" + """Encapsulate data into an AMQP message.""" return amqp.Message(message_data, priority=priority, content_type=content_type, content_encoding=content_encoding, @@ -198,21 +70,9 @@ class Channel(_Channel, base.StdChannel): """Convert encoded message body back to a Python value.""" return self.Message(self, raw_message) - def close(self): - try: - super(Channel, self).close() - finally: - self.connection = None - def basic_consume(self, *args, **kwargs): - consumer_tag = super(Channel, self).basic_consume(*args, **kwargs) - if kwargs["no_ack"]: - self.no_ack_consumers.add(consumer_tag) - return consumer_tag - - def basic_cancel(self, consumer_tag, **kwargs): - self.no_ack_consumers.discard(consumer_tag) - return super(Channel, self).basic_cancel(consumer_tag, **kwargs) +class Connection(amqp.Connection): + Channel = Channel class Transport(base.Transport): @@ -222,12 +82,12 @@ class Transport(base.Transport): # it's very annoying that amqplib sometimes raises AttributeError # if the connection is lost, but nothing we can do about that here. - connection_errors = (AMQPConnectionException, + connection_errors = (AMQPConnectionError, socket.error, IOError, OSError, AttributeError) - channel_errors = (AMQPChannelException, ) + channel_errors = (AMQPChannelError, ) def __init__(self, client, **kwargs): self.client = client diff --git a/requirements/default.txt b/requirements/default.txt index 9d7aebdb..0b65f76f 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -1,2 +1,2 @@ anyjson>=0.3.1 -amqplib>=1.0 +kamqp diff --git a/setup.cfg b/setup.cfg index dd78d19a..7de61400 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,4 +25,4 @@ upload-dir = docs/.build/html [bdist_rpm] requires = anyjson >= 0.3.1 - amqplib >= 1.0 + kamqp diff --git a/setup.py b/setup.py index c8297749..00d535ea 100644 --- a/setup.py +++ b/setup.py @@ -114,7 +114,7 @@ setup( test_suite="nose.collector", install_requires=[ 'anyjson>=0.3.1', - 'amqplib>=1.0', + 'kamqp', ], tests_require=tests_require, classifiers=[