initial working version of multi-search

This commit is contained in:
Tal Einat 2019-04-22 23:11:47 +03:00
parent 1656444033
commit 1b8c586ca9
3 changed files with 282 additions and 0 deletions

View File

@ -17,6 +17,7 @@ __version__ = '0.6.2'
__all__ = [
    'find_near_matches',
    'find_near_matches_multiple',
    'Match',
]
@ -25,6 +26,7 @@ from fuzzysearch.common import Match, search_exact, LevenshteinSearchParams
from fuzzysearch.levenshtein import find_near_matches_levenshtein
from fuzzysearch.substitutions_only import find_near_matches_substitutions
from fuzzysearch.generic_search import find_near_matches_generic
from fuzzysearch.multi import find_near_matches_multiple

def find_near_matches(subsequence, sequence,

112
src/fuzzysearch/multi.py Normal file
View File

@ -0,0 +1,112 @@
"""Non-naive searching for multiple needles in multiple haystacks."""
from collections import defaultdict
from six.moves import xrange
from fuzzysearch import LevenshteinSearchParams
from fuzzysearch.common import get_best_match_in_group, group_matches
from fuzzysearch.generic_search import find_near_matches_generic_linear_programming
class SequenceNgramIndex(object):
    """An n-gram index of a sequence, for a given n-gram size.

    Once created, this allows for very quick lookup of the indexes where
    any n-gram of the given size appears in the sequence.

    >>> SequenceNgramIndex("-abcde-abcde-", 3).indexes_of_ngram('abc')
    (1, 7)
    """
    def __init__(self, sequence, ngram_size):
        self.sequence = sequence
        self.ngram_size = ngram_size
        # Build the index once up-front; lookups afterwards are dict gets.
        self._index = self.index_sequence(self.sequence, self.ngram_size)

    @classmethod
    def index_sequence(cls, sequence, ngram_size):
        """Map each n-gram of ``sequence`` to a tuple of its start indexes."""
        collected = defaultdict(list)
        n_ngrams = len(sequence) - ngram_size + 1
        for start in range(n_ngrams):
            collected[sequence[start:start + ngram_size]].append(start)
        # Freeze the lists into tuples for the final, immutable index.
        return {ngram: tuple(starts) for ngram, starts in collected.items()}

    def indexes_of_ngram(self, ngram):
        """Return the start indexes where ``ngram`` appears; () if absent."""
        assert len(ngram) == self.ngram_size
        return self._index.get(ngram, ())
def find_near_matches_multiple(subsequences, sequences,
                               max_substitutions=None,
                               max_insertions=None,
                               max_deletions=None,
                               max_l_dist=None):
    """Search for near-matches of sub-sequences in sequences.

    This searches for near-matches, where the nearly-matching parts of the
    sequences must meet the following limitations (relative to the
    sub-sequences):

    * the maximum allowed number of character substitutions
    * the maximum allowed number of new characters inserted
    * and the maximum allowed number of character deletions
    * the total number of substitutions, insertions and deletions
      (a.k.a. the Levenshtein distance)

    This returns a list of lists: For each sequence, a list is returned
    of the matches for each sub-sequence within that sequence.

    >>> find_near_matches_multiple(['foo', 'bar'], ['fuo', 'ber'], 1, 1, 1, 1)
    [[[Match(start=0, end=3, dist=1)], []],
     [[], [Match(start=0, end=3, dist=1)]]]
    """
    # One result slot per (sequence, sub-sequence) pair.
    matches = [[None for _subseq in subsequences] for _seq in sequences]
    if not subsequences:
        return matches
    search_params = LevenshteinSearchParams(
        max_substitutions=max_substitutions,
        max_insertions=max_insertions,
        max_deletions=max_deletions,
        max_l_dist=max_l_dist,
    )
    # note: LevenshteinSearchParams normalizes max_l_dist
    # Use at least 1-character n-grams: when the shortest sub-sequence is
    # shorter than max_l_dist + 1, the floor division yields 0, which would
    # later make search_with_ngram_index() call xrange() with a zero step
    # and raise ValueError.
    ngram_len = max(
        1, min(map(len, subsequences)) // (search_params.max_l_dist + 1))
    for n_seq, sequence in enumerate(sequences):
        # Index each sequence once, then reuse it for every sub-sequence.
        indexed_ngrams = SequenceNgramIndex(sequence, ngram_len)
        for n_subseq, subsequence in enumerate(subsequences):
            matches[n_seq][n_subseq] = \
                search_with_ngram_index(subsequence, sequence,
                                        search_params, indexed_ngrams)
    return matches
def search_with_ngram_index(subsequence, sequence, search_params, indexed_ngrams):
    """Find near-matches of ``subsequence`` in ``sequence`` using the
    pre-built n-gram index of ``sequence``."""
    max_l_dist = search_params.max_l_dist
    ngram_len = indexed_ngrams.ngram_size
    subseq_len = len(subsequence)

    found = []
    for ngram_start in xrange(0, subseq_len - ngram_len + 1, ngram_len):
        ngram = subsequence[ngram_start:ngram_start + ngram_len]
        # try to expand left and/or right according to n_ngram
        for index in indexed_ngrams.indexes_of_ngram(ngram):
            # A full match around this n-gram occurrence can only lie inside
            # this window of the sequence; search just that slice and shift
            # the resulting match positions back by the window's offset.
            window_start = max(0, index - ngram_start - max_l_dist)
            window_end = index - ngram_start + subseq_len + max_l_dist
            for match in find_near_matches_generic_linear_programming(
                    subsequence,
                    sequence[window_start:window_end],
                    search_params,
            ):
                found.append(match._replace(
                    start=match.start + window_start,
                    end=match.end + window_start,
                ))

    # don't return overlapping matches; instead, group overlapping matches
    # together and return the best match from each group
    return sorted(
        get_best_match_in_group(group)
        for group in group_matches(found)
    )

168
tests/test_multi.py Normal file
View File

@ -0,0 +1,168 @@
from tests.compat import unittest
from fuzzysearch.common import get_best_match_in_group, group_matches,\
LevenshteinSearchParams, Match
from fuzzysearch.multi import find_near_matches_multiple
from tests.test_generic_search import TestGenericSearchBase
class TestMultiSearch(unittest.TestCase):
    """Tests for find_near_matches_multiple()."""

    def search(self, patterns, sequences, search_params):
        """Helper: run a multi-search, unpacking the search parameters."""
        return find_near_matches_multiple(patterns, sequences,
                                          search_params.max_substitutions,
                                          search_params.max_insertions,
                                          search_params.max_deletions,
                                          search_params.max_l_dist)

    def test_empty_inputs(self):
        """Empty needle and/or haystack lists yield empty result structures."""
        self.assertEqual([], self.search([], [],
                                         LevenshteinSearchParams(1, 1, 1, 1)))
        self.assertEqual([], self.search(['needle'], [],
                                         LevenshteinSearchParams(1, 1, 1, 1)))
        self.assertEqual([[]], self.search([], ['haystack'],
                                           LevenshteinSearchParams(1, 1, 1, 1)))

    def test_multi_identical(self):
        """Search for two different strings, in both of them."""
        needles = ["foo", "bar"]
        haystacks = needles
        for max_l_dist in [0, 1, 2]:
            with self.subTest(max_l_dist=max_l_dist):
                search_params = LevenshteinSearchParams(max_l_dist, max_l_dist,
                                                        max_l_dist, max_l_dist)
                # Each needle matches exactly once, only in its own haystack.
                self.assertEqual(
                    [[[Match(0, 3, 0)], []],
                     [[], [Match(0, 3, 0)]]],
                    self.search(needles, haystacks, search_params)
                )

    def test_multi_different(self):
        """Search for two different strings, in variations of both of them."""
        needles = ["foo", "bar"]
        haystacks = ["fuo", "ber"]
        # With no edits allowed, nothing matches.
        for max_l_dist in [0]:
            with self.subTest(max_l_dist=max_l_dist):
                search_params = LevenshteinSearchParams(max_l_dist, max_l_dist,
                                                        max_l_dist, max_l_dist)
                self.assertEqual(
                    [[[], []],
                     [[], []]],
                    self.search(needles, haystacks, search_params)
                )
        # With at least one edit allowed, each needle matches its variation.
        for max_l_dist in [1, 2]:
            with self.subTest(max_l_dist=max_l_dist):
                search_params = LevenshteinSearchParams(max_l_dist, max_l_dist,
                                                        max_l_dist, max_l_dist)
                self.assertEqual(
                    [[[Match(0, 3, 1)], []],
                     [[], [Match(0, 3, 1)]]],
                    self.search(needles, haystacks, search_params)
                )

    def test_multi_random(self):
        """Search for random sub-strings of random strings.

        Each sub-string is searched for in all of the random strings.
        """
        import random
        rand = random.Random()
        rand.seed(1)  # fixed seed: keep the test deterministic
        randint = rand.randint
        texts = [
            ''.join(
                chr(randint(0, 255))
                for _i in range(randint(1000, 10000))
            )
            for _n_text in range(10)
        ]
        needles = []
        for n_text, text in enumerate(texts):
            for needle_len in [4, 7, 10, 15, 50]:
                # randint() is inclusive on both ends, so the upper bound
                # must be len(text) - needle_len to always get a full
                # needle_len-character substring.
                index = randint(0, len(text) - needle_len)
                sub_text = text[index:index + needle_len]
                needles.append((n_text, index, sub_text))
        for max_l_dist in [0, 1]:
            with self.subTest(max_l_dist=max_l_dist):
                search_params = LevenshteinSearchParams(max_l_dist, max_l_dist,
                                                        max_l_dist, max_l_dist)
                needle_strs = [needle for (n_text, index, needle) in needles]
                results = self.search(needle_strs,
                                      texts,
                                      search_params)
                # Each needle must be found (exactly) where it was taken from.
                for n_needle, (n_text, index, needle) in enumerate(needles):
                    self.assertIn(Match(index, index + len(needle), 0), results[n_text][n_needle])
        for max_l_dist in [2]:
            with self.subTest(max_l_dist=max_l_dist):
                search_params = LevenshteinSearchParams(max_l_dist, max_l_dist,
                                                        max_l_dist, max_l_dist)
                # Very short needles are skipped: with max_l_dist=2 they
                # would match far too many places to assert anything useful.
                needles2 = [
                    (n_text, index, needle)
                    for (n_text, index, needle) in needles
                    if len(needle) >= 6
                ]
                needle_strs = [needle for (n_text, index, needle) in needles2]
                results = self.search(needle_strs,
                                      texts,
                                      search_params)
                for n_needle, (n_text, index, needle) in enumerate(needles2):
                    self.assertIn(Match(index, index + len(needle), 0), results[n_text][n_needle])

    def test_identical_needles(self):
        """Search for a single needle multiple times."""
        # Substitution-free parameter sets: only the exact occurrence matches.
        for search_params in [
            LevenshteinSearchParams(0, 0, 0, 0),
            LevenshteinSearchParams(0, 1, 0, 1),
            LevenshteinSearchParams(0, 0, 1, 1),
        ]:
            with self.subTest(search_params=search_params):
                self.assertEqual(
                    self.search(
                        ['abc'] * 3,
                        ['--abc-----adc--', '---------xyz----'],
                        search_params=search_params,
                    ),
                    [[[Match(2, 5, 0)]] * 3,
                     [[]] * 3],
                )
        # Parameter sets allowing a substitution (directly, or via a
        # deletion + insertion): 'adc' also matches with distance 1.
        for search_params in [
            LevenshteinSearchParams(1, 1, 1, 1),
            LevenshteinSearchParams(1, 0, 0, 1),
            # deletion + insertion = substitution
            LevenshteinSearchParams(0, 1, 1, 1),
        ]:
            with self.subTest(search_params=search_params):
                self.assertEqual(
                    self.search(
                        ['abc'] * 3,
                        ['--abc-----adc--', '---------xyz----'],
                        search_params=search_params,
                    ),
                    [[[Match(2, 5, 0), Match(10, 13, 1)]] * 3,
                     [[]] * 3],
                )
class TestMultiSearchAsGenericSearch(unittest.TestCase, TestGenericSearchBase):
    """Run the generic-search test suite through the multi-search API."""

    def search(self, pattern, sequence,
               max_subs, max_ins, max_dels, max_l_dist=None):
        # Wrap the single pattern/sequence pair for the multi-search API,
        # then unwrap its single result list.
        all_results = find_near_matches_multiple([pattern], [sequence],
                                                 max_subs, max_ins,
                                                 max_dels, max_l_dist)
        return all_results[0][0]

    def expectedOutcomes(self, search_results, expected_outcomes, *args, **kwargs):
        # Multi-search de-duplicates overlapping matches, so compare against
        # only the best match from each group of overlapping expected matches.
        best_expected = [
            get_best_match_in_group(group)
            for group in group_matches(expected_outcomes)
        ]
        return self.assertEqual(search_results, best_expected)