mirror of https://github.com/explosion/spaCy.git
152 lines
5.2 KiB
Python
152 lines
5.2 KiB
Python
from typing import List
|
|
from thinc.types import Floats2d
|
|
from thinc.api import SequenceCategoricalCrossentropy, set_dropout_rate
|
|
from thinc.util import to_numpy
|
|
|
|
from .defaults import default_simple_ner
|
|
from ..gold import Example, spans_from_biluo_tags, iob_to_biluo, biluo_to_iob
|
|
from ..tokens import Doc
|
|
from ..language import component
|
|
from ..util import link_vectors_to_models
|
|
from .pipes import Pipe
|
|
|
|
|
|
@component("simple_ner", assigns=["doc.ents"], default_model=default_simple_ner)
|
|
class SimpleNER(Pipe):
|
|
"""Named entity recognition with a tagging model. The model should include
|
|
validity constraints to ensure that only valid tag sequences are returned."""
|
|
|
|
def __init__(self, vocab, model):
|
|
self.vocab = vocab
|
|
self.model = model
|
|
self.cfg = {"labels": []}
|
|
self.loss_func = SequenceCategoricalCrossentropy(
|
|
names=self.get_tag_names(),
|
|
normalize=True,
|
|
missing_value=None
|
|
)
|
|
assert self.model is not None
|
|
|
|
@property
|
|
def labels(self):
|
|
return self.cfg["labels"]
|
|
|
|
@property
|
|
def is_biluo(self):
|
|
return self.model.name.startswith("biluo")
|
|
|
|
def add_label(self, label):
|
|
if label not in self.cfg["labels"]:
|
|
self.cfg["labels"].append(label)
|
|
|
|
def get_tag_names(self):
|
|
if self.is_biluo:
|
|
return (
|
|
[f"B-{label}" for label in self.labels] +
|
|
[f"I-{label}" for label in self.labels] +
|
|
[f"L-{label}" for label in self.labels] +
|
|
[f"U-{label}" for label in self.labels] +
|
|
["O"]
|
|
)
|
|
else:
|
|
return (
|
|
[f"B-{label}" for label in self.labels] +
|
|
[f"I-{label}" for label in self.labels] +
|
|
["O"]
|
|
)
|
|
|
|
def predict(self, docs: List[Doc]) -> List[Floats2d]:
|
|
scores = self.model.predict(docs)
|
|
return scores
|
|
|
|
def set_annotations(self, docs: List[Doc], scores: List[Floats2d], tensors=None):
|
|
"""Set entities on a batch of documents from a batch of scores."""
|
|
tag_names = self.get_tag_names()
|
|
for i, doc in enumerate(docs):
|
|
actions = to_numpy(scores[i].argmax(axis=1))
|
|
tags = [tag_names[actions[j]] for j in range(len(doc))]
|
|
if not self.is_biluo:
|
|
tags = iob_to_biluo(tags)
|
|
doc.ents = spans_from_biluo_tags(doc, tags)
|
|
|
|
def update(self, examples, set_annotations=False, drop=0.0, sgd=None, losses=None):
|
|
if not any(_has_ner(eg) for eg in examples):
|
|
return 0
|
|
examples = Example.to_example_objects(examples)
|
|
docs = [ex.doc for ex in examples]
|
|
set_dropout_rate(self.model, drop)
|
|
scores, bp_scores = self.model.begin_update(docs)
|
|
loss, d_scores = self.get_loss(examples, scores)
|
|
bp_scores(d_scores)
|
|
if set_annotations:
|
|
self.set_annotations(docs, scores)
|
|
if sgd is not None:
|
|
self.model.finish_update(sgd)
|
|
if losses is not None:
|
|
losses.setdefault("ner", 0.0)
|
|
losses["ner"] += loss
|
|
return loss
|
|
|
|
def get_loss(self, examples, scores):
|
|
loss = 0
|
|
d_scores = []
|
|
truths = []
|
|
for eg in examples:
|
|
gold_tags = [(tag if tag != "-" else None) for tag in eg.gold.ner]
|
|
if not self.is_biluo:
|
|
gold_tags = biluo_to_iob(gold_tags)
|
|
truths.append(gold_tags)
|
|
for i in range(len(scores)):
|
|
if len(scores[i]) != len(truths[i]):
|
|
raise ValueError(
|
|
f"Mismatched output and gold sizes.\n"
|
|
f"Output: {len(scores[i])}, gold: {len(truths[i])}."
|
|
f"Input: {len(examples[i].doc)}"
|
|
)
|
|
d_scores, loss = self.loss_func(scores, truths)
|
|
return loss, d_scores
|
|
|
|
def begin_training(self, get_examples, pipeline=None, sgd=None, **kwargs):
|
|
self.cfg.update(kwargs)
|
|
if not hasattr(get_examples, '__call__'):
|
|
gold_tuples = get_examples
|
|
get_examples = lambda: gold_tuples
|
|
labels = _get_labels(get_examples())
|
|
for label in _get_labels(get_examples()):
|
|
self.add_label(label)
|
|
labels = self.labels
|
|
n_actions = self.model.attrs["get_num_actions"](len(labels))
|
|
self.model.set_dim("nO", n_actions)
|
|
self.model.initialize()
|
|
if pipeline is not None:
|
|
self.init_multitask_objectives(get_examples, pipeline, sgd=sgd, **self.cfg)
|
|
link_vectors_to_models(self.vocab)
|
|
self.loss_func = SequenceCategoricalCrossentropy(
|
|
names=self.get_tag_names(),
|
|
normalize=True,
|
|
missing_value=None
|
|
)
|
|
|
|
return sgd
|
|
|
|
def init_multitask_objectives(self, *args, **kwargs):
|
|
pass
|
|
|
|
|
|
def _has_ner(eg):
|
|
for ner_tag in eg.gold.ner:
|
|
if ner_tag != "-" and ner_tag != None:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def _get_labels(examples):
|
|
labels = set()
|
|
for eg in examples:
|
|
for ner_tag in eg.token_annotation.entities:
|
|
if ner_tag != 'O' and ner_tag != '-':
|
|
_, label = ner_tag.split('-', 1)
|
|
labels.add(label)
|
|
return list(sorted(labels))
|