# spaCy/examples/training/train_textcat.py
#
# 161 lines
# 5.9 KiB
# Python

#!/usr/bin/env python
# coding: utf8
"""Train a convolutional neural network text classifier on the
IMDB dataset, using the TextCategorizer component. The dataset will be loaded
automatically via Thinc's built-in dataset loader. The model is added to
spacy.pipeline, and predictions are available via `doc.cats`. For more details,
see the documentation:
* Training: https://spacy.io/usage/training
Compatible with: spaCy v2.0.0+
"""
from __future__ import unicode_literals, print_function
import plac
import random
from pathlib import Path
import thinc.extra.datasets
import spacy
from spacy.util import minibatch, compounding
@plac.annotations(
    model=("Model name. Defaults to blank 'en' model.", "option", "m", str),
    output_dir=("Optional output directory", "option", "o", Path),
    n_texts=("Number of texts to train from", "option", "t", int),
    n_iter=("Number of training iterations", "option", "n", int),
    init_tok2vec=("Pretrained tok2vec weights", "option", "t2v", Path),
)
def main(model=None, output_dir=None, n_iter=20, n_texts=2000, init_tok2vec=None):
    """Train the text classifier on IMDB data, print scores and optionally save it.

    model: name passed to spacy.load; None creates a blank 'en' pipeline
    output_dir: directory the trained pipeline is written to; None skips saving
    n_iter: number of passes over the training data
    n_texts: maximum number of training texts taken from the training split
    init_tok2vec: path to serialized tok2vec weights, loaded before training
    """
    if output_dir is not None:
        output_dir = Path(output_dir)
        if not output_dir.exists():
            # parents=True so a nested path whose parent is missing still works
            output_dir.mkdir(parents=True)

    if model is not None:
        nlp = spacy.load(model)  # load existing spaCy model
        print("Loaded model '%s'" % model)
    else:
        nlp = spacy.blank("en")  # create blank Language class
        print("Created blank 'en' model")

    # add the text classifier to the pipeline if it doesn't exist
    # nlp.create_pipe works for built-ins that are registered with spaCy
    if "textcat" not in nlp.pipe_names:
        textcat = nlp.create_pipe(
            "textcat", config={"exclusive_classes": True, "architecture": "simple_cnn"}
        )
        nlp.add_pipe(textcat, last=True)
    # otherwise, get it, so we can add labels to it
    else:
        textcat = nlp.get_pipe("textcat")

    # add label to text classifier
    textcat.add_label("POSITIVE")
    textcat.add_label("NEGATIVE")

    # load the IMDB dataset
    print("Loading IMDB data...")
    (train_texts, train_cats), (dev_texts, dev_cats) = load_data()
    # only the training split is truncated; the dev split is used in full
    train_texts = train_texts[:n_texts]
    train_cats = train_cats[:n_texts]
    print(
        "Using {} examples ({} training, {} evaluation)".format(
            n_texts, len(train_texts), len(dev_texts)
        )
    )
    train_data = list(zip(train_texts, [{"cats": cats} for cats in train_cats]))

    # get names of other pipes to disable them during training
    pipe_exceptions = ["textcat", "trf_wordpiecer", "trf_tok2vec"]
    other_pipes = [pipe for pipe in nlp.pipe_names if pipe not in pipe_exceptions]
    with nlp.disable_pipes(*other_pipes):  # only train textcat
        optimizer = nlp.begin_training()
        if init_tok2vec is not None:
            with init_tok2vec.open("rb") as file_:
                textcat.model.tok2vec.from_bytes(file_.read())
        print("Training the model...")
        print("{:^5}\t{:^5}\t{:^5}\t{:^5}".format("LOSS", "P", "R", "F"))
        # one batch-size generator shared by all iterations
        batch_sizes = compounding(4.0, 32.0, 1.001)
        for i in range(n_iter):
            losses = {}
            # batch up the examples using spaCy's minibatch
            random.shuffle(train_data)
            batches = minibatch(train_data, size=batch_sizes)
            for batch in batches:
                texts, annotations = zip(*batch)
                nlp.update(texts, annotations, sgd=optimizer, drop=0.2, losses=losses)
            with textcat.model.use_params(optimizer.averages):
                # evaluate on the dev data split off in load_data()
                scores = evaluate(nlp.tokenizer, textcat, dev_texts, dev_cats)
            print(
                "{0:.3f}\t{1:.3f}\t{2:.3f}\t{3:.3f}".format(  # print a simple table
                    losses["textcat"],
                    scores["textcat_p"],
                    scores["textcat_r"],
                    scores["textcat_f"],
                )
            )

    # test the trained model
    test_text = "This movie sucked"
    doc = nlp(test_text)
    print(test_text, doc.cats)

    if output_dir is not None:
        # save with the optimizer's averaged parameters swapped in
        with nlp.use_params(optimizer.averages):
            nlp.to_disk(output_dir)
        print("Saved model to", output_dir)

        # test the saved model
        print("Loading from", output_dir)
        nlp2 = spacy.load(output_dir)
        doc2 = nlp2(test_text)
        print(test_text, doc2.cats)
def load_data(limit=0, split=0.8):
    """Load data from the IMDB dataset.

    limit: keep only the last `limit` examples after shuffling; 0 keeps all
    split: fraction of the kept examples that goes into the training part
    RETURNS: ((train_texts, train_cats), (dev_texts, dev_cats)), where each
        cats entry is a dict with boolean "POSITIVE" and "NEGATIVE" values.
    """
    # Partition off part of the train data for evaluation
    train_data, _ = thinc.extra.datasets.imdb()
    random.shuffle(train_data)
    # with limit == 0 this is train_data[-0:], i.e. the whole list
    train_data = train_data[-limit:]
    texts, labels = zip(*train_data)
    cats = [{"POSITIVE": bool(y), "NEGATIVE": not bool(y)} for y in labels]
    # separate local so the `split` fraction is not rebound to an index
    n_train = int(len(train_data) * split)
    return (texts[:n_train], cats[:n_train]), (texts[n_train:], cats[n_train:])
def evaluate(tokenizer, textcat, texts, cats):
    """Compute precision, recall and F-score of the classifier on some texts.

    tokenizer: callable that turns one text into a doc
    textcat: object whose pipe(docs) yields docs with a `cats` score dict
    texts: iterable of texts to classify
    cats: gold label dicts, indexable in the same order as texts
    RETURNS (dict): scores under "textcat_p", "textcat_r" and "textcat_f".

    Labels absent from the gold dict and the "NEGATIVE" label are skipped.
    Predicted scores and gold values are both thresholded at 0.5.
    """
    docs = (tokenizer(text) for text in texts)
    tp = 0.0  # True positives
    fp = 1e-8  # False positives; tiny seed avoids division by zero
    fn = 1e-8  # False negatives; tiny seed avoids division by zero
    for i, doc in enumerate(textcat.pipe(docs)):
        gold = cats[i]
        for label, score in doc.cats.items():
            if label not in gold:
                continue
            if label == "NEGATIVE":
                continue
            if score >= 0.5 and gold[label] >= 0.5:
                tp += 1.0
            elif score >= 0.5 and gold[label] < 0.5:
                fp += 1.0
            elif score < 0.5 and gold[label] >= 0.5:
                fn += 1
            # true negatives are not counted: none of the scores use them
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    if (precision + recall) == 0:
        f_score = 0.0
    else:
        f_score = 2 * (precision * recall) / (precision + recall)
    return {"textcat_p": precision, "textcat_r": recall, "textcat_f": f_score}
if __name__ == "__main__":
    # run main through plac, using the annotations declared on it
    plac.call(main)