diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 5ecc0d72..da5fb0f6 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -13,10 +13,9 @@ import base64 import socket import sys import warnings -import collections from array import array -from collections import OrderedDict +from collections import OrderedDict, defaultdict, namedtuple from itertools import count from multiprocessing.util import Finalize from time import sleep @@ -55,6 +54,16 @@ RESTORE_PANIC_FMT = 'UNABLE TO RESTORE {0} MESSAGES: {1}' logger = get_logger(__name__) +#: Key format used for queue argument lookups in BrokerState.bindings. +binding_key_t = namedtuple('binding_key_t', ( + 'queue', 'exchange', 'routing_key', +)) + +#: BrokerState.queue_bindings generates tuples in this format. +queue_binding_t = namedtuple('queue_binding_t', ( + 'exchange', 'routing_key', 'arguments', +)) + class Base64(object): @@ -84,6 +93,7 @@ class BrokerState(object): #: This is the actual bindings registry, used to store bindings and to #: test 'in' relationships in constant time. It has the following #: structure:: + #: #: { #: (queue, exchange, routing_key): arguments, #: # ..., @@ -103,12 +113,11 @@ class BrokerState(object): def __init__(self, exchanges=None): self.exchanges = {} if exchanges is None else exchanges - self.bindings = dict() - self.queue_index = collections.defaultdict(set) + self.bindings = {} + self.queue_index = defaultdict(set) def clear(self): self.exchanges.clear() - # self.bindings.clear() self.bindings.clear() self.queue_index.clear() @@ -116,33 +125,32 @@ class BrokerState(object): return (queue, exchange, routing_key) in self.bindings def binding_declare(self, queue, exchange, routing_key, arguments): - if not self.has_binding(queue, exchange, routing_key): - key = (queue, exchange, routing_key) - self.bindings[key] = arguments - self.queue_index[queue].add(key) + key = binding_key_t(queue, exchange, routing_key) + self.bindings.setdefault(key, arguments) + self.queue_index[queue].add(key) def binding_delete(self, queue, exchange, routing_key): - if self.has_binding(queue, exchange, routing_key): - key = (queue, exchange, routing_key) + key = binding_key_t(queue, exchange, routing_key) + try: del self.bindings[key] + except KeyError: + pass + else: self.queue_index[queue].remove(key) def queue_bindings_delete(self, queue): - if queue in self.queue_index: - for binding in self.queue_index[queue]: - del self.bindings[binding] - del self.queue_index[queue] + try: + bindings = self.queue_index.pop(queue) + except KeyError: + pass + else: + [self.bindings.pop(binding, None) for binding in bindings] def queue_bindings(self, queue): - for queue, exchange, routing_key in self.queue_index[queue]: - yield ( - # Not yelding directly from `queue_index` because - # we need to remove the `queue` field and retrieve `arguments` - # as needed by client code: - exchange, - routing_key, - self.bindings[(queue, exchange, routing_key)] - ) + return ( + queue_binding_t(key.exchange, key.routing_key, self.bindings[key]) + for key in self.queue_index[queue] + ) class QoS(object):