mirror of https://github.com/celery/kombu.git
Memory backend state must be global
This commit is contained in:
parent
fb1e7e0d25
commit
8a49a85b8e
|
@ -41,3 +41,6 @@ class Channel(virtual.Channel):
|
|||
|
||||
class Transport(virtual.Transport):
|
||||
Channel = Channel
|
||||
|
||||
#: memory backend state is global.
|
||||
state = virtual.BrokerState()
|
||||
|
|
|
@ -222,6 +222,9 @@ class Channel(AbstractChannel):
|
|||
#: mapping of exchange types and corresponding classes.
|
||||
exchange_types = dict(STANDARD_EXCHANGE_TYPES)
|
||||
|
||||
#: flag set if the channel supports fanout exchanges.
|
||||
supports_fanout = False
|
||||
|
||||
#: counter used to generate delivery tags for this channel.
|
||||
_next_delivery_tag = count(1).next
|
||||
|
||||
|
@ -279,15 +282,17 @@ class Channel(AbstractChannel):
|
|||
def queue_bind(self, queue, exchange, routing_key, arguments=None,
|
||||
**kwargs):
|
||||
"""Bind `queue` to `exchange` with `routing key`."""
|
||||
if queue in self.state.bindings:
|
||||
return
|
||||
table = self.state.exchanges[exchange].setdefault("table", [])
|
||||
self.state.bindings[queue] = exchange, routing_key
|
||||
meta = self.typeof(exchange).prepare_bind(queue,
|
||||
exchange,
|
||||
routing_key,
|
||||
arguments)
|
||||
table.append(meta)
|
||||
if self.supports_fanout:
|
||||
self._queue_bind(exchange, *meta)
|
||||
table.append(meta)
|
||||
|
||||
def queue_purge(self, queue, **kwargs):
|
||||
"""Remove all ready messages from queue."""
|
||||
|
@ -477,9 +482,10 @@ class Transport(base.Transport):
|
|||
def __init__(self, client, **kwargs):
|
||||
self.client = client
|
||||
self.channels = []
|
||||
self.state = BrokerState()
|
||||
self._callbacks = {}
|
||||
self.cycle = self.Cycle(self._drain_channel, self.channels, Empty)
|
||||
if self.state is None:
|
||||
self.state = BrokerState()
|
||||
|
||||
def create_channel(self, connection):
|
||||
channel = self.Channel(connection)
|
||||
|
|
Loading…
Reference in New Issue