diff --git a/kombu/entity.py b/kombu/entity.py index e533be28..6cc44d63 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -199,7 +199,7 @@ class Exchange(MaybeChannelBound): headers=headers) def publish(self, message, routing_key=None, mandatory=False, - immediate=False): + immediate=False, exchange=None): """Publish message. :param message: :meth:`Message` instance to publish. @@ -208,8 +208,9 @@ class Exchange(MaybeChannelBound): :param immediate: Currently not supported. """ + exchange = exchange or self.name return self.channel.basic_publish(message, - exchange=self.name, + exchange=exchange, routing_key=routing_key, mandatory=mandatory, immediate=immediate) diff --git a/kombu/messaging.py b/kombu/messaging.py index 5c3ed40e..e3ef3dd7 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -111,7 +111,7 @@ class Producer(object): def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, - compression=None): + compression=None, exchange=None): """Publish message to the specified exchange. :param body: Message body. @@ -125,6 +125,8 @@ class Producer(object): :keyword serializer: Serializer to use. Default is autodetect. :keyword headers: Mapping of arbitrary headers to pass along with the message body. + :keyword exchange: Override the exchange. Note that this exchange + must have been declared. """ headers = headers or {} @@ -143,7 +145,7 @@ class Producer(object): content_encoding, headers=headers) return self.exchange.publish(message, routing_key, mandatory, - immediate) + immediate, exchange=exchange) class Consumer(object): diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index 81637791..194c07d6 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -14,7 +14,7 @@ class Channel(virtual.Channel): def _get(self, queue): return self.queues[queue].get(block=False) - def _put(self, queue, message): + def _put(self, queue, message, **kwargs): self.queues[queue].put(message) def _size(self, queue): diff --git a/kombu/transport/virtual.py b/kombu/transport/virtual.py index dc2e1983..9bd889cc 100644 --- a/kombu/transport/virtual.py +++ b/kombu/transport/virtual.py @@ -157,7 +157,7 @@ class Channel(object): def _delete(self, queue): self._purge(queue) - def _new_queue(self, queue): + def _new_queue(self, queue, **kwargs): pass def _lookup(self, exchange, routing_key, default="ae.undeliver"): @@ -216,7 +216,7 @@ class Channel(object): table[routing_key] = queue def queue_purge(self, queue, **kwargs): - return self._purge(queue, **kwargs) + return self._purge(queue) def flow(self, active=True): pass @@ -251,7 +251,7 @@ class Channel(object): message["properties"]["delivery_info"]["exchange"] = exchange message["properties"]["delivery_info"]["routing_key"] = routing_key message["properties"]["delivery_tag"] = self._next_delivery_tag() - self._put(self._lookup(exchange, routing_key), message) + self._put(self._lookup(exchange, routing_key), message, **kwargs) def basic_cancel(self, consumer_tag): queue = _consumers.pop(consumer_tag, None)