From 8a49a85b8ea8aa19d1255693142dda3c21aa09cc Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Wed, 27 Oct 2010 09:35:23 +0200 Subject: [PATCH] Memory backend state must be global --- kombu/transport/memory.py | 3 +++ kombu/transport/virtual/__init__.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index 8f08bcde..fc9d2f36 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -41,3 +41,6 @@ class Channel(virtual.Channel): class Transport(virtual.Transport): Channel = Channel + + #: memory backend state is global. + state = virtual.BrokerState() diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 38dccb38..faf7f562 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -222,6 +222,9 @@ class Channel(AbstractChannel): #: mapping of exchange types and corresponding classes. exchange_types = dict(STANDARD_EXCHANGE_TYPES) + #: flag set if the channel supports fanout exchanges. + supports_fanout = False + #: counter used to generate delivery tags for this channel. _next_delivery_tag = count(1).next @@ -279,15 +282,17 @@ class Channel(AbstractChannel): def queue_bind(self, queue, exchange, routing_key, arguments=None, **kwargs): """Bind `queue` to `exchange` with `routing key`.""" + if queue in self.state.bindings: + return table = self.state.exchanges[exchange].setdefault("table", []) self.state.bindings[queue] = exchange, routing_key meta = self.typeof(exchange).prepare_bind(queue, exchange, routing_key, arguments) + table.append(meta) if self.supports_fanout: self._queue_bind(exchange, *meta) - table.append(meta) def queue_purge(self, queue, **kwargs): """Remove all ready messages from queue.""" @@ -477,9 +482,10 @@ class Transport(base.Transport): def __init__(self, client, **kwargs): self.client = client self.channels = [] - self.state = BrokerState() self._callbacks = {} self.cycle = self.Cycle(self._drain_channel, self.channels, Empty) + if self.state is None: + self.state = BrokerState() def create_channel(self, connection): channel = self.Channel(connection)