Update with recent changes in carrot

This commit is contained in:
Ask Solem 2010-10-18 14:55:01 +02:00
parent 0a1c916a2b
commit 93b16d9c5d
4 changed files with 11 additions and 8 deletions

View File

@ -199,7 +199,7 @@ class Exchange(MaybeChannelBound):
headers=headers)
def publish(self, message, routing_key=None, mandatory=False,
immediate=False):
immediate=False, exchange=None):
"""Publish message.
:param message: :meth:`Message` instance to publish.
@ -208,8 +208,9 @@ class Exchange(MaybeChannelBound):
:param immediate: Currently not supported.
"""
exchange = exchange or self.name
return self.channel.basic_publish(message,
exchange=self.name,
exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate)

View File

@ -111,7 +111,7 @@ class Producer(object):
def publish(self, body, routing_key=None, delivery_mode=None,
mandatory=False, immediate=False, priority=0, content_type=None,
content_encoding=None, serializer=None, headers=None,
compression=None):
compression=None, exchange=None):
"""Publish message to the specified exchange.
:param body: Message body.
@ -125,6 +125,8 @@ class Producer(object):
:keyword serializer: Serializer to use. Default is autodetect.
:keyword headers: Mapping of arbitrary headers to pass along
with the message body.
:keyword exchange: Override the exchange. Note that this exchange
must have been declared.
"""
headers = headers or {}
@ -143,7 +145,7 @@ class Producer(object):
content_encoding,
headers=headers)
return self.exchange.publish(message, routing_key, mandatory,
immediate)
immediate, exchange=exchange)
class Consumer(object):

View File

@ -14,7 +14,7 @@ class Channel(virtual.Channel):
def _get(self, queue):
return self.queues[queue].get(block=False)
def _put(self, queue, message):
def _put(self, queue, message, **kwargs):
self.queues[queue].put(message)
def _size(self, queue):

View File

@ -157,7 +157,7 @@ class Channel(object):
def _delete(self, queue):
self._purge(queue)
def _new_queue(self, queue):
def _new_queue(self, queue, **kwargs):
pass
def _lookup(self, exchange, routing_key, default="ae.undeliver"):
@ -216,7 +216,7 @@ class Channel(object):
table[routing_key] = queue
def queue_purge(self, queue, **kwargs):
return self._purge(queue, **kwargs)
return self._purge(queue)
def flow(self, active=True):
pass
@ -251,7 +251,7 @@ class Channel(object):
message["properties"]["delivery_info"]["exchange"] = exchange
message["properties"]["delivery_info"]["routing_key"] = routing_key
message["properties"]["delivery_tag"] = self._next_delivery_tag()
self._put(self._lookup(exchange, routing_key), message)
self._put(self._lookup(exchange, routing_key), message, **kwargs)
def basic_cancel(self, consumer_tag):
queue = _consumers.pop(consumer_tag, None)