mirror of https://github.com/explosion/spaCy.git
498 lines
15 KiB
Python
498 lines
15 KiB
Python
import ctypes
|
|
import os
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
try:
|
|
from pydantic.v1 import ValidationError
|
|
except ImportError:
|
|
from pydantic import ValidationError # type: ignore
|
|
|
|
from thinc.api import (
|
|
Config,
|
|
ConfigValidationError,
|
|
CupyOps,
|
|
MPSOps,
|
|
NumpyOps,
|
|
Optimizer,
|
|
get_current_ops,
|
|
set_current_ops,
|
|
)
|
|
from thinc.compat import has_cupy_gpu, has_torch_mps_gpu
|
|
|
|
from spacy import prefer_gpu, require_cpu, require_gpu, util
|
|
from spacy.about import __version__ as spacy_version
|
|
from spacy.lang.en import English
|
|
from spacy.lang.nl import Dutch
|
|
from spacy.language import DEFAULT_CONFIG_PATH
|
|
from spacy.ml._precomputable_affine import (
|
|
PrecomputableAffine,
|
|
_backprop_precomputable_affine_padding,
|
|
)
|
|
from spacy.schemas import ConfigSchemaTraining, TokenPattern, TokenPatternSchema
|
|
from spacy.training.batchers import minibatch_by_words
|
|
from spacy.util import (
|
|
SimpleFrozenList,
|
|
dot_to_object,
|
|
find_available_port,
|
|
import_file,
|
|
to_ternary_int,
|
|
)
|
|
|
|
from .util import get_random_doc, make_tempdir
|
|
|
|
|
|
@pytest.fixture
|
|
def is_admin():
|
|
"""Determine if the tests are run as admin or not."""
|
|
try:
|
|
admin = os.getuid() == 0
|
|
except AttributeError:
|
|
admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
|
|
|
|
return admin
|
|
|
|
|
|
@pytest.mark.issue(6207)
|
|
def test_issue6207(en_tokenizer):
|
|
doc = en_tokenizer("zero one two three four five six")
|
|
|
|
# Make spans
|
|
s1 = doc[:4]
|
|
s2 = doc[3:6] # overlaps with s1
|
|
s3 = doc[5:7] # overlaps with s2, not s1
|
|
|
|
result = util.filter_spans((s1, s2, s3))
|
|
assert s1 in result
|
|
assert s2 not in result
|
|
assert s3 in result
|
|
|
|
|
|
@pytest.mark.issue(6258)
|
|
def test_issue6258():
|
|
"""Test that the non-empty constraint pattern field is respected"""
|
|
# These one is valid
|
|
TokenPatternSchema(pattern=[TokenPattern()])
|
|
# But an empty pattern list should fail to validate
|
|
# based on the schema's constraint
|
|
with pytest.raises(ValidationError):
|
|
TokenPatternSchema(pattern=[])
|
|
|
|
|
|
@pytest.mark.parametrize("text", ["hello/world", "hello world"])
|
|
def test_util_ensure_path_succeeds(text):
|
|
path = util.ensure_path(text)
|
|
assert isinstance(path, Path)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"package,result", [("numpy", True), ("sfkodskfosdkfpsdpofkspdof", False)]
|
|
)
|
|
def test_util_is_package(package, result):
|
|
"""Test that an installed package via pip is recognised by util.is_package."""
|
|
assert util.is_package(package) is result
|
|
|
|
|
|
@pytest.mark.parametrize("package", ["thinc"])
|
|
def test_util_get_package_path(package):
|
|
"""Test that a Path object is returned for a package name."""
|
|
path = util.get_package_path(package)
|
|
assert isinstance(path, Path)
|
|
|
|
|
|
def test_PrecomputableAffine(nO=4, nI=5, nF=3, nP=2):
|
|
model = PrecomputableAffine(nO=nO, nI=nI, nF=nF, nP=nP).initialize()
|
|
assert model.get_param("W").shape == (nF, nO, nP, nI)
|
|
tensor = model.ops.alloc((10, nI))
|
|
Y, get_dX = model.begin_update(tensor)
|
|
assert Y.shape == (tensor.shape[0] + 1, nF, nO, nP)
|
|
dY = model.ops.alloc((15, nO, nP))
|
|
ids = model.ops.alloc((15, nF))
|
|
ids[1, 2] = -1
|
|
dY[1] = 1
|
|
assert not model.has_grad("pad")
|
|
d_pad = _backprop_precomputable_affine_padding(model, dY, ids)
|
|
assert d_pad[0, 2, 0, 0] == 1.0
|
|
ids.fill(0.0)
|
|
dY.fill(0.0)
|
|
dY[0] = 0
|
|
ids[1, 2] = 0
|
|
ids[1, 1] = -1
|
|
ids[1, 0] = -1
|
|
dY[1] = 1
|
|
ids[2, 0] = -1
|
|
dY[2] = 5
|
|
d_pad = _backprop_precomputable_affine_padding(model, dY, ids)
|
|
assert d_pad[0, 0, 0, 0] == 6
|
|
assert d_pad[0, 1, 0, 0] == 1
|
|
assert d_pad[0, 2, 0, 0] == 0
|
|
|
|
|
|
def test_prefer_gpu():
|
|
current_ops = get_current_ops()
|
|
if has_cupy_gpu:
|
|
assert prefer_gpu()
|
|
assert isinstance(get_current_ops(), CupyOps)
|
|
elif has_torch_mps_gpu:
|
|
assert prefer_gpu()
|
|
assert isinstance(get_current_ops(), MPSOps)
|
|
else:
|
|
assert not prefer_gpu()
|
|
set_current_ops(current_ops)
|
|
|
|
|
|
def test_require_gpu():
|
|
current_ops = get_current_ops()
|
|
if has_cupy_gpu:
|
|
require_gpu()
|
|
assert isinstance(get_current_ops(), CupyOps)
|
|
elif has_torch_mps_gpu:
|
|
require_gpu()
|
|
assert isinstance(get_current_ops(), MPSOps)
|
|
set_current_ops(current_ops)
|
|
|
|
|
|
def test_require_cpu():
|
|
current_ops = get_current_ops()
|
|
require_cpu()
|
|
assert isinstance(get_current_ops(), NumpyOps)
|
|
try:
|
|
import cupy # noqa: F401
|
|
|
|
require_gpu()
|
|
assert isinstance(get_current_ops(), CupyOps)
|
|
except ImportError:
|
|
pass
|
|
require_cpu()
|
|
assert isinstance(get_current_ops(), NumpyOps)
|
|
set_current_ops(current_ops)
|
|
|
|
|
|
def test_ascii_filenames():
|
|
"""Test that all filenames in the project are ASCII.
|
|
See: https://twitter.com/_inesmontani/status/1177941471632211968
|
|
"""
|
|
root = Path(__file__).parent.parent
|
|
for path in root.glob("**/*"):
|
|
assert all(ord(c) < 128 for c in path.name), path.name
|
|
|
|
|
|
def test_load_model_blank_shortcut():
|
|
"""Test that using a model name like "blank:en" works as a shortcut for
|
|
spacy.blank("en").
|
|
"""
|
|
nlp = util.load_model("blank:en")
|
|
assert nlp.lang == "en"
|
|
assert nlp.pipeline == []
|
|
|
|
# ImportError for loading an unsupported language
|
|
with pytest.raises(ImportError):
|
|
util.load_model("blank:zxx")
|
|
|
|
# ImportError for requesting an invalid language code that isn't registered
|
|
with pytest.raises(ImportError):
|
|
util.load_model("blank:fjsfijsdof")
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"version,constraint,compatible",
|
|
[
|
|
(spacy_version, spacy_version, True),
|
|
(spacy_version, f">={spacy_version}", True),
|
|
("3.0.0", "2.0.0", False),
|
|
("3.2.1", ">=2.0.0", True),
|
|
("2.2.10a1", ">=1.0.0,<2.1.1", False),
|
|
("3.0.0.dev3", ">=1.2.3,<4.5.6", True),
|
|
("n/a", ">=1.2.3,<4.5.6", None),
|
|
("1.2.3", "n/a", None),
|
|
("n/a", "n/a", None),
|
|
],
|
|
)
|
|
def test_is_compatible_version(version, constraint, compatible):
|
|
assert util.is_compatible_version(version, constraint) is compatible
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"constraint,expected",
|
|
[
|
|
("3.0.0", False),
|
|
("==3.0.0", False),
|
|
(">=2.3.0", True),
|
|
(">2.0.0", True),
|
|
("<=2.0.0", True),
|
|
(">2.0.0,<3.0.0", False),
|
|
(">=2.0.0,<3.0.0", False),
|
|
("!=1.1,>=1.0,~=1.0", True),
|
|
("n/a", None),
|
|
],
|
|
)
|
|
def test_is_unconstrained_version(constraint, expected):
|
|
assert util.is_unconstrained_version(constraint) is expected
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"a1,a2,b1,b2,is_match",
|
|
[
|
|
("3.0.0", "3.0", "3.0.1", "3.0", True),
|
|
("3.1.0", "3.1", "3.2.1", "3.2", False),
|
|
("xxx", None, "1.2.3.dev0", "1.2", False),
|
|
],
|
|
)
|
|
def test_minor_version(a1, a2, b1, b2, is_match):
|
|
assert util.get_minor_version(a1) == a2
|
|
assert util.get_minor_version(b1) == b2
|
|
assert util.is_minor_version_match(a1, b1) is is_match
|
|
assert util.is_minor_version_match(a2, b2) is is_match
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"dot_notation,expected",
|
|
[
|
|
(
|
|
{"token.pos": True, "token._.xyz": True},
|
|
{"token": {"pos": True, "_": {"xyz": True}}},
|
|
),
|
|
(
|
|
{"training.batch_size": 128, "training.optimizer.learn_rate": 0.01},
|
|
{"training": {"batch_size": 128, "optimizer": {"learn_rate": 0.01}}},
|
|
),
|
|
(
|
|
{"attribute_ruler.scorer.@scorers": "spacy.tagger_scorer.v1"},
|
|
{"attribute_ruler": {"scorer": {"@scorers": "spacy.tagger_scorer.v1"}}},
|
|
),
|
|
],
|
|
)
|
|
def test_dot_to_dict(dot_notation, expected):
|
|
result = util.dot_to_dict(dot_notation)
|
|
assert result == expected
|
|
assert util.dict_to_dot(result) == dot_notation
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"dot_notation,expected",
|
|
[
|
|
(
|
|
{"token.pos": True, "token._.xyz": True},
|
|
{"token": {"pos": True, "_": {"xyz": True}}},
|
|
),
|
|
(
|
|
{"training.batch_size": 128, "training.optimizer.learn_rate": 0.01},
|
|
{"training": {"batch_size": 128, "optimizer": {"learn_rate": 0.01}}},
|
|
),
|
|
(
|
|
{"attribute_ruler.scorer": {"@scorers": "spacy.tagger_scorer.v1"}},
|
|
{"attribute_ruler": {"scorer": {"@scorers": "spacy.tagger_scorer.v1"}}},
|
|
),
|
|
],
|
|
)
|
|
def test_dot_to_dict_overrides(dot_notation, expected):
|
|
result = util.dot_to_dict(dot_notation)
|
|
assert result == expected
|
|
assert util.dict_to_dot(result, for_overrides=True) == dot_notation
|
|
|
|
|
|
def test_set_dot_to_object():
|
|
config = {"foo": {"bar": 1, "baz": {"x": "y"}}, "test": {"a": {"b": "c"}}}
|
|
with pytest.raises(KeyError):
|
|
util.set_dot_to_object(config, "foo.bar.baz", 100)
|
|
with pytest.raises(KeyError):
|
|
util.set_dot_to_object(config, "hello.world", 100)
|
|
with pytest.raises(KeyError):
|
|
util.set_dot_to_object(config, "test.a.b.c", 100)
|
|
util.set_dot_to_object(config, "foo.bar", 100)
|
|
assert config["foo"]["bar"] == 100
|
|
util.set_dot_to_object(config, "foo.baz.x", {"hello": "world"})
|
|
assert config["foo"]["baz"]["x"]["hello"] == "world"
|
|
assert config["test"]["a"]["b"] == "c"
|
|
util.set_dot_to_object(config, "foo", 123)
|
|
assert config["foo"] == 123
|
|
util.set_dot_to_object(config, "test", "hello")
|
|
assert dict(config) == {"foo": 123, "test": "hello"}
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"doc_sizes, expected_batches",
|
|
[
|
|
([400, 400, 199], [3]),
|
|
([400, 400, 199, 3], [4]),
|
|
([400, 400, 199, 3, 200], [3, 2]),
|
|
([400, 400, 199, 3, 1], [5]),
|
|
([400, 400, 199, 3, 1, 1500], [5]), # 1500 will be discarded
|
|
([400, 400, 199, 3, 1, 200], [3, 3]),
|
|
([400, 400, 199, 3, 1, 999], [3, 3]),
|
|
([400, 400, 199, 3, 1, 999, 999], [3, 2, 1, 1]),
|
|
([1, 2, 999], [3]),
|
|
([1, 2, 999, 1], [4]),
|
|
([1, 200, 999, 1], [2, 2]),
|
|
([1, 999, 200, 1], [2, 2]),
|
|
],
|
|
)
|
|
def test_util_minibatch(doc_sizes, expected_batches):
|
|
docs = [get_random_doc(doc_size) for doc_size in doc_sizes]
|
|
tol = 0.2
|
|
batch_size = 1000
|
|
batches = list(
|
|
minibatch_by_words(docs, size=batch_size, tolerance=tol, discard_oversize=True)
|
|
)
|
|
assert [len(batch) for batch in batches] == expected_batches
|
|
|
|
max_size = batch_size + batch_size * tol
|
|
for batch in batches:
|
|
assert sum([len(doc) for doc in batch]) < max_size
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"doc_sizes, expected_batches",
|
|
[
|
|
([400, 4000, 199], [1, 2]),
|
|
([400, 400, 199, 3000, 200], [1, 4]),
|
|
([400, 400, 199, 3, 1, 1500], [1, 5]),
|
|
([400, 400, 199, 3000, 2000, 200, 200], [1, 1, 3, 2]),
|
|
([1, 2, 9999], [1, 2]),
|
|
([2000, 1, 2000, 1, 1, 1, 2000], [1, 1, 1, 4]),
|
|
],
|
|
)
|
|
def test_util_minibatch_oversize(doc_sizes, expected_batches):
|
|
"""Test that oversized documents are returned in their own batch"""
|
|
docs = [get_random_doc(doc_size) for doc_size in doc_sizes]
|
|
tol = 0.2
|
|
batch_size = 1000
|
|
batches = list(
|
|
minibatch_by_words(docs, size=batch_size, tolerance=tol, discard_oversize=False)
|
|
)
|
|
assert [len(batch) for batch in batches] == expected_batches
|
|
|
|
|
|
def test_util_dot_section():
|
|
cfg_string = """
|
|
[nlp]
|
|
lang = "en"
|
|
pipeline = ["textcat"]
|
|
|
|
[components]
|
|
|
|
[components.textcat]
|
|
factory = "textcat"
|
|
|
|
[components.textcat.model]
|
|
@architectures = "spacy.TextCatBOW.v3"
|
|
exclusive_classes = true
|
|
length = 262144
|
|
ngram_size = 1
|
|
no_output_layer = false
|
|
"""
|
|
nlp_config = Config().from_str(cfg_string)
|
|
en_nlp = util.load_model_from_config(nlp_config, auto_fill=True)
|
|
default_config = Config().from_disk(DEFAULT_CONFIG_PATH)
|
|
default_config["nlp"]["lang"] = "nl"
|
|
nl_nlp = util.load_model_from_config(default_config, auto_fill=True)
|
|
# Test that creation went OK
|
|
assert isinstance(en_nlp, English)
|
|
assert isinstance(nl_nlp, Dutch)
|
|
assert nl_nlp.pipe_names == []
|
|
assert en_nlp.pipe_names == ["textcat"]
|
|
# not exclusive_classes
|
|
assert en_nlp.get_pipe("textcat").model.attrs["multi_label"] is False
|
|
# Test that default values got overwritten
|
|
assert en_nlp.config["nlp"]["pipeline"] == ["textcat"]
|
|
assert nl_nlp.config["nlp"]["pipeline"] == [] # default value []
|
|
# Test proper functioning of 'dot_to_object'
|
|
with pytest.raises(KeyError):
|
|
dot_to_object(en_nlp.config, "nlp.pipeline.tagger")
|
|
with pytest.raises(KeyError):
|
|
dot_to_object(en_nlp.config, "nlp.unknownattribute")
|
|
T = util.registry.resolve(nl_nlp.config["training"], schema=ConfigSchemaTraining)
|
|
assert isinstance(dot_to_object({"training": T}, "training.optimizer"), Optimizer)
|
|
|
|
|
|
def test_simple_frozen_list():
|
|
t = SimpleFrozenList(["foo", "bar"])
|
|
assert t == ["foo", "bar"]
|
|
assert t.index("bar") == 1 # okay method
|
|
with pytest.raises(NotImplementedError):
|
|
t.append("baz")
|
|
with pytest.raises(NotImplementedError):
|
|
t.sort()
|
|
with pytest.raises(NotImplementedError):
|
|
t.extend(["baz"])
|
|
with pytest.raises(NotImplementedError):
|
|
t.pop()
|
|
t = SimpleFrozenList(["foo", "bar"], error="Error!")
|
|
with pytest.raises(NotImplementedError):
|
|
t.append("baz")
|
|
|
|
|
|
def test_resolve_dot_names():
|
|
config = {
|
|
"training": {"optimizer": {"@optimizers": "Adam.v1"}},
|
|
"foo": {"bar": "training.optimizer", "baz": "training.xyz"},
|
|
}
|
|
result = util.resolve_dot_names(config, ["training.optimizer"])
|
|
assert isinstance(result[0], Optimizer)
|
|
with pytest.raises(ConfigValidationError) as e:
|
|
util.resolve_dot_names(config, ["training.xyz", "training.optimizer"])
|
|
errors = e.value.errors
|
|
assert len(errors) == 1
|
|
assert errors[0]["loc"] == ["training", "xyz"]
|
|
|
|
|
|
def test_import_code():
|
|
code_str = """
|
|
from spacy import Language
|
|
|
|
class DummyComponent:
|
|
def __init__(self, vocab, name):
|
|
pass
|
|
|
|
def initialize(self, get_examples, *, nlp, dummy_param: int):
|
|
pass
|
|
|
|
@Language.factory(
|
|
"dummy_component",
|
|
)
|
|
def make_dummy_component(
|
|
nlp: Language, name: str
|
|
):
|
|
return DummyComponent(nlp.vocab, name)
|
|
"""
|
|
|
|
with make_tempdir() as temp_dir:
|
|
code_path = os.path.join(temp_dir, "code.py")
|
|
with open(code_path, "w") as fileh:
|
|
fileh.write(code_str)
|
|
|
|
import_file("python_code", code_path)
|
|
config = {"initialize": {"components": {"dummy_component": {"dummy_param": 1}}}}
|
|
nlp = English.from_config(config)
|
|
nlp.add_pipe("dummy_component")
|
|
nlp.initialize()
|
|
|
|
|
|
def test_to_ternary_int():
|
|
assert to_ternary_int(True) == 1
|
|
assert to_ternary_int(None) == 0
|
|
assert to_ternary_int(False) == -1
|
|
assert to_ternary_int(1) == 1
|
|
assert to_ternary_int(1.0) == 1
|
|
assert to_ternary_int(0) == 0
|
|
assert to_ternary_int(0.0) == 0
|
|
assert to_ternary_int(-1) == -1
|
|
assert to_ternary_int(5) == -1
|
|
assert to_ternary_int(-10) == -1
|
|
assert to_ternary_int("string") == -1
|
|
assert to_ternary_int([0, "string"]) == -1
|
|
|
|
|
|
def test_find_available_port():
|
|
host = "0.0.0.0"
|
|
port = 5001
|
|
assert find_available_port(port, host) == port, "Port 5001 isn't free"
|
|
|
|
from wsgiref.simple_server import demo_app, make_server
|
|
|
|
with make_server(host, port, demo_app) as httpd:
|
|
with pytest.warns(UserWarning, match="already in use"):
|
|
found_port = find_available_port(port, host, auto_select=True)
|
|
assert found_port == port + 1, "Didn't find next port"
|