mirror of https://github.com/celery/kombu.git
Virtual methods should support the nowait argument
This commit is contained in:
parent
2e6759f4d1
commit
0a1c916a2b
|
@ -188,7 +188,7 @@ class Channel(object):
|
||||||
return [_consumers[tag] for tag in self._consumers]
|
return [_consumers[tag] for tag in self._consumers]
|
||||||
|
|
||||||
def exchange_declare(self, exchange, type="direct", durable=False,
|
def exchange_declare(self, exchange, type="direct", durable=False,
|
||||||
auto_delete=False, arguments=None):
|
auto_delete=False, arguments=None, nowait=False):
|
||||||
if exchange not in _exchanges:
|
if exchange not in _exchanges:
|
||||||
_exchanges[exchange] = {"type": type,
|
_exchanges[exchange] = {"type": type,
|
||||||
"durable": durable,
|
"durable": durable,
|
||||||
|
@ -196,7 +196,7 @@ class Channel(object):
|
||||||
"arguments": arguments or {},
|
"arguments": arguments or {},
|
||||||
"table": {}}
|
"table": {}}
|
||||||
|
|
||||||
def exchange_delete(self, exchange, if_unused=False):
|
def exchange_delete(self, exchange, if_unused=False, nowait=False):
|
||||||
for rkey, queue in _exchanges[exchange]["table"].items():
|
for rkey, queue in _exchanges[exchange]["table"].items():
|
||||||
self._purge(queue)
|
self._purge(queue)
|
||||||
_exchanges.pop(exchange, None)
|
_exchanges.pop(exchange, None)
|
||||||
|
@ -205,12 +205,13 @@ class Channel(object):
|
||||||
self._new_queue(queue, **kwargs)
|
self._new_queue(queue, **kwargs)
|
||||||
return queue, self._size(queue), 0
|
return queue, self._size(queue), 0
|
||||||
|
|
||||||
def queue_delete(self, queue, if_unusued=False, if_empty=False):
|
def queue_delete(self, queue, if_unusued=False, if_empty=False, **kwargs):
|
||||||
if if_empty and self._size(queue):
|
if if_empty and self._size(queue):
|
||||||
return
|
return
|
||||||
self._delete(queue)
|
self._delete(queue)
|
||||||
|
|
||||||
def queue_bind(self, queue, exchange, routing_key, arguments=None):
|
def queue_bind(self, queue, exchange, routing_key, arguments=None,
|
||||||
|
**kwargs):
|
||||||
table = _exchanges[exchange].setdefault("table", {})
|
table = _exchanges[exchange].setdefault("table", {})
|
||||||
table[routing_key] = queue
|
table[routing_key] = queue
|
||||||
|
|
||||||
|
@ -241,8 +242,7 @@ class Channel(object):
|
||||||
if requeue:
|
if requeue:
|
||||||
self.qos_manager.requeue(delivery_tag)
|
self.qos_manager.requeue(delivery_tag)
|
||||||
|
|
||||||
def basic_consume(self, queue, no_ack, callback, consumer_tag,
|
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
|
||||||
**kwargs):
|
|
||||||
_consumers[consumer_tag] = queue
|
_consumers[consumer_tag] = queue
|
||||||
_callbacks[queue] = callback
|
_callbacks[queue] = callback
|
||||||
self._consumers.add(consumer_tag)
|
self._consumers.add(consumer_tag)
|
||||||
|
|
Loading…
Reference in New Issue