diff --git a/kombu/backends/pyamqplib.py b/kombu/backends/pyamqplib.py index 944069af..44d5aa4d 100644 --- a/kombu/backends/pyamqplib.py +++ b/kombu/backends/pyamqplib.py @@ -11,6 +11,23 @@ DEFAULT_PORT = 5672 class Connection(amqp.Connection): + def _dispatch_basic_return(self, channel, args, msg): + reply_code = args.read_short() + reply_text = args.read_shortstr() + exchange = args.read_shortstr() + routing_key = args.read_shortstr() + + exc = AMQPChannelException(reply_code, reply_text, (50, 60)) + if channel.events["basic_return"]: + for callback in channel.events["basic_return"]: + callback(exc, exchange, routing_key, msg) + else: + raise exc + + def __init__(self, *args, **kwargs): + super(Connection, self).__init__(*args, **kwargs) + self._method_override = {(60, 50): self._dispatch_basic_return} + def drain_events(self, allowed_methods=None, timeout=None): """Wait for an event on any channel.""" return self.wait_multi(self.channels.values(), timeout=timeout) @@ -31,7 +48,8 @@ class Connection(amqp.Connection): except Exception: pass - amqp_method = channel._METHOD_MAP.get(method_sig, None) + amqp_method = self._method_override.get(method_sig) or \ + channel._METHOD_MAP.get(method_sig, None) if amqp_method is None: raise Exception('Unknown AMQP method (%d, %d)' % method_sig) @@ -126,6 +144,7 @@ class Message(BaseMessage): class Channel(Channel): Message = Message + events = {"basic_return": []} def prepare_message(self, message_data, priority=None, content_type=None, content_encoding=None, headers=None, diff --git a/kombu/messaging.py b/kombu/messaging.py index 19324e8c..25b4ea9b 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -18,6 +18,10 @@ class Producer(object): compression. :keyword auto_declare: Automatically declare the exchange at instantiation. Default is ``True``. + :keyword on_return: Callback to call for undeliverable messages, + when ``mandatory`` or ``imediate`` is used. This callback + needs the following signature: + ``(exception, exchange, routing_key, message). .. attribute:: channel @@ -46,14 +50,17 @@ class Producer(object): auto_declare = True routing_key = "" compression = None + on_return = None def __init__(self, channel, exchange=None, routing_key=None, - serializer=None, auto_declare=None, compression=None): + serializer=None, auto_declare=None, compression=None, + on_return=None): self.channel = channel self.exchange = exchange or self.exchange self.routing_key = routing_key or self.routing_key self.serializer = serializer or self.serializer self.compression = compression or self.compression + self.on_return = on_return or self.on_return if auto_declare is not None: self.auto_declare = auto_declare @@ -61,6 +68,9 @@ class Producer(object): self.exchange = self.exchange(self.channel) self.auto_declare and self.declare() + if self.on_return: + self.channel.events["basic_return"].append(self.on_return) + def declare(self): """Declare the exchange.