From 1b8c586ca90b007893c3a18aa0ecb0b65bc10747 Mon Sep 17 00:00:00 2001 From: Tal Einat Date: Mon, 22 Apr 2019 23:11:47 +0300 Subject: [PATCH] initial working version of multi-search --- src/fuzzysearch/__init__.py | 2 + src/fuzzysearch/multi.py | 112 ++++++++++++++++++++++++ tests/test_multi.py | 168 ++++++++++++++++++++++++++++++++++++ 3 files changed, 282 insertions(+) create mode 100644 src/fuzzysearch/multi.py create mode 100644 tests/test_multi.py diff --git a/src/fuzzysearch/__init__.py b/src/fuzzysearch/__init__.py index ea8a237..6bf7b45 100644 --- a/src/fuzzysearch/__init__.py +++ b/src/fuzzysearch/__init__.py @@ -17,6 +17,7 @@ __version__ = '0.6.2' __all__ = [ 'find_near_matches', + 'find_near_matches_multiple', 'Match', ] @@ -25,6 +26,7 @@ from fuzzysearch.common import Match, search_exact, LevenshteinSearchParams from fuzzysearch.levenshtein import find_near_matches_levenshtein from fuzzysearch.substitutions_only import find_near_matches_substitutions from fuzzysearch.generic_search import find_near_matches_generic +from fuzzysearch.multi import find_near_matches_multiple def find_near_matches(subsequence, sequence, diff --git a/src/fuzzysearch/multi.py b/src/fuzzysearch/multi.py new file mode 100644 index 0000000..996720d --- /dev/null +++ b/src/fuzzysearch/multi.py @@ -0,0 +1,112 @@ +"""Non-naive searching for multiple needles in multiple haystacks.""" +from collections import defaultdict + +from six.moves import xrange + +from fuzzysearch import LevenshteinSearchParams +from fuzzysearch.common import get_best_match_in_group, group_matches +from fuzzysearch.generic_search import find_near_matches_generic_linear_programming + + +class SequenceNgramIndex(object): + """An n-gram index of a sequence, for a given n-gram size. + + Once created, this allows for very quick lookup of the indexes where + any n-gram of the given size appears in the sequence. + + >>> SequenceNgramIndex("-abcde-abcde-", 3).indexes_of_ngram('abc') + (1, 7) + """ + def __init__(self, sequence, ngram_size): + self.sequence = sequence + self.ngram_size = ngram_size + + self._index = self.index_sequence(self.sequence, self.ngram_size) + + @classmethod + def index_sequence(cls, sequence, ngram_size): + index = defaultdict(list) + for i in range(len(sequence) - ngram_size + 1): + index[sequence[i:i + ngram_size]].append(i) + return { + ngram: tuple(indexes) + for ngram, indexes in index.items() + } + + def indexes_of_ngram(self, ngram): + assert len(ngram) == self.ngram_size + return self._index.get(ngram, ()) + + +def find_near_matches_multiple(subsequences, sequences, + max_substitutions=None, + max_insertions=None, + max_deletions=None, + max_l_dist=None): + """"Search for near-matches of sub-sequences in sequences. + + This searches for near-matches, where the nearly-matching parts of the + sequences must meet the following limitations (relative to the + sub-sequences): + + * the maximum allowed number of character substitutions + * the maximum allowed number of new characters inserted + * and the maximum allowed number of character deletions + * the total number of substitutions, insertions and deletions + (a.k.a. the Levenshtein distance) + + This returns a list of lists: For each sequence, a list is returned + of the matches for each sub-sequence within that sequence. + + >>> find_near_matches_multiple(['foo', 'bar'], ['fuo', 'ber'], 1, 1, 1, 1) + [[[Match(start=0, end=3, dist=1)], []], + [[], [Match(start=0, end=3, dist=1)]]] + """ + matches = [[None for _subseq in subsequences] for _seq in sequences] + if not subsequences: + return matches + + search_params = LevenshteinSearchParams( + max_substitutions=max_substitutions, + max_insertions=max_insertions, + max_deletions=max_deletions, + max_l_dist=max_l_dist, + ) + # note: LevenshteinSearchParams normalizes max_l_dist + ngram_len = min(map(len, subsequences)) // (search_params.max_l_dist + 1) + + for n_seq, sequence in enumerate(sequences): + indexed_ngrams = SequenceNgramIndex(sequence, ngram_len) + for n_subseq, subsequence in enumerate(subsequences): + matches[n_seq][n_subseq] = \ + search_with_ngram_index(subsequence, sequence, + search_params, indexed_ngrams) + + return matches + + +def search_with_ngram_index(subsequence, sequence, search_params, indexed_ngrams): + max_l_dist = search_params.max_l_dist + ngram_len = indexed_ngrams.ngram_size + subseq_len = len(subsequence) + + matches = [] + for ngram_start in xrange(0, subseq_len - ngram_len + 1, ngram_len): + ngram_end = ngram_start + ngram_len + ngram = subsequence[ngram_start:ngram_end] + for index in indexed_ngrams.indexes_of_ngram(ngram): + # try to expand left and/or right according to n_ngram + for match in find_near_matches_generic_linear_programming( + subsequence, sequence[max(0, index - ngram_start - max_l_dist):index - ngram_start + subseq_len + max_l_dist], + search_params, + ): + matches.append(match._replace( + start=match.start + max(0, index - ngram_start - max_l_dist), + end=match.end + max(0, index - ngram_start - max_l_dist), + )) + + # don't return overlapping matches; instead, group overlapping matches + # together and return the best match from each group + match_groups = group_matches(matches) + best_matches = [get_best_match_in_group(group) for group in match_groups] + return sorted(best_matches) diff --git a/tests/test_multi.py b/tests/test_multi.py new file mode 100644 index 0000000..98b1993 --- /dev/null +++ b/tests/test_multi.py @@ -0,0 +1,168 @@ +from tests.compat import unittest + +from fuzzysearch.common import get_best_match_in_group, group_matches,\ + LevenshteinSearchParams, Match +from fuzzysearch.multi import find_near_matches_multiple + +from tests.test_generic_search import TestGenericSearchBase + + +class TestMultiSearch(unittest.TestCase): + def search(self, patterns, sequences, search_params): + return find_near_matches_multiple(patterns, sequences, + search_params.max_substitutions, + search_params.max_insertions, + search_params.max_deletions, + search_params.max_l_dist) + + def test_empty_inputs(self): + self.assertEqual([], self.search([], [], + LevenshteinSearchParams(1, 1, 1, 1))) + self.assertEqual([], self.search(['needle'], [], + LevenshteinSearchParams(1, 1, 1, 1))) + self.assertEqual([[]], self.search([], ['haystack'], + LevenshteinSearchParams(1, 1, 1, 1))) + + def test_multi_identical(self): + """Search for two different strings, in both of them.""" + needles = ["foo", "bar"] + haystacks = needles + + for max_l_dist in [0, 1, 2]: + with self.subTest(max_l_dist=max_l_dist): + search_params = LevenshteinSearchParams(max_l_dist, max_l_dist, + max_l_dist, max_l_dist) + self.assertEqual( + [[[Match(0, 3, 0)], []], + [[], [Match(0, 3, 0)]]], + self.search(needles, haystacks, search_params) + ) + + def test_multi_different(self): + """Search for two different strings, in variations of both of them.""" + needles = ["foo", "bar"] + haystacks = ["fuo", "ber"] + + for max_l_dist in [0]: + with self.subTest(max_l_dist=max_l_dist): + search_params = LevenshteinSearchParams(max_l_dist, max_l_dist, + max_l_dist, max_l_dist) + self.assertEqual( + [[[], []], + [[], []]], + self.search(needles, haystacks, search_params) + ) + + for max_l_dist in [1, 2]: + with self.subTest(max_l_dist=max_l_dist): + search_params = LevenshteinSearchParams(max_l_dist, max_l_dist, + max_l_dist, max_l_dist) + self.assertEqual( + [[[Match(0, 3, 1)], []], + [[], [Match(0, 3, 1)]]], + self.search(needles, haystacks, search_params) + ) + + def test_multi_random(self): + """Search for random sub-strings of random strings. + + Each sub-string is searched for in all of the random strings. + """ + import random + + rand = random.Random() + rand.seed(1) + randint = rand.randint + texts = [ + ''.join( + chr(randint(0, 255)) + for _i in range(randint(1000, 10000)) + ) + for _n_text in range(10) + ] + + needles = [] + for n_text, text in enumerate(texts): + for needle_len in [4, 7, 10, 15, 50]: + index = randint(0, len(text) - needle_len + 1) + sub_text = text[index:index + needle_len] + needles.append((n_text, index, sub_text)) + + for max_l_dist in [0, 1]: + with self.subTest(max_l_dist=max_l_dist): + search_params = LevenshteinSearchParams(max_l_dist, max_l_dist, + max_l_dist, max_l_dist) + needle_strs = [needle for (n_text, index, needle) in needles] + results = self.search(needle_strs, + texts, + search_params) + for n_needle, (n_text, index, needle) in enumerate(needles): + self.assertIn(Match(index, index + len(needle), 0), results[n_text][n_needle]) + + for max_l_dist in [2]: + with self.subTest(max_l_dist=max_l_dist): + search_params = LevenshteinSearchParams(max_l_dist, max_l_dist, + max_l_dist, max_l_dist) + needles2 = [ + (n_text, index, needle) + for (n_text, index, needle) in needles + if len(needle) >= 6 + ] + needle_strs = [needle for (n_text, index, needle) in needles2] + results = self.search(needle_strs, + texts, + search_params) + for n_needle, (n_text, index, needle) in enumerate(needles2): + self.assertIn(Match(index, index + len(needle), 0), results[n_text][n_needle]) + + def test_identical_needles(self): + """Search for a single needle multiple times.""" + for search_params in [ + LevenshteinSearchParams(0, 0, 0, 0), + LevenshteinSearchParams(0, 1, 0, 1), + LevenshteinSearchParams(0, 0, 1, 1), + ]: + with self.subTest(search_params=search_params): + self.assertEqual( + self.search( + ['abc'] * 3, + ['--abc-----adc--', '---------xyz----'], + search_params=search_params, + ), + [[[Match(2, 5, 0)]] * 3, + [[]] * 3], + ) + + for search_params in [ + LevenshteinSearchParams(1, 1, 1, 1), + LevenshteinSearchParams(1, 0, 0, 1), + # deletion + insertion = substitution + LevenshteinSearchParams(0, 1, 1, 1), + ]: + with self.subTest(search_params=search_params): + self.assertEqual( + self.search( + ['abc'] * 3, + ['--abc-----adc--', '---------xyz----'], + search_params=search_params, + ), + [[[Match(2, 5, 0), Match(10, 13, 1)]] * 3, + [[]] * 3], + ) + + +class TestMultiSearchAsGenericSearch(unittest.TestCase, TestGenericSearchBase): + def search(self, pattern, sequence, + max_subs, max_ins, max_dels, max_l_dist=None): + results = find_near_matches_multiple([pattern], [sequence], + max_subs, max_ins, + max_dels, max_l_dist) + return results[0][0] + + def expectedOutcomes(self, search_results, expected_outcomes, *args, **kwargs): + best_from_grouped_exepected_outcomes = [ + get_best_match_in_group(group) + for group in group_matches(expected_outcomes) + ] + return self.assertEqual(search_results, + best_from_grouped_exepected_outcomes)