Control pattern matching (#997)

* Added pattern/matcher to Mailbox

* pattern/match for kombu 4

* Ensure kombu.matcher is covered by our documentation.

* Adds test_matcher & pidbox unit tests.

* Added tests to ensure exception is raised when matcher is not registered.

* Adds to test for destination passed in to process.
This commit is contained in:
Jason Held 2019-02-27 09:23:12 -05:00 committed by Asif Saif Uddin
parent eb6e4c8d51
commit 41dbbe3063
6 changed files with 234 additions and 6 deletions

View File

@ -10,6 +10,7 @@
kombu
kombu.common
kombu.matcher
kombu.mixins
kombu.simple
kombu.clocks

View File

@ -0,0 +1,11 @@
==============================================
Pattern matching registry - ``kombu.matcher``
==============================================
.. contents::
:local:
.. currentmodule:: kombu.matcher
.. automodule:: kombu.matcher
:members:
:undoc-members:

140
kombu/matcher.py Normal file
View File

@ -0,0 +1,140 @@
"""Pattern matching registry."""
from __future__ import absolute_import, unicode_literals
from re import match as rematch
from fnmatch import fnmatch
from .utils.compat import entrypoints
from .utils.encoding import bytes_to_str
class MatcherNotInstalled(Exception):
"""Matcher not installed/found."""
pass
class MatcherRegistry(object):
"""Pattern matching function registry."""
MatcherNotInstalled = MatcherNotInstalled
matcher_pattern_first = ["pcre", ]
def __init__(self):
self._matchers = {}
self._default_matcher = None
def register(self, name, matcher):
"""Add matcher by name to the registry."""
self._matchers[name] = matcher
def unregister(self, name):
"""Remove matcher by name from the registry."""
try:
self._matchers.pop(name)
except KeyError:
raise self.MatcherNotInstalled(
'No matcher installed for {}'.format(name)
)
def _set_default_matcher(self, name):
"""Set the default matching method.
:param name: The name of the registered matching method.
For example, `glob` (default), `pcre`, or any custom
methods registered using :meth:`register`.
:raises MatcherNotInstalled: If the matching method requested
is not available.
"""
try:
self._default_matcher = self._matchers[name]
except KeyError:
raise self.MatcherNotInstalled(
'No matcher installed for {}'.format(name)
)
def match(self, data, pattern, matcher=None, matcher_kwargs=None):
"""Call the matcher."""
if matcher and not self._matchers.get(matcher):
raise self.MatcherNotInstalled(
'No matcher installed for {}'.format(matcher)
)
match_func = self._matchers[matcher or 'glob']
if matcher in self.matcher_pattern_first:
first_arg = bytes_to_str(pattern)
second_arg = bytes_to_str(data)
else:
first_arg = bytes_to_str(data)
second_arg = bytes_to_str(pattern)
return match_func(first_arg, second_arg, **matcher_kwargs or {})
#: Global registry of matchers.
registry = MatcherRegistry()
"""
.. function:: match(data, pattern, matcher=default_matcher,
matcher_kwargs=None):
Match `data` by `pattern` using `matcher`.
:param data: The data that should be matched. Must be string.
:param pattern: The pattern that should be applied. Must be string.
:keyword matcher: An optional string representing the mathcing
method (for example, `glob` or `pcre`).
If :const:`None` (default), then `glob` will be used.
:keyword matcher_kwargs: Additional keyword arguments that will be passed
to the specified `matcher`.
:returns: :const:`True` if `data` matches pattern,
:const:`False` otherwise.
:raises MatcherNotInstalled: If the matching method requested is not
available.
"""
match = registry.match
"""
.. function:: register(name, matcher):
Register a new matching method.
:param name: A convience name for the mathing method.
:param matcher: A method that will be passed data and pattern.
"""
register = registry.register
"""
.. function:: unregister(name):
Unregister registered matching method.
:param name: Registered matching method name.
"""
unregister = registry.unregister
def register_glob():
"""Register glob into default registry."""
registry.register('glob', fnmatch)
def register_pcre():
"""Register pcre into default registry."""
registry.register('pcre', rematch)
# Register the base matching methods.
register_glob()
register_pcre()
# Default matching method is 'glob'
registry._set_default_matcher('glob')
# Load entrypoints from installed extensions
for ep, args in entrypoints('kombu.matchers'):
register(ep.name, *args)

View File

@ -15,11 +15,14 @@ from . import Exchange, Queue, Consumer, Producer
from .clocks import LamportClock
from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
from .five import range
from .five import range, string_t
from .log import get_logger
from .utils.functional import maybe_evaluate, reprcall
from .utils.objects import cached_property
from .utils.uuid import uuid
from .matcher import match
REPLY_QUEUE_EXPIRES = 10
W_PIDBOX_IN_USE = """\
A node named {node.hostname} is already using this process mailbox!
@ -123,9 +126,21 @@ class Node(object):
def handle_message(self, body, message=None):
destination = body.get('destination')
pattern = body.get('pattern')
matcher = body.get('matcher')
if message:
self.adjust_clock(message.headers.get('clock') or 0)
if not destination or self.hostname in destination:
hostname = self.hostname
run_dispatch = False
if destination:
if hostname in destination:
run_dispatch = True
elif pattern and matcher:
if match(hostname, pattern, matcher):
run_dispatch = True
else:
run_dispatch = True
if run_dispatch:
return self.dispatch(**body)
dispatch_from_message = handle_message
@ -270,10 +285,12 @@ class Mailbox(object):
def _publish(self, type, arguments, destination=None,
reply_ticket=None, channel=None, timeout=None,
serializer=None, producer=None):
serializer=None, producer=None, pattern=None, matcher=None):
message = {'method': type,
'arguments': arguments,
'destination': destination}
'destination': destination,
'pattern': pattern,
'matcher': matcher}
chan = channel or self.connection.default_channel
exchange = self.exchange
if reply_ticket:
@ -292,12 +309,19 @@ class Mailbox(object):
def _broadcast(self, command, arguments=None, destination=None,
reply=False, timeout=1, limit=None,
callback=None, channel=None, serializer=None):
callback=None, channel=None, serializer=None,
pattern=None, matcher=None):
if destination is not None and \
not isinstance(destination, (list, tuple)):
raise ValueError(
'destination must be a list/tuple not {0}'.format(
type(destination)))
if (pattern is not None and not isinstance(pattern, string_t) and
matcher is not None and not isinstance(matcher, string_t)):
raise ValueError(
'pattern and matcher must be '
'strings not {}, {}'.format(type(pattern), type(matcher))
)
arguments = arguments or {}
reply_ticket = reply and uuid() or None
@ -312,7 +336,9 @@ class Mailbox(object):
reply_ticket=reply_ticket,
channel=chan,
timeout=timeout,
serializer=serializer)
serializer=serializer,
pattern=pattern,
matcher=matcher)
if reply_ticket:
return self._collect(reply_ticket, limit=limit,

32
t/unit/test_matcher.py Normal file
View File

@ -0,0 +1,32 @@
from __future__ import absolute_import, unicode_literals
from kombu.matcher import (
match, register, registry, unregister, fnmatch, rematch,
MatcherNotInstalled
)
import pytest
class test_Matcher(object):
def test_register_match_unregister_matcher(self):
register("test_matcher", rematch)
registry.matcher_pattern_first.append("test_matcher")
assert registry._matchers["test_matcher"] == rematch
assert match("data", r"d.*", "test_matcher") is not None
assert registry._default_matcher == fnmatch
registry._set_default_matcher("test_matcher")
assert registry._default_matcher == rematch
unregister("test_matcher")
assert "test_matcher" not in registry._matchers
registry._set_default_matcher("glob")
assert registry._default_matcher == fnmatch
def test_unregister_matcher_not_registered(self):
with pytest.raises(MatcherNotInstalled):
unregister('notinstalled')
def test_match_using_unregistered_matcher(self):
with pytest.raises(MatcherNotInstalled):
match("data", r"d.*", "notinstalled")

View File

@ -43,6 +43,11 @@ class test_Mailbox:
def _handler(self, state):
return self.stats['var']
def test_broadcast_matcher_pattern_string_type(self):
mailbox = pidbox.Mailbox("test_matcher_str")(self.connection)
with pytest.raises(ValueError):
mailbox._broadcast("ping", pattern=1, matcher=2)
def test_publish_reply_ignores_InconsistencyError(self):
mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)
with patch('kombu.pidbox.Producer') as Producer:
@ -233,6 +238,19 @@ class test_Mailbox:
body['destination'] = ['some_other_node']
assert node.handle_message(body, None) is None
# message for me should be processed.
body['destination'] = ['test_dispatch_from_message']
assert node.handle_message(body, None) is not None
# message not for me should not be processed.
body.pop("destination")
body['matcher'] = 'glob'
body["pattern"] = "something*"
assert node.handle_message(body, None) is None
body["pattern"] = "test*"
assert node.handle_message(body, None) is not None
def test_handle_message_adjusts_clock(self):
node = self.bound.Node('test_adjusts_clock')