From 41dbbe3063f4c1e5a11fc8a0ce6c366c14917df6 Mon Sep 17 00:00:00 2001 From: Jason Held Date: Wed, 27 Feb 2019 09:23:12 -0500 Subject: [PATCH] Control pattern matching (#997) * Added pattern/matcher to Mailbox * pattern/match for kombu 4 * Ensure kombu.matcher is covered by our documentation. * Adds test_matcher & pidbox unit tests. * Added tests to ensure exception is raised when matcher is not registered. * Adds to test for destination passed in to process. --- docs/reference/index.rst | 1 + docs/reference/kombu.matcher.rst | 11 +++ kombu/matcher.py | 140 +++++++++++++++++++++++++++++++ kombu/pidbox.py | 38 +++++++-- t/unit/test_matcher.py | 32 +++++++ t/unit/test_pidbox.py | 18 ++++ 6 files changed, 234 insertions(+), 6 deletions(-) create mode 100644 docs/reference/kombu.matcher.rst create mode 100644 kombu/matcher.py create mode 100644 t/unit/test_matcher.py diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 471fe4f8..e022ead7 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -10,6 +10,7 @@ kombu kombu.common + kombu.matcher kombu.mixins kombu.simple kombu.clocks diff --git a/docs/reference/kombu.matcher.rst b/docs/reference/kombu.matcher.rst new file mode 100644 index 00000000..0dc15521 --- /dev/null +++ b/docs/reference/kombu.matcher.rst @@ -0,0 +1,11 @@ +============================================== + Pattern matching registry - ``kombu.matcher`` +============================================== + +.. contents:: + :local: +.. currentmodule:: kombu.matcher + +.. 
automodule:: kombu.matcher + :members: + :undoc-members: diff --git a/kombu/matcher.py b/kombu/matcher.py new file mode 100644 index 00000000..018a0955 --- /dev/null +++ b/kombu/matcher.py @@ -0,0 +1,140 @@ +"""Pattern matching registry.""" +from __future__ import absolute_import, unicode_literals + +from re import match as rematch +from fnmatch import fnmatch + +from .utils.compat import entrypoints +from .utils.encoding import bytes_to_str + + +class MatcherNotInstalled(Exception): + """Matcher not installed/found.""" + + pass + + +class MatcherRegistry(object): + """Pattern matching function registry.""" + + MatcherNotInstalled = MatcherNotInstalled + matcher_pattern_first = ["pcre", ] + + def __init__(self): + self._matchers = {} + self._default_matcher = None + + def register(self, name, matcher): + """Add matcher by name to the registry.""" + self._matchers[name] = matcher + + def unregister(self, name): + """Remove matcher by name from the registry.""" + try: + self._matchers.pop(name) + except KeyError: + raise self.MatcherNotInstalled( + 'No matcher installed for {}'.format(name) + ) + + def _set_default_matcher(self, name): + """Set the default matching method. + + :param name: The name of the registered matching method. + For example, `glob` (default), `pcre`, or any custom + methods registered using :meth:`register`. + + :raises MatcherNotInstalled: If the matching method requested + is not available. 
+ """ + try: + self._default_matcher = self._matchers[name] + except KeyError: + raise self.MatcherNotInstalled( + 'No matcher installed for {}'.format(name) + ) + + def match(self, data, pattern, matcher=None, matcher_kwargs=None): + """Call the matcher.""" + if matcher and not self._matchers.get(matcher): + raise self.MatcherNotInstalled( + 'No matcher installed for {}'.format(matcher) + ) + match_func = self._matchers[matcher or 'glob'] + if matcher in self.matcher_pattern_first: + first_arg = bytes_to_str(pattern) + second_arg = bytes_to_str(data) + else: + first_arg = bytes_to_str(data) + second_arg = bytes_to_str(pattern) + return match_func(first_arg, second_arg, **matcher_kwargs or {}) + + +#: Global registry of matchers. +registry = MatcherRegistry() + + +""" +.. function:: match(data, pattern, matcher=default_matcher, + matcher_kwargs=None): + + Match `data` by `pattern` using `matcher`. + + :param data: The data that should be matched. Must be string. + :param pattern: The pattern that should be applied. Must be string. + :keyword matcher: An optional string representing the matching + method (for example, `glob` or `pcre`). + + If :const:`None` (default), then `glob` will be used. + + :keyword matcher_kwargs: Additional keyword arguments that will be passed + to the specified `matcher`. + :returns: :const:`True` if `data` matches pattern, + :const:`False` otherwise. + + :raises MatcherNotInstalled: If the matching method requested is not + available. +""" +match = registry.match + + +""" +.. function:: register(name, matcher): + Register a new matching method. + + :param name: A convenience name for the matching method. + :param matcher: A method that will be passed data and pattern. +""" +register = registry.register + + +""" +.. function:: unregister(name): + Unregister registered matching method. + + :param name: Registered matching method name. 
+""" +unregister = registry.unregister + + +def register_glob(): + """Register glob into default registry.""" + registry.register('glob', fnmatch) + + +def register_pcre(): + """Register pcre into default registry.""" + registry.register('pcre', rematch) + + +# Register the base matching methods. +register_glob() +register_pcre() + +# Default matching method is 'glob' +registry._set_default_matcher('glob') + + +# Load entrypoints from installed extensions +for ep, args in entrypoints('kombu.matchers'): + register(ep.name, *args) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index f8fcecb0..266c75ac 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -15,11 +15,14 @@ from . import Exchange, Queue, Consumer, Producer from .clocks import LamportClock from .common import maybe_declare, oid_from from .exceptions import InconsistencyError -from .five import range +from .five import range, string_t from .log import get_logger from .utils.functional import maybe_evaluate, reprcall from .utils.objects import cached_property from .utils.uuid import uuid +from .matcher import match + +REPLY_QUEUE_EXPIRES = 10 W_PIDBOX_IN_USE = """\ A node named {node.hostname} is already using this process mailbox! 
@@ -123,9 +126,21 @@ class Node(object): def handle_message(self, body, message=None): destination = body.get('destination') + pattern = body.get('pattern') + matcher = body.get('matcher') if message: self.adjust_clock(message.headers.get('clock') or 0) - if not destination or self.hostname in destination: + hostname = self.hostname + run_dispatch = False + if destination: + if hostname in destination: + run_dispatch = True + elif pattern and matcher: + if match(hostname, pattern, matcher): + run_dispatch = True + else: + run_dispatch = True + if run_dispatch: return self.dispatch(**body) dispatch_from_message = handle_message @@ -270,10 +285,12 @@ class Mailbox(object): def _publish(self, type, arguments, destination=None, reply_ticket=None, channel=None, timeout=None, - serializer=None, producer=None): + serializer=None, producer=None, pattern=None, matcher=None): message = {'method': type, 'arguments': arguments, - 'destination': destination} + 'destination': destination, + 'pattern': pattern, + 'matcher': matcher} chan = channel or self.connection.default_channel exchange = self.exchange if reply_ticket: @@ -292,12 +309,19 @@ class Mailbox(object): def _broadcast(self, command, arguments=None, destination=None, reply=False, timeout=1, limit=None, - callback=None, channel=None, serializer=None): + callback=None, channel=None, serializer=None, + pattern=None, matcher=None): if destination is not None and \ not isinstance(destination, (list, tuple)): raise ValueError( 'destination must be a list/tuple not {0}'.format( type(destination))) + if (pattern is not None and not isinstance(pattern, string_t) and + matcher is not None and not isinstance(matcher, string_t)): + raise ValueError( + 'pattern and matcher must be ' + 'strings not {}, {}'.format(type(pattern), type(matcher)) + ) arguments = arguments or {} reply_ticket = reply and uuid() or None @@ -312,7 +336,9 @@ class Mailbox(object): reply_ticket=reply_ticket, channel=chan, timeout=timeout, - 
serializer=serializer) + serializer=serializer, + pattern=pattern, + matcher=matcher) if reply_ticket: return self._collect(reply_ticket, limit=limit, diff --git a/t/unit/test_matcher.py b/t/unit/test_matcher.py new file mode 100644 index 00000000..8433a19f --- /dev/null +++ b/t/unit/test_matcher.py @@ -0,0 +1,32 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.matcher import ( + match, register, registry, unregister, fnmatch, rematch, + MatcherNotInstalled +) + +import pytest + + +class test_Matcher(object): + + def test_register_match_unregister_matcher(self): + register("test_matcher", rematch) + registry.matcher_pattern_first.append("test_matcher") + assert registry._matchers["test_matcher"] == rematch + assert match("data", r"d.*", "test_matcher") is not None + assert registry._default_matcher == fnmatch + registry._set_default_matcher("test_matcher") + assert registry._default_matcher == rematch + unregister("test_matcher") + assert "test_matcher" not in registry._matchers + registry._set_default_matcher("glob") + assert registry._default_matcher == fnmatch + + def test_unregister_matcher_not_registered(self): + with pytest.raises(MatcherNotInstalled): + unregister('notinstalled') + + def test_match_using_unregistered_matcher(self): + with pytest.raises(MatcherNotInstalled): + match("data", r"d.*", "notinstalled") diff --git a/t/unit/test_pidbox.py b/t/unit/test_pidbox.py index c2dfccb0..a187af37 100644 --- a/t/unit/test_pidbox.py +++ b/t/unit/test_pidbox.py @@ -43,6 +43,11 @@ class test_Mailbox: def _handler(self, state): return self.stats['var'] + def test_broadcast_matcher_pattern_string_type(self): + mailbox = pidbox.Mailbox("test_matcher_str")(self.connection) + with pytest.raises(ValueError): + mailbox._broadcast("ping", pattern=1, matcher=2) + def test_publish_reply_ignores_InconsistencyError(self): mailbox = pidbox.Mailbox('test_reply__collect')(self.connection) with patch('kombu.pidbox.Producer') as Producer: @@ -233,6 
+238,19 @@ class test_Mailbox: body['destination'] = ['some_other_node'] assert node.handle_message(body, None) is None + # message for me should be processed. + body['destination'] = ['test_dispatch_from_message'] + assert node.handle_message(body, None) is not None + + # message not for me should not be processed. + body.pop("destination") + body['matcher'] = 'glob' + body["pattern"] = "something*" + assert node.handle_message(body, None) is None + + body["pattern"] = "test*" + assert node.handle_message(body, None) is not None + def test_handle_message_adjusts_clock(self): node = self.bound.Node('test_adjusts_clock')