spaCy/spacy/pipeline/entityruler.py

329 lines
13 KiB
Python

# coding: utf8
from __future__ import unicode_literals
from collections import defaultdict, OrderedDict
import srsly
from ..errors import Errors
from ..compat import basestring_
from ..util import ensure_path, to_disk, from_disk
from ..tokens import Span
from ..matcher import Matcher, PhraseMatcher
# Default separator placed between an entity label and a pattern's `id` when
# the two are joined into a single matcher key. It is used whenever no
# `ent_id_sep` value is supplied via the config.
DEFAULT_ENT_ID_SEP = "||"
class EntityRuler(object):
    """The EntityRuler lets you add spans to the `Doc.ents` using token-based
    rules or exact phrase matches. It can be combined with the statistical
    `EntityRecognizer` to boost accuracy, or used on its own to implement a
    purely rule-based entity recognition system. After initialization, the
    component is typically added to the pipeline using `nlp.add_pipe`.

    DOCS: https://spacy.io/api/entityruler
    USAGE: https://spacy.io/usage/rule-based-matching#entityruler
    """

    name = "entity_ruler"

    def __init__(self, nlp, phrase_matcher_attr=None, validate=False, **cfg):
        """Initialize the entity ruler. If patterns are supplied here, they
        need to be a list of dictionaries with a `"label"` and `"pattern"`
        key. A pattern can either be a token pattern (list) or a phrase pattern
        (string). For example: `{'label': 'ORG', 'pattern': 'Apple'}`.

        nlp (Language): The shared nlp object to pass the vocab to the matchers
            and process phrase patterns.
        phrase_matcher_attr (int / unicode): Token attribute to match on, passed
            to the internal PhraseMatcher as `attr`. The string `"TEXT"` (in
            any case) is treated as an alias for `"ORTH"`.
        validate (bool): Whether patterns should be validated, passed to
            Matcher and PhraseMatcher as `validate`
        patterns (iterable): Optional patterns to load in.
        overwrite_ents (bool): If existing entities are present, e.g. entities
            added by the model, overwrite them by matches if necessary.
        ent_id_sep (unicode): Separator used to join a label with a pattern
            `id`. Defaults to `DEFAULT_ENT_ID_SEP`.
        **cfg: Other config parameters. If pipeline component is loaded as part
            of a model pipeline, this will include all keyword arguments passed
            to `spacy.load`.
        RETURNS (EntityRuler): The newly constructed object.

        DOCS: https://spacy.io/api/entityruler#init
        """
        self.nlp = nlp
        self.overwrite = cfg.get("overwrite_ents", False)
        self.token_patterns = defaultdict(list)
        self.phrase_patterns = defaultdict(list)
        self.matcher = Matcher(nlp.vocab, validate=validate)
        if phrase_matcher_attr is not None:
            # The attribute may be an integer ID, which has no `.upper()`;
            # only string names can be normalised to the "ORTH" alias.
            if (
                isinstance(phrase_matcher_attr, basestring_)
                and phrase_matcher_attr.upper() == "TEXT"
            ):
                phrase_matcher_attr = "ORTH"
            self.phrase_matcher_attr = phrase_matcher_attr
            self.phrase_matcher = PhraseMatcher(
                nlp.vocab, attr=self.phrase_matcher_attr, validate=validate
            )
        else:
            self.phrase_matcher_attr = None
            self.phrase_matcher = PhraseMatcher(nlp.vocab, validate=validate)
        self.ent_id_sep = cfg.get("ent_id_sep", DEFAULT_ENT_ID_SEP)
        # Maps a normalised matcher key to its (label, ent_id) pair for
        # patterns that were added with an `id`.
        self._ent_ids = defaultdict(dict)
        patterns = cfg.get("patterns")
        if patterns is not None:
            self.add_patterns(patterns)

    def __len__(self):
        """The number of all patterns added to the entity ruler."""
        n_token_patterns = sum(len(p) for p in self.token_patterns.values())
        n_phrase_patterns = sum(len(p) for p in self.phrase_patterns.values())
        return n_token_patterns + n_phrase_patterns

    def __contains__(self, label):
        """Whether a label is present in the patterns."""
        return label in self.token_patterns or label in self.phrase_patterns

    def __call__(self, doc):
        """Find matches in document and add them as entities.

        doc (Doc): The Doc object in the pipeline.
        RETURNS (Doc): The Doc with added entities, if available.

        DOCS: https://spacy.io/api/entityruler#call
        """
        found = list(self.matcher(doc)) + list(self.phrase_matcher(doc))
        # Deduplicate the combined matches and drop zero-length ones.
        unique_matches = {
            (m_id, start, end) for m_id, start, end in found if start != end
        }
        # Longest matches first; among equal lengths, the later start first.
        ordered_matches = sorted(
            unique_matches, key=lambda m: (m[2] - m[1], m[1]), reverse=True
        )
        entities = list(doc.ents)
        new_entities = []
        seen_tokens = set()
        for match_id, start, end in ordered_matches:
            # Leave tokens that already carry an entity type alone unless
            # overwriting was requested.
            if not self.overwrite and any(t.ent_type for t in doc[start:end]):
                continue
            # `end` is exclusive, so the last token of the match is `end - 1`.
            if start in seen_tokens or end - 1 in seen_tokens:
                continue
            if match_id in self._ent_ids:
                label, ent_id = self._ent_ids[match_id]
                span = Span(doc, start, end, label=label)
                if ent_id:
                    for token in span:
                        token.ent_id_ = ent_id
            else:
                span = Span(doc, start, end, label=match_id)
            new_entities.append(span)
            # Drop any existing entity that overlaps the newly added span.
            entities = [
                e for e in entities if not (e.start < end and e.end > start)
            ]
            seen_tokens.update(range(start, end))
        doc.ents = entities + new_entities
        return doc

    @property
    def labels(self):
        """All labels present in the match patterns. These are the keys under
        which patterns are stored, so a pattern added with an `id` appears in
        its joined `label + ent_id_sep + id` form.

        RETURNS (tuple): The string labels.

        DOCS: https://spacy.io/api/entityruler#labels
        """
        all_labels = set(self.token_patterns.keys())
        all_labels.update(self.phrase_patterns.keys())
        return tuple(all_labels)

    @property
    def ent_ids(self):
        """All entity ids present in the match patterns `id` properties

        RETURNS (tuple): The string entity ids.

        DOCS: https://spacy.io/api/entityruler#ent_ids
        """
        all_ent_ids = set()
        for key in self.labels:
            if self.ent_id_sep in key:
                _, ent_id = self._split_label(key)
                all_ent_ids.add(ent_id)
        return tuple(all_ent_ids)

    @property
    def patterns(self):
        """Get all patterns that were added to the entity ruler.

        RETURNS (list): The original patterns, one dictionary per pattern.

        DOCS: https://spacy.io/api/entityruler#patterns
        """
        all_patterns = []
        for key, token_patterns in self.token_patterns.items():
            ent_label, ent_id = self._split_label(key)
            for pattern in token_patterns:
                entry = {"label": ent_label, "pattern": pattern}
                if ent_id:
                    entry["id"] = ent_id
                all_patterns.append(entry)
        for key, phrase_docs in self.phrase_patterns.items():
            ent_label, ent_id = self._split_label(key)
            for phrase_doc in phrase_docs:
                # Phrase patterns are stored as processed docs; report their
                # text.
                entry = {"label": ent_label, "pattern": phrase_doc.text}
                if ent_id:
                    entry["id"] = ent_id
                all_patterns.append(entry)
        return all_patterns

    def add_patterns(self, patterns):
        """Add patterns to the entity ruler. A pattern can either be a token
        pattern (list of dicts) or a phrase pattern (string). For example:
        {'label': 'ORG', 'pattern': 'Apple'}
        {'label': 'GPE', 'pattern': [{'lower': 'san'}, {'lower': 'francisco'}]}

        patterns (list): The patterns to add.
        RAISES (ValueError): If a pattern is neither a string nor a list.

        DOCS: https://spacy.io/api/entityruler#add_patterns
        """
        # Disable the nlp components after this one in case they haven't been
        # initialized / deserialised yet.
        try:
            current_index = self.nlp.pipe_names.index(self.name)
            subsequent_pipes = list(self.nlp.pipe_names[current_index + 1 :])
        except ValueError:
            # This component is not in the pipeline: nothing to disable.
            subsequent_pipes = []
        with self.nlp.disable_pipes(*subsequent_pipes):
            for entry in patterns:
                label = entry["label"]
                if "id" in entry:
                    ent_label = label
                    label = self._create_label(label, entry["id"])
                    key = self.matcher._normalize_key(label)
                    self._ent_ids[key] = (ent_label, entry["id"])
                pattern = entry["pattern"]
                if isinstance(pattern, basestring_):
                    self.phrase_patterns[label].append(self.nlp(pattern))
                elif isinstance(pattern, list):
                    self.token_patterns[label].append(pattern)
                else:
                    raise ValueError(Errors.E097.format(pattern=pattern))
            # NOTE(review): these loops pass every stored pattern to the
            # matchers, not only the ones added by this call -- confirm how
            # the matchers treat patterns they have already been given.
            for label, token_patterns in self.token_patterns.items():
                self.matcher.add(label, None, *token_patterns)
            for label, phrase_docs in self.phrase_patterns.items():
                self.phrase_matcher.add(label, None, *phrase_docs)

    def _split_label(self, label):
        """Split Entity label into ent_label and ent_id if it contains
        self.ent_id_sep. The split is made at the last occurrence of the
        separator.

        label (unicode): The (possibly joined) label.
        RETURNS (tuple): ent_label, ent_id (ent_id is None if there is no
            separator in the label).
        """
        if self.ent_id_sep in label:
            ent_label, ent_id = label.rsplit(self.ent_id_sep, 1)
        else:
            ent_label = label
            ent_id = None
        return ent_label, ent_id

    def _create_label(self, label, ent_id):
        """Join Entity label with ent_id if the pattern has an `id` attribute.
        The label is returned unchanged if ent_id is not a string.

        label (unicode): The entity label.
        ent_id (unicode): The pattern's `id` value.
        RETURNS (str): The ent_label joined with configured `ent_id_sep`
        """
        if isinstance(ent_id, basestring_):
            label = "{}{}{}".format(label, self.ent_id_sep, ent_id)
        return label

    def from_bytes(self, patterns_bytes, **kwargs):
        """Load the entity ruler from a bytestring.

        patterns_bytes (bytes): The bytestring to load.
        **kwargs: Other config parameters, mostly for consistency.
        RETURNS (EntityRuler): The loaded entity ruler.

        DOCS: https://spacy.io/api/entityruler#from_bytes
        """
        cfg = srsly.msgpack_loads(patterns_bytes)
        if isinstance(cfg, dict):
            # Restore the settings *before* adding patterns: the separator is
            # needed to build pattern keys, and the phrase matcher must be
            # replaced first or the loaded phrase patterns would be registered
            # on a matcher that is then thrown away.
            self.overwrite = cfg.get("overwrite", False)
            self.phrase_matcher_attr = cfg.get("phrase_matcher_attr", None)
            if self.phrase_matcher_attr is not None:
                self.phrase_matcher = PhraseMatcher(
                    self.nlp.vocab, attr=self.phrase_matcher_attr
                )
            self.ent_id_sep = cfg.get("ent_id_sep", DEFAULT_ENT_ID_SEP)
            self.add_patterns(cfg.get("patterns", cfg))
        else:
            # Anything that is not a dict is passed on as the patterns.
            self.add_patterns(cfg)
        return self

    def to_bytes(self, **kwargs):
        """Serialize the entity ruler patterns to a bytestring.

        RETURNS (bytes): The serialized patterns.

        DOCS: https://spacy.io/api/entityruler#to_bytes
        """
        serial = OrderedDict(
            (
                ("overwrite", self.overwrite),
                ("ent_id_sep", self.ent_id_sep),
                ("phrase_matcher_attr", self.phrase_matcher_attr),
                ("patterns", self.patterns),
            )
        )
        return srsly.msgpack_dumps(serial)

    def from_disk(self, path, **kwargs):
        """Load the entity ruler from a file. Expects a file containing
        newline-delimited JSON (JSONL) with one entry per line.

        path (unicode / Path): The JSONL file to load.
        **kwargs: Other config parameters, mostly for consistency.
        RETURNS (EntityRuler): The loaded entity ruler.

        DOCS: https://spacy.io/api/entityruler#from_disk
        """
        path = ensure_path(path)
        depr_patterns_path = path.with_suffix(".jsonl")
        if depr_patterns_path.is_file():
            # A plain JSONL file holds patterns only (see `to_disk`), so
            # there are no settings to restore.
            patterns = srsly.read_jsonl(depr_patterns_path)
            self.add_patterns(patterns)
            return self
        cfg = {}
        cfg_readers = {"cfg": lambda p: cfg.update(srsly.read_json(p))}
        pattern_readers = {
            "patterns": lambda p: self.add_patterns(
                srsly.read_jsonl(p.with_suffix(".jsonl"))
            )
        }
        # Read and apply the settings first: the separator is needed to build
        # pattern keys, and the phrase matcher must be replaced before the
        # phrase patterns are registered on it.
        from_disk(path, cfg_readers, {})
        self.overwrite = cfg.get("overwrite", False)
        self.phrase_matcher_attr = cfg.get("phrase_matcher_attr")
        self.ent_id_sep = cfg.get("ent_id_sep", DEFAULT_ENT_ID_SEP)
        if self.phrase_matcher_attr is not None:
            self.phrase_matcher = PhraseMatcher(
                self.nlp.vocab, attr=self.phrase_matcher_attr
            )
        from_disk(path, pattern_readers, {})
        return self

    def to_disk(self, path, **kwargs):
        """Save the entity ruler patterns to a directory. The patterns will be
        saved as newline-delimited JSON (JSONL). If the path ends in `.jsonl`,
        only the patterns are written to that file.

        path (unicode / Path): The JSONL file to save.
        **kwargs: Other config parameters, mostly for consistency.

        DOCS: https://spacy.io/api/entityruler#to_disk
        """
        path = ensure_path(path)
        if path.suffix == ".jsonl":  # user wants to save only JSONL
            srsly.write_jsonl(path, self.patterns)
            return
        cfg = {
            "overwrite": self.overwrite,
            "phrase_matcher_attr": self.phrase_matcher_attr,
            "ent_id_sep": self.ent_id_sep,
        }
        serializers = {
            "patterns": lambda p: srsly.write_jsonl(
                p.with_suffix(".jsonl"), self.patterns
            ),
            "cfg": lambda p: srsly.write_json(p, cfg),
        }
        to_disk(path, serializers, {})