mirror of https://github.com/celery/kombu.git
change unbind to support bindings parameters.
This commit is contained in:
parent
144c339622
commit
bf61bd556c
|
@ -11,11 +11,12 @@ from kombu import Connection, Producer, Exchange, Queue
|
|||
|
||||
#: By default messages sent to exchanges are persistent (delivery_mode=2),
|
||||
#: and queues and exchanges are durable.
|
||||
gateway_exchange = Exchange('gateway_kombu_demo', type='direct')
|
||||
exchange = Exchange('kombu_demo', type='direct')
|
||||
queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')
|
||||
|
||||
|
||||
with Connection('amqp://guest:guest@localhost:5672//') as connection:
|
||||
|
||||
with Connection('pyamqp://guest:guest@localhost:5672//') as connection:
|
||||
#binded = exchange.bind(connection.channel())
|
||||
#binded.exchange_bind(gateway_exchange, routing_key = 'kombu_demo')
|
||||
#: Producers are used to publish messages.
|
||||
#: a default exchange and routing key can also be specifed
|
||||
#: as arguments the Producer, but we rather specify this explicitly
|
||||
|
@ -26,6 +27,6 @@ with Connection('pyamqp://guest:guest@localhost:5672//') as connection:
|
|||
#: and zlib compression. The kombu consumer will automatically detect
|
||||
#: encoding, serialization and compression used and decode accordingly.
|
||||
producer.publish({'hello': 'world'},
|
||||
exchange=gateway_exchange,
|
||||
exchange=exchange,
|
||||
routing_key='kombu_demo',
|
||||
serializer='json', compression='zlib')
|
||||
|
|
|
@ -457,7 +457,6 @@ class Queue(MaybeChannelBound):
|
|||
arguments = arguments if arguments else self.binding_arguments
|
||||
if (exchange.name, routing_key) not in self.bindings:
|
||||
self.add_binding(exchange, routing_key, arguments)
|
||||
|
||||
return self.channel.queue_bind(queue=self.name,
|
||||
exchange=exchange.name,
|
||||
routing_key=routing_key,
|
||||
|
@ -542,13 +541,22 @@ class Queue(MaybeChannelBound):
|
|||
if_empty=if_empty,
|
||||
nowait=nowait)
|
||||
|
||||
def unbind(self):
|
||||
def unbind(self, exchange=None, routing_key=None):
|
||||
exchange = exchange if exchange else self.exchange
|
||||
routing_key = routing_key if routing_key else self.routing_key
|
||||
if (exchange.name, routing_key) in self.bindings:
|
||||
_, args = self.bindings.pop((exchange.name, routing_key))
|
||||
else:
|
||||
#TODO:Sell we allow unbinding when the binding is not registered
|
||||
args = self.binding_arguments
|
||||
print 'Be warned: Binding is not registered in queue bindings', self.bindings
|
||||
|
||||
"""Delete the binding on the server."""
|
||||
return self.channel.queue_unbind(queue=self.name,
|
||||
exchange=self.exchange.name,
|
||||
routing_key=self.routing_key,
|
||||
arguments=self.binding_arguments)
|
||||
|
||||
exchange=exchange.name,
|
||||
routing_key=routing_key,
|
||||
arguments=args)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Queue):
|
||||
return (self.name == other.name and
|
||||
|
@ -595,6 +603,7 @@ class Queue(MaybeChannelBound):
|
|||
e_arguments = options.get('exchange_arguments')
|
||||
q_arguments = options.get('queue_arguments')
|
||||
b_arguments = options.get('binding_arguments')
|
||||
bindings = options.get('bindings')
|
||||
|
||||
exchange = Exchange(options.get('exchange'),
|
||||
type=options.get('exchange_type'),
|
||||
|
@ -611,4 +620,5 @@ class Queue(MaybeChannelBound):
|
|||
auto_delete=q_auto_delete,
|
||||
no_ack=options.get('no_ack'),
|
||||
queue_arguments=q_arguments,
|
||||
binding_arguments=b_arguments)
|
||||
binding_arguments=b_arguments,
|
||||
bindings = bindings)
|
||||
|
|
Loading…
Reference in New Issue