RapidFuzz/tests/common.py

398 lines
15 KiB
Python

"""
Shared helpers for the rapidfuzz test suite: scorer/process consistency checks and the GenericScorer harness.
"""
from __future__ import annotations
from dataclasses import dataclass
from math import isnan
from typing import Any
import pytest
from rapidfuzz import process_cpp, process_py
try:
from pandas import NA as pandas_NA
except BaseException:
pandas_NA = None
def _get_scorer_flags_py(scorer: Any, scorer_kwargs: dict[str, Any]) -> tuple[int, int]:
    """
    Return the ``(worst_score, optimal_score)`` pair advertised by a scorer.

    The pair is read from the ``get_scorer_flags`` callable stored in the
    scorer's ``_RF_ScorerPy`` attribute, which is invoked with
    ``scorer_kwargs``. Scorers without that attribute (or with it set to
    ``None``) are reported as ``(0, 100)``.
    """
    scorer_params = getattr(scorer, "_RF_ScorerPy", None)
    if scorer_params is None:
        # Fallback range for scorers that carry no Python scorer metadata.
        return (0, 100)

    scorer_flags = scorer_params["get_scorer_flags"](**scorer_kwargs)
    return (scorer_flags["worst_score"], scorer_flags["optimal_score"])
def is_none(s):
    """
    Tell whether ``s`` is one of the "missing value" markers.

    Returns True for ``None``, for the ``pandas_NA`` sentinel and for a float
    NaN; False for everything else.
    """
    is_missing_sentinel = s is None or s is pandas_NA
    # isnan is only attempted on floats, so arbitrary objects are never passed to it.
    return is_missing_sentinel or (isinstance(s, float) and isnan(s))
def call_and_maybe_catch(call, *args, catch_exceptions=False, **kwargs):
    """
    Invoke ``call(*args, **kwargs)`` and optionally return its exception.

    With ``catch_exceptions`` false the call is made directly and anything it
    raises propagates. With ``catch_exceptions`` true, an ``Exception`` raised
    by the call is returned as a value instead of being raised; an
    ``AssertionError`` is always re-raised so failed test assertions are never
    swallowed.
    """
    if catch_exceptions:
        try:
            return call(*args, **kwargs)
        except AssertionError:
            # Assertion failures are test failures, not results to compare.
            raise
        except Exception as caught:
            return caught

    return call(*args, **kwargs)
def compare_exceptions(e1, e2):
    """
    Compare two results by their string representation.

    Returns True when ``str(e1)`` equals ``str(e2)``. If building or comparing
    the strings raises an ``Exception``, the values are treated as different
    and False is returned.
    """
    try:
        texts_match = str(e1) == str(e2)
    except Exception:
        texts_match = False
    return texts_match
