Initial support for multiple queue bindings

This commit is contained in:
Federico Ficarelli 2016-03-14 11:52:31 +01:00 committed by Ask Solem
parent b1ad2b159a
commit 8ad9e1e2b2
1 changed files with 32 additions and 24 deletions

View File

@ -13,10 +13,9 @@ import base64
import socket
import sys
import warnings
import collections
from array import array
from collections import OrderedDict
from collections import OrderedDict, defaultdict, namedtuple
from itertools import count
from multiprocessing.util import Finalize
from time import sleep
@ -55,6 +54,16 @@ RESTORE_PANIC_FMT = 'UNABLE TO RESTORE {0} MESSAGES: {1}'
logger = get_logger(__name__)
#: Key format used for queue argument lookups in BrokerState.bindings.
binding_key_t = namedtuple('binding_key_t', (
'queue', 'exchange', 'routing_key',
))
#: BrokerState.queue_bindings generates tuples in this format.
queue_binding_t = namedtuple('queue_binding_t', (
'exchange', 'routing_key', 'arguments',
))
class Base64(object):
@ -84,6 +93,7 @@ class BrokerState(object):
#: This is the actual bindings registry, used to store bindings and to
#: test 'in' relationships in constant time. It has the following
#: structure::
#:
#: {
#: (queue, exchange, routing_key): arguments,
#: # ...,
@ -103,12 +113,11 @@ class BrokerState(object):
def __init__(self, exchanges=None):
self.exchanges = {} if exchanges is None else exchanges
self.bindings = dict()
self.queue_index = collections.defaultdict(set)
self.bindings = {}
self.queue_index = defaultdict(set)
def clear(self):
self.exchanges.clear()
# self.bindings.clear()
self.bindings.clear()
self.queue_index.clear()
@ -116,33 +125,32 @@ class BrokerState(object):
return (queue, exchange, routing_key) in self.bindings
def binding_declare(self, queue, exchange, routing_key, arguments):
if not self.has_binding(queue, exchange, routing_key):
key = (queue, exchange, routing_key)
self.bindings[key] = arguments
self.queue_index[queue].add(key)
key = binding_key_t(queue, exchange, routing_key)
self.bindings.setdefault(key, arguments)
self.queue_index[queue].add(key)
def binding_delete(self, queue, exchange, routing_key):
if self.has_binding(queue, exchange, routing_key):
key = (queue, exchange, routing_key)
key = binding_key_t(queue, exchange, routing_key)
try:
del self.bindings[key]
except KeyError:
pass
else:
self.queue_index[queue].remove(key)
def queue_bindings_delete(self, queue):
if queue in self.queue_index:
for binding in self.queue_index[queue]:
del self.bindings[binding]
del self.queue_index[queue]
try:
bindings = self.queue_index.pop(queue)
except KeyError:
pass
else:
[self.bindings.pop(binding, None) for binding in bindings]
def queue_bindings(self, queue):
for queue, exchange, routing_key in self.queue_index[queue]:
yield (
# Not yelding directly from `queue_index` because
# we need to remove the `queue` field and retrieve `arguments`
# as needed by client code:
exchange,
routing_key,
self.bindings[(queue, exchange, routing_key)]
)
return (
queue_binding_t(key.exchange, key.routing_key, self.bindings[key])
for key in self.queue_index[queue]
)
class QoS(object):