mirror of https://github.com/celery/kombu.git
Carrot compat interface now working with celery
This commit is contained in:
parent
58e0c08146
commit
90eaf27f85
|
@ -18,7 +18,7 @@ def entry_to_binding(queue, **options):
|
||||||
e_auto_delete = options.get("exchange_auto_delete") or \
|
e_auto_delete = options.get("exchange_auto_delete") or \
|
||||||
options.get("auto_delete")
|
options.get("auto_delete")
|
||||||
q_durable = options.get("queue_durable") or options.get("durable")
|
q_durable = options.get("queue_durable") or options.get("durable")
|
||||||
q_auto_delete = options.get("queue_auto_delete") or
|
q_auto_delete = options.get("queue_auto_delete") or \
|
||||||
options.get("auto_delete")
|
options.get("auto_delete")
|
||||||
e_arguments = options.get("exchange_arguments")
|
e_arguments = options.get("exchange_arguments")
|
||||||
q_arguments = options.get("queue_arguments")
|
q_arguments = options.get("queue_arguments")
|
||||||
|
@ -29,7 +29,7 @@ def entry_to_binding(queue, **options):
|
||||||
routing_key=options.get("routing_key"),
|
routing_key=options.get("routing_key"),
|
||||||
durable=e_durable,
|
durable=e_durable,
|
||||||
auto_delete=e_auto_delete,
|
auto_delete=e_auto_delete,
|
||||||
arguments=e_arguments,
|
arguments=e_arguments)
|
||||||
return entity.Binding(queue,
|
return entity.Binding(queue,
|
||||||
exchange=exchange,
|
exchange=exchange,
|
||||||
routing_key=binding_key,
|
routing_key=binding_key,
|
||||||
|
@ -72,7 +72,7 @@ class Publisher(messaging.Producer):
|
||||||
super(Publisher, self).__init__(self.backend, self.exchange,
|
super(Publisher, self).__init__(self.backend, self.exchange,
|
||||||
**kwargs)
|
**kwargs)
|
||||||
|
|
||||||
def send(*args, **kwargs):
|
def send(self, *args, **kwargs):
|
||||||
return self.publish(*args, **kwargs)
|
return self.publish(*args, **kwargs)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
@ -183,7 +183,7 @@ class _CSet(messaging.Consumer):
|
||||||
super(_CSet, self).__init__(self.backend, *args, **kwargs)
|
super(_CSet, self).__init__(self.backend, *args, **kwargs)
|
||||||
|
|
||||||
def iterconsume(self):
|
def iterconsume(self):
|
||||||
raise iterconsume(self.connection, self)
|
return iterconsume(self.connection, self)
|
||||||
|
|
||||||
def add_consumer_from_dict(self, queue, **options):
|
def add_consumer_from_dict(self, queue, **options):
|
||||||
self.bindings.append(entry_to_binding(queue, **options))
|
self.bindings.append(entry_to_binding(queue, **options))
|
||||||
|
@ -197,12 +197,13 @@ class _CSet(messaging.Consumer):
|
||||||
|
|
||||||
|
|
||||||
def ConsumerSet(connection, from_dict=None, consumers=None,
|
def ConsumerSet(connection, from_dict=None, consumers=None,
|
||||||
callbacks=None, **options):
|
callbacks=None, **kwargs):
|
||||||
|
|
||||||
bindings = []
|
bindings = []
|
||||||
for consumer in consumers:
|
if consumers:
|
||||||
bindings.extend(consumer.bindings)
|
map(bindings.extend, consumer.bindings)
|
||||||
for queue, options in from_dict.items():
|
if from_dict:
|
||||||
bindings.append(entry_to_binding(queue, **options))
|
for queue_name, queue_options in from_dict.items():
|
||||||
|
bindings.append(entry_to_binding(queue_name, **queue_options))
|
||||||
|
|
||||||
return _CSet(connection, bindings, **options)
|
return _CSet(connection, bindings, **kwargs)
|
||||||
|
|
Loading…
Reference in New Issue