def scorer_tester(scorer, s1, s2, catch_exceptions=False, **kwargs):
    """
    Check that ``scorer(s1, s2)`` agrees with every ``process`` entry point.

    The scorer is called directly, then through ``extractOne``, ``extract``,
    ``extract_iter`` and (when numpy can be imported) ``cdist`` of both
    ``process_cpp`` and ``process_py``. All of them have to report the same
    score, the same "no match" result, or -- with ``catch_exceptions`` -- the
    same exception text.

    Parameters:
        scorer: callable invoked as ``scorer(s1, s2, **kwargs)``.
        s1, s2: the two inputs to compare.
        catch_exceptions: when true, exceptions raised by the scorer and by
            the process functions are captured and compared instead of
            propagating immediately.
        **kwargs: forwarded to the scorer. ``processor`` and ``score_cutoff``
            are passed to the process functions directly, everything else is
            passed to them as ``scorer_kwargs``.

    Returns:
        The score returned by the direct scorer call.

    Raises:
        AssertionError: if any result disagrees with the direct scorer call.
        Exception: the scorer's own exception, re-raised after the
            comparisons when it was captured.
    """
    # The flag has to be forwarded here: otherwise the scorer's exception can
    # never be captured and every `if exception` branch below is dead code.
    score1 = call_and_maybe_catch(scorer, s1, s2, catch_exceptions=catch_exceptions, **kwargs)
    exception = isinstance(score1, Exception)

    # Split the scorer kwargs into the form expected by the process functions.
    temp_kwargs = kwargs.copy()
    process_kwargs = {}
    for direct_name in ("processor", "score_cutoff"):
        if direct_name in temp_kwargs:
            process_kwargs[direct_name] = temp_kwargs.pop(direct_name)
    if temp_kwargs:
        process_kwargs["scorer_kwargs"] = temp_kwargs

    process_modules = (process_cpp, process_py)

    # extractOne yields a single match (or None) ...
    extract_one_results = [
        call_and_maybe_catch(
            module.extractOne, s1, [s2], catch_exceptions=catch_exceptions, scorer=scorer, **process_kwargs
        )
        for module in process_modules
    ]
    # ... while extract and extract_iter yield a list of matches.
    list_results = [
        call_and_maybe_catch(
            module.extract, s1, [s2], catch_exceptions=catch_exceptions, scorer=scorer, **process_kwargs
        )
        for module in process_modules
    ]
    list_results += [
        call_and_maybe_catch(
            list,
            module.extract_iter(s1, [s2], scorer=scorer, **process_kwargs),
            catch_exceptions=catch_exceptions,
        )
        for module in process_modules
    ]

    if exception:
        for result in (*extract_one_results, *list_results):
            assert compare_exceptions(result, score1)
    else:
        if is_none(s1) or is_none(s2):
            expect_match = False
        elif kwargs.get("score_cutoff") is not None:
            worst_score, optimal_score = _get_scorer_flags_py(scorer, process_kwargs.get("scorer_kwargs", {}))
            lowest_score_worst = optimal_score > worst_score
            score_cutoff = kwargs["score_cutoff"]
            # The side of the cutoff that is rejected depends on the scorer's direction.
            is_filtered = score1 < score_cutoff if lowest_score_worst else score1 > score_cutoff
            expect_match = not is_filtered
        else:
            expect_match = True

        if expect_match:
            for result in extract_one_results:
                assert pytest.approx(score1) == result[1]
            for result in list_results:
                assert pytest.approx(score1) == result[0][1]
        else:
            for result in extract_one_results:
                assert result is None
            for result in list_results:
                assert result == []

    # cdist is only checked when numpy can be imported.
    try:
        import numpy as np
    except Exception:
        np = None

    if np is not None:
        # The 2x4 input is there to probably trigger multi match / simd implementations.
        cdist_results = [
            call_and_maybe_catch(
                module.cdist, queries, choices, catch_exceptions=catch_exceptions, scorer=scorer, **process_kwargs
            )
            for queries, choices in (([s1], [s2]), ([s1] * 2, [s2] * 4))
            for module in process_modules
        ]

        for cdist_scores in cdist_results:
            if exception:
                assert compare_exceptions(cdist_scores, score1)
            else:
                assert np.all(np.isclose(cdist_scores, score1))

    if exception:
        raise score1

    return score1
def symmetric_scorer_tester(scorer, s1, s2, catch_exceptions=False, **kwargs):
    """
    Run ``scorer_tester`` in both argument orders and require equal outcomes.

    The scorer is tested as ``(s1, s2)`` and as ``(s2, s1)``. Both runs have
    to produce approximately the same score, or exceptions with the same text.
    The result of the ``(s1, s2)`` run is returned, or raised when it is an
    exception.

    NOTE(review): ``catch_exceptions`` is consumed by ``call_and_maybe_catch``
    here, so ``scorer_tester`` itself runs with its default value.
    """
    forward, backward = (
        call_and_maybe_catch(scorer_tester, scorer, first, second, catch_exceptions=catch_exceptions, **kwargs)
        for first, second in ((s1, s2), (s2, s1))
    )

    if not isinstance(forward, Exception):
        assert pytest.approx(forward) == backward
        return forward

    assert compare_exceptions(forward, backward)
    raise forward
@dataclass
class Scorer:
    """
    Bundle of the callables that make up one implementation of a metric.

    GenericScorer reads each attribute by name and calls it as
    ``attribute(s1, s2, **kwargs)``.
    """

    # Score callables, compared against each other across implementations.
    distance: Any
    similarity: Any
    normalized_distance: Any
    normalized_similarity: Any
    # Edit-script callables; their results are cross-checked via
    # ``as_editops()`` / ``as_opcodes()``.
    editops: Any
    opcodes: Any
