From bf61bd556c75cb95c6c8202bcd46e412f0273ade Mon Sep 17 00:00:00 2001 From: rumyana neykova Date: Thu, 20 Sep 2012 19:46:04 +0100 Subject: [PATCH] change unbind to support bindings parameters. --- examples/complete_send.py | 11 ++++++----- kombu/entity.py | 24 +++++++++++++++++------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/examples/complete_send.py b/examples/complete_send.py index 83de890b..c81f8620 100644 --- a/examples/complete_send.py +++ b/examples/complete_send.py @@ -11,11 +11,12 @@ from kombu import Connection, Producer, Exchange, Queue #: By default messages sent to exchanges are persistent (delivery_mode=2), #: and queues and exchanges are durable. -gateway_exchange = Exchange('gateway_kombu_demo', type='direct') +exchange = Exchange('kombu_demo', type='direct') +queue = Queue('kombu_demo', exchange, routing_key='kombu_demo') + + +with Connection('amqp://guest:guest@localhost:5672//') as connection: -with Connection('pyamqp://guest:guest@localhost:5672//') as connection: - #binded = exchange.bind(connection.channel()) - #binded.exchange_bind(gateway_exchange, routing_key = 'kombu_demo') #: Producers are used to publish messages. #: a default exchange and routing key can also be specifed #: as arguments the Producer, but we rather specify this explicitly @@ -26,6 +27,6 @@ with Connection('pyamqp://guest:guest@localhost:5672//') as connection: #: and zlib compression. The kombu consumer will automatically detect #: encoding, serialization and compression used and decode accordingly. producer.publish({'hello': 'world'}, - exchange=gateway_exchange, + exchange=exchange, routing_key='kombu_demo', serializer='json', compression='zlib') diff --git a/kombu/entity.py b/kombu/entity.py index 42175669..e9d7f187 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -457,7 +457,6 @@ class Queue(MaybeChannelBound): arguments = arguments if arguments else self.binding_arguments if (exchange.name, routing_key) not in self.bindings: self.add_binding(exchange, routing_key, arguments) - return self.channel.queue_bind(queue=self.name, exchange=exchange.name, routing_key=routing_key, @@ -542,13 +541,22 @@ class Queue(MaybeChannelBound): if_empty=if_empty, nowait=nowait) - def unbind(self): + def unbind(self, exchange=None, routing_key=None): + exchange = exchange if exchange else self.exchange + routing_key = routing_key if routing_key else self.routing_key + if (exchange.name, routing_key) in self.bindings: + _, args = self.bindings.pop((exchange.name, routing_key)) + else: + #TODO:Sell we allow unbinding when the binding is not registered + args = self.binding_arguments + print 'Be warned: Binding is not registered in queue bindings', self.bindings + """Delete the binding on the server.""" return self.channel.queue_unbind(queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments) - + exchange=exchange.name, + routing_key=routing_key, + arguments=args) + def __eq__(self, other): if isinstance(other, Queue): return (self.name == other.name and @@ -595,6 +603,7 @@ class Queue(MaybeChannelBound): e_arguments = options.get('exchange_arguments') q_arguments = options.get('queue_arguments') b_arguments = options.get('binding_arguments') + bindings = options.get('bindings') exchange = Exchange(options.get('exchange'), type=options.get('exchange_type'), @@ -611,4 +620,5 @@ class Queue(MaybeChannelBound): auto_delete=q_auto_delete, no_ack=options.get('no_ack'), queue_arguments=q_arguments, - binding_arguments=b_arguments) + binding_arguments=b_arguments, + bindings = bindings)