mirror of https://github.com/celery/kombu.git
amqplib backend: Support for mandatory/immediate (basic_return) callbacks.
Example usage:: from time import sleep from kombu import BrokerConnection, Producer, Exchange e = Exchange("mcasdsada") conn = BrokerConnection() chan = conn.channel() def return_callback(exc, exchange, routing_key, message): print("%r: %s" % (exc, message)) p = Producer(chan, e, on_return=return_callback) p.publish({"foo": "bar"}, immediate=True, routing_key="akka") for i in range(5): conn.drain_events() sleep(0.1) Note that the events must be drained for the callback to be called, the solution to support this for synchronous use is still a challenge.
This commit is contained in:
parent
7385724268
commit
a3f67ffce8
|
@ -11,6 +11,23 @@ DEFAULT_PORT = 5672
|
|||
|
||||
class Connection(amqp.Connection):
|
||||
|
||||
def _dispatch_basic_return(self, channel, args, msg):
|
||||
reply_code = args.read_short()
|
||||
reply_text = args.read_shortstr()
|
||||
exchange = args.read_shortstr()
|
||||
routing_key = args.read_shortstr()
|
||||
|
||||
exc = AMQPChannelException(reply_code, reply_text, (50, 60))
|
||||
if channel.events["basic_return"]:
|
||||
for callback in channel.events["basic_return"]:
|
||||
callback(exc, exchange, routing_key, msg)
|
||||
else:
|
||||
raise exc
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Connection, self).__init__(*args, **kwargs)
|
||||
self._method_override = {(60, 50): self._dispatch_basic_return}
|
||||
|
||||
def drain_events(self, allowed_methods=None, timeout=None):
|
||||
"""Wait for an event on any channel."""
|
||||
return self.wait_multi(self.channels.values(), timeout=timeout)
|
||||
|
@ -31,7 +48,8 @@ class Connection(amqp.Connection):
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
amqp_method = channel._METHOD_MAP.get(method_sig, None)
|
||||
amqp_method = self._method_override.get(method_sig) or \
|
||||
channel._METHOD_MAP.get(method_sig, None)
|
||||
|
||||
if amqp_method is None:
|
||||
raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
|
||||
|
@ -126,6 +144,7 @@ class Message(BaseMessage):
|
|||
|
||||
class Channel(Channel):
|
||||
Message = Message
|
||||
events = {"basic_return": []}
|
||||
|
||||
def prepare_message(self, message_data, priority=None,
|
||||
content_type=None, content_encoding=None, headers=None,
|
||||
|
|
|
@ -18,6 +18,10 @@ class Producer(object):
|
|||
compression.
|
||||
:keyword auto_declare: Automatically declare the exchange
|
||||
at instantiation. Default is ``True``.
|
||||
:keyword on_return: Callback to call for undeliverable messages,
|
||||
when ``mandatory`` or ``imediate`` is used. This callback
|
||||
needs the following signature:
|
||||
``(exception, exchange, routing_key, message).
|
||||
|
||||
.. attribute:: channel
|
||||
|
||||
|
@ -46,14 +50,17 @@ class Producer(object):
|
|||
auto_declare = True
|
||||
routing_key = ""
|
||||
compression = None
|
||||
on_return = None
|
||||
|
||||
def __init__(self, channel, exchange=None, routing_key=None,
|
||||
serializer=None, auto_declare=None, compression=None):
|
||||
serializer=None, auto_declare=None, compression=None,
|
||||
on_return=None):
|
||||
self.channel = channel
|
||||
self.exchange = exchange or self.exchange
|
||||
self.routing_key = routing_key or self.routing_key
|
||||
self.serializer = serializer or self.serializer
|
||||
self.compression = compression or self.compression
|
||||
self.on_return = on_return or self.on_return
|
||||
if auto_declare is not None:
|
||||
self.auto_declare = auto_declare
|
||||
|
||||
|
@ -61,6 +68,9 @@ class Producer(object):
|
|||
self.exchange = self.exchange(self.channel)
|
||||
self.auto_declare and self.declare()
|
||||
|
||||
if self.on_return:
|
||||
self.channel.events["basic_return"].append(self.on_return)
|
||||
|
||||
def declare(self):
|
||||
"""Declare the exchange.
|
||||
|
||||
|
|
Loading…
Reference in New Issue