class GenericScorer:
    """
    Test harness that runs several implementations of one metric side by side.

    Every public method calls the matching function of all registered scorers
    (Python implementations first, then C++ ones) and asserts that they agree
    before returning the common result.
    """

    # Score functions every scorer has to provide with matching metadata.
    _SCORE_FUNCTIONS = ("distance", "similarity", "normalized_distance", "normalized_similarity")

    def __init__(self, py_scorers, cpp_scorers, get_scorer_flags):
        """
        Register the scorers and validate their metadata.

        Parameters:
            py_scorers: list of Python scorer bundles.
            cpp_scorers: list of C++ scorer bundles; their score functions
                additionally have to carry an ``_RF_Scorer`` attribute.
            get_scorer_flags: callable invoked as
                ``get_scorer_flags(s1, s2, **kwargs)``; its result is indexed
                with ``"symmetric"`` and ``"maximum"``.

        Raises:
            AssertionError: if a score function lacks ``_RF_ScorerPy`` (or
                ``_RF_Scorer`` for C++ scorers), or if name, qualified name or
                docstring differ from those of the first scorer.
        """
        self.py_scorers = py_scorers
        self.cpp_scorers = cpp_scorers
        self.scorers = self.py_scorers + self.cpp_scorers

        def validate_attrs(func1, func2):
            assert hasattr(func1, "_RF_ScorerPy")
            assert hasattr(func2, "_RF_ScorerPy")
            assert func1.__name__ == func2.__name__
            assert func1.__qualname__ == func2.__qualname__
            assert func1.__doc__ == func2.__doc__

        # Every implementation is compared against the first registered one.
        for scorer in self.scorers:
            for func_name in self._SCORE_FUNCTIONS:
                validate_attrs(getattr(scorer, func_name), getattr(self.scorers[0], func_name))

        for scorer in self.cpp_scorers:
            for func_name in self._SCORE_FUNCTIONS:
                assert hasattr(getattr(scorer, func_name), "_RF_Scorer")

        self.get_scorer_flags = get_scorer_flags

    def _run_edit_function(self, func_name, s1, s2, catch_exceptions, kwargs):
        """Call ``func_name`` on every scorer and require identical string representations."""
        results = [
            call_and_maybe_catch(getattr(scorer, func_name), s1, s2, catch_exceptions=catch_exceptions, **kwargs)
            for scorer in self.scorers
        ]

        for result in results:
            assert compare_exceptions(result, results[0])

        if any(isinstance(result, Exception) for result in results):
            raise results[0]

        return results[0]

    def _run_score_function(self, func_name, s1, s2, catch_exceptions, kwargs):
        """Call ``func_name`` on every scorer through the fitting tester and require equal scores."""
        symmetric = self.get_scorer_flags(s1, s2, **kwargs)["symmetric"]
        tester = symmetric_scorer_tester if symmetric else scorer_tester

        scores = [
            call_and_maybe_catch(
                tester, getattr(scorer, func_name), s1, s2, catch_exceptions=catch_exceptions, **kwargs
            )
            for scorer in self.scorers
        ]

        if any(isinstance(score, Exception) for score in scores):
            for score in scores:
                assert compare_exceptions(score, scores[0])
            raise scores[0]

        # After sorting, comparing the extremes covers every pair of scores.
        scores = sorted(scores)
        assert pytest.approx(scores[0]) == scores[-1]
        return scores[0]

    def _checked_edit_results(self, s1, s2, catch_exceptions, kwargs):
        """Return ``(editops, opcodes)`` after checking that each converts into the other."""
        editops_ = self._editops(s1, s2, catch_exceptions=catch_exceptions, **kwargs)
        opcodes_ = self._opcodes(s1, s2, catch_exceptions=catch_exceptions, **kwargs)
        assert opcodes_.as_editops() == editops_
        assert opcodes_ == editops_.as_opcodes()
        return editops_, opcodes_

    def _editops(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the ``editops`` result shared by all scorers."""
        return self._run_edit_function("editops", s1, s2, catch_exceptions, kwargs)

    def _opcodes(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the ``opcodes`` result shared by all scorers."""
        return self._run_edit_function("opcodes", s1, s2, catch_exceptions, kwargs)

    def _distance(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the ``distance`` score shared by all scorers."""
        return self._run_score_function("distance", s1, s2, catch_exceptions, kwargs)

    def _similarity(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the ``similarity`` score shared by all scorers."""
        return self._run_score_function("similarity", s1, s2, catch_exceptions, kwargs)

    def _normalized_distance(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the ``normalized_distance`` score shared by all scorers."""
        return self._run_score_function("normalized_distance", s1, s2, catch_exceptions, kwargs)

    def _normalized_similarity(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the ``normalized_similarity`` score shared by all scorers."""
        return self._run_score_function("normalized_similarity", s1, s2, catch_exceptions, kwargs)

    def _validate(self, s1, s2, catch_exceptions=False, **kwargs):
        """
        Compute all four scores without ``score_cutoff`` and check their relations.

        Asserts ``dist == maximum - sim`` and that the normalized scores equal
        the raw ones divided by ``maximum`` (``0.0`` / ``1.0`` when ``maximum``
        is zero). Returns ``(dist, sim, norm_dist, norm_sim)``; if the distance
        call produced an exception, the other three have to match it and it is
        raised.
        """
        # todo requires more complex test handling
        # score_cutoff = kwargs.get("score_cutoff")
        kwargs = {k: v for k, v in kwargs.items() if k != "score_cutoff"}

        maximum = self.get_scorer_flags(s1, s2, **kwargs)["maximum"]

        dist = call_and_maybe_catch(self._distance, s1, s2, catch_exceptions=catch_exceptions, **kwargs)
        sim = call_and_maybe_catch(self._similarity, s1, s2, catch_exceptions=catch_exceptions, **kwargs)
        norm_dist = call_and_maybe_catch(self._normalized_distance, s1, s2, catch_exceptions=catch_exceptions, **kwargs)
        norm_sim = call_and_maybe_catch(
            self._normalized_similarity, s1, s2, catch_exceptions=catch_exceptions, **kwargs
        )

        if isinstance(dist, Exception):
            assert compare_exceptions(dist, sim)
            assert compare_exceptions(dist, norm_dist)
            assert compare_exceptions(dist, norm_sim)
            raise dist

        assert pytest.approx(dist) == maximum - sim
        if maximum != 0:
            assert pytest.approx(dist / maximum) == norm_dist
            assert pytest.approx(sim / maximum) == norm_sim
        else:
            assert pytest.approx(0.0) == norm_dist
            assert pytest.approx(1.0) == norm_sim
        return dist, sim, norm_dist, norm_sim

    def distance(self, s1, s2, catch_exceptions=False, **kwargs):
        """Validate all scores, then return the distance (recomputed when ``score_cutoff`` is given)."""
        dist, _, _, _ = self._validate(s1, s2, catch_exceptions=catch_exceptions, **kwargs)
        if "score_cutoff" not in kwargs:
            return dist
        return self._distance(s1, s2, catch_exceptions=catch_exceptions, **kwargs)

    def similarity(self, s1, s2, catch_exceptions=False, **kwargs):
        """Validate all scores, then return the similarity (recomputed when ``score_cutoff`` is given)."""
        _, sim, _, _ = self._validate(s1, s2, catch_exceptions=catch_exceptions, **kwargs)
        if "score_cutoff" not in kwargs:
            return sim
        return self._similarity(s1, s2, catch_exceptions=catch_exceptions, **kwargs)

    def normalized_distance(self, s1, s2, catch_exceptions=False, **kwargs):
        """
        Return the normalized distance agreed on by all scorers.

        The cross-score validation is skipped when either input is None-like.
        """
        if not is_none(s1) and not is_none(s2):
            _, _, norm_dist, _ = self._validate(s1, s2, catch_exceptions=catch_exceptions, **kwargs)
            # todo we should be able to handle this in a nicer way
            if "score_cutoff" not in kwargs:
                return norm_dist

        return self._normalized_distance(s1, s2, catch_exceptions=catch_exceptions, **kwargs)

    def normalized_similarity(self, s1, s2, catch_exceptions=False, **kwargs):
        """
        Return the normalized similarity agreed on by all scorers.

        The cross-score validation is skipped when either input is None-like.
        """
        if not is_none(s1) and not is_none(s2):
            _, _, _, norm_sim = self._validate(s1, s2, catch_exceptions=catch_exceptions, **kwargs)
            if "score_cutoff" not in kwargs:
                return norm_sim

        return self._normalized_similarity(s1, s2, catch_exceptions=catch_exceptions, **kwargs)

    def editops(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the editops shared by all scorers after cross-checking them against the opcodes."""
        editops_, _ = self._checked_edit_results(s1, s2, catch_exceptions, kwargs)
        return editops_

    def opcodes(self, s1, s2, catch_exceptions=False, **kwargs):
        """Return the opcodes shared by all scorers after cross-checking them against the editops."""
        _, opcodes_ = self._checked_edit_results(s1, s2, catch_exceptions, kwargs)
        return opcodes_