mirror of https://github.com/celery/kombu.git
amqp style routing for virtual exchanges (e.g. memory backend)
forgot to git add this part of the change
This commit is contained in:
parent
f8ee383cc3
commit
04b9a6f2fb
|
@ -2,6 +2,7 @@ import atexit
|
|||
import pickle
|
||||
import sys
|
||||
import tempfile
|
||||
import re
|
||||
|
||||
from time import sleep
|
||||
from itertools import count, cycle
|
||||
|
@ -162,16 +163,27 @@ class Channel(object):
|
|||
|
||||
def _lookup(self, exchange, routing_key, default="ae.undeliver"):
|
||||
try:
|
||||
return _exchanges[exchange]["table"][routing_key]
|
||||
type = _exchanges[exchange].get('type', 'direct')
|
||||
if type == 'direct':
|
||||
# routing key is a direct match
|
||||
return [q[2] for q in _exchanges[exchange]["table"] if q[0] == routing_key]
|
||||
elif type == 'fanout':
|
||||
# routing key not used, all queues receive the message
|
||||
return [q[2] for q in _exchanges[exchange]["table"]]
|
||||
elif type == 'topic':
|
||||
# pattern matching for the queue
|
||||
return [q[2] for q in _exchanges[exchange]["table"] if q[1].match(routing_key)]
|
||||
else:
|
||||
raise KeyError('Unknown exchange type')
|
||||
except KeyError:
|
||||
self._new_queue(default)
|
||||
return default
|
||||
return [default]
|
||||
|
||||
def _restore(self, message):
|
||||
delivery_info = message.delivery_info
|
||||
self._put(self._lookup(delivery_info["exchange"],
|
||||
delivery_info["routing_key"]),
|
||||
message)
|
||||
for q in self._lookup(delivery_info["exchange"],
|
||||
delivery_info["routing_key"]):
|
||||
self._put(q, message)
|
||||
|
||||
def _poll(self, queues):
|
||||
return FairCycle(self._get, queues, QueueEmpty).get()
|
||||
|
@ -188,16 +200,16 @@ class Channel(object):
|
|||
return [_consumers[tag] for tag in self._consumers]
|
||||
|
||||
def exchange_declare(self, exchange, type="direct", durable=False,
|
||||
auto_delete=False, arguments=None):
|
||||
auto_delete=False, arguments=None, nowait=False):
|
||||
if exchange not in _exchanges:
|
||||
_exchanges[exchange] = {"type": type,
|
||||
"durable": durable,
|
||||
"auto_delete": auto_delete,
|
||||
"arguments": arguments or {},
|
||||
"table": {}}
|
||||
"table": []}
|
||||
|
||||
def exchange_delete(self, exchange, if_unused=False):
|
||||
for rkey, queue in _exchanges[exchange]["table"].items():
|
||||
for rkey, m, queue in _exchanges[exchange]["table"]:
|
||||
self._purge(queue)
|
||||
_exchanges.pop(exchange, None)
|
||||
|
||||
|
@ -210,9 +222,14 @@ class Channel(object):
|
|||
return
|
||||
self._delete(queue)
|
||||
|
||||
def queue_bind(self, queue, exchange, routing_key, arguments=None):
|
||||
table = _exchanges[exchange].setdefault("table", {})
|
||||
table[routing_key] = queue
|
||||
def queue_bind(self, queue, exchange, routing_key, arguments=None, nowait=False):
|
||||
table = _exchanges[exchange].setdefault("table", [])
|
||||
m = None
|
||||
type = _exchanges[exchange].get('type', 'direct')
|
||||
if type == 'topic':
|
||||
re_key = "^%s$" % routing_key.replace('.', '\.').replace('#', '[^\.]').replace('*','.*?')
|
||||
m = re.compile(re_key, re.U)
|
||||
table.append([routing_key, m, queue])
|
||||
|
||||
def queue_purge(self, queue, **kwargs):
|
||||
return self._purge(queue, **kwargs)
|
||||
|
@ -251,7 +268,8 @@ class Channel(object):
|
|||
message["properties"]["delivery_info"]["exchange"] = exchange
|
||||
message["properties"]["delivery_info"]["routing_key"] = routing_key
|
||||
message["properties"]["delivery_tag"] = self._next_delivery_tag()
|
||||
self._put(self._lookup(exchange, routing_key), message)
|
||||
for q in self._lookup(exchange, routing_key):
|
||||
self._put(q, message)
|
||||
|
||||
def basic_cancel(self, consumer_tag):
|
||||
queue = _consumers.pop(consumer_tag, None)
|
||||
|
|
Loading…
Reference in New Issue