From b120fb35113d95be25af9a6091a4a511d78254b5 Mon Sep 17 00:00:00 2001
From: Adriane Boyd
Date: Mon, 17 May 2021 13:28:39 +0200
Subject: [PATCH] Handle errors while multiprocessing (#8004)

* Handle errors while multiprocessing

Handle errors while multiprocessing without hanging.

* Return the traceback for errors raised while processing a batch, which
can be handled by the top-level error handler

* Allow for shortened batches due to custom error handlers that ignore
errors and skip documents

* Define custom components at a higher level

* Also move up custom error handler

* Use simpler component for test

* Switch error type

* Adjust test

* Only call top-level error handler for exceptions

* Register custom test components within tests

Use global functions (so they can be pickled) but register the
components only within the individual tests.
---
 spacy/errors.py              |   1 +
 spacy/language.py            |  32 ++++--
 spacy/tests/test_language.py | 194 ++++++++++++++++++++++-------------
 3 files changed, 148 insertions(+), 79 deletions(-)

diff --git a/spacy/errors.py b/spacy/errors.py
index 509b1ae8e..a1ea6e353 100644
--- a/spacy/errors.py
+++ b/spacy/errors.py
@@ -490,6 +490,7 @@ class Errors:
     E202 = ("Unsupported alignment mode '{mode}'. Supported modes: {modes}.")
 
     # New errors added in v3.x
+    E871 = ("Error encountered in nlp.pipe with multiprocessing:\n\n{error}")
     E872 = ("Unable to copy tokenizer from base model due to different "
             'tokenizer settings: current tokenizer config "{curr_config}" '
             'vs. base model "{base_config}"')
diff --git a/spacy/language.py b/spacy/language.py
index 95a902380..1da0cbc2d 100644
--- a/spacy/language.py
+++ b/spacy/language.py
@@ -13,6 +13,7 @@ import srsly
 import multiprocessing as mp
 from itertools import chain, cycle
 from timeit import default_timer as timer
+import traceback
 
 from .tokens.underscore import Underscore
 from .vocab import Vocab, create_vocab
@@ -1521,11 +1522,15 @@ class Language:
 
         # Cycle channels not to break the order of docs.
         # The received object is a batch of byte-encoded docs, so flatten them with chain.from_iterable.
-        byte_docs = chain.from_iterable(recv.recv() for recv in cycle(bytedocs_recv_ch))
-        docs = (Doc(self.vocab).from_bytes(byte_doc) for byte_doc in byte_docs)
+        byte_tuples = chain.from_iterable(recv.recv() for recv in cycle(bytedocs_recv_ch))
         try:
-            for i, (_, doc) in enumerate(zip(raw_texts, docs), 1):
-                yield doc
+            for i, (_, (byte_doc, byte_error)) in enumerate(zip(raw_texts, byte_tuples), 1):
+                if byte_doc is not None:
+                    doc = Doc(self.vocab).from_bytes(byte_doc)
+                    yield doc
+                elif byte_error is not None:
+                    error = srsly.msgpack_loads(byte_error)
+                    self.default_error_handler(None, None, None, ValueError(Errors.E871.format(error=error)))
                 if i % batch_size == 0:
                     # tell `sender` that one batch was consumed.
                     sender.step()
@@ -2019,12 +2024,19 @@ def _apply_pipes(
     """
     Underscore.load_state(underscore_state)
    while True:
-        texts = receiver.get()
-        docs = (make_doc(text) for text in texts)
-        for pipe in pipes:
-            docs = pipe(docs)
-        # Connection does not accept unpickable objects, so send list.
-        sender.send([doc.to_bytes() for doc in docs])
+        try:
+            texts = receiver.get()
+            docs = (make_doc(text) for text in texts)
+            for pipe in pipes:
+                docs = pipe(docs)
+            # Connection does not accept unpicklable objects, so send list.
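+            # Items are (doc_bytes, error_bytes) pairs; (None, None) entries
+            # pad batches that come back short because errors were ignored.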
+            byte_docs = [(doc.to_bytes(), None) for doc in docs]
+            padding = [(None, None)] * (len(texts) - len(byte_docs))
+            sender.send(byte_docs + padding)
+        except Exception:
+            error_msg = [(None, srsly.msgpack_dumps(traceback.format_exc()))]
+            padding = [(None, None)] * (len(texts) - 1)
+            sender.send(error_msg + padding)
 
 
 class _Sender:
diff --git a/spacy/tests/test_language.py b/spacy/tests/test_language.py
index 7fb03da0c..86cce5f9e 100644
--- a/spacy/tests/test_language.py
+++ b/spacy/tests/test_language.py
@@ -8,13 +8,36 @@ from spacy.vocab import Vocab
 from spacy.training import Example
 from spacy.lang.en import English
 from spacy.lang.de import German
-from spacy.util import registry, ignore_error, raise_error
+from spacy.util import registry, ignore_error, raise_error, logger
 import spacy
 from thinc.api import NumpyOps, get_current_ops
 
 from .util import add_vecs_to_vocab, assert_docs_equal
 
 
+def evil_component(doc):
+    if "2" in doc.text:
+        raise ValueError("no dice")
+    return doc
+
+
+def perhaps_set_sentences(doc):
+    if not doc.text.startswith("4"):
+        doc[-1].is_sent_start = True
+    return doc
+
+
+def assert_sents_error(doc):
+    if not doc.has_annotation("SENT_START"):
+        raise ValueError("no sents")
+    return doc
+
+
+def warn_error(proc_name, proc, docs, e):
+    logger = logging.getLogger("spacy")
+    logger.warning(f"Trouble with component {proc_name}.")
+
+
 @pytest.fixture
 def nlp():
     nlp = Language(Vocab())
@@ -93,19 +116,16 @@ def test_evaluate_no_pipe(nlp):
     nlp.evaluate([Example.from_dict(doc, annots)])
 
 
-@Language.component("test_language_vector_modification_pipe")
 def vector_modification_pipe(doc):
     doc.vector += 1
     return doc
 
 
-@Language.component("test_language_userdata_pipe")
 def userdata_pipe(doc):
     doc.user_data["foo"] = "bar"
     return doc
 
 
-@Language.component("test_language_ner_pipe")
 def ner_pipe(doc):
     span = Span(doc, 0, 1, label="FIRST")
     doc.ents += (span,)
@@ -123,6 +143,9 @@ def sample_vectors():
 
 @pytest.fixture
 def nlp2(nlp, sample_vectors):
+    Language.component("test_language_vector_modification_pipe", func=vector_modification_pipe)
+    Language.component("test_language_userdata_pipe", func=userdata_pipe)
+    Language.component("test_language_ner_pipe", func=ner_pipe)
     add_vecs_to_vocab(nlp.vocab, sample_vectors)
     nlp.add_pipe("test_language_vector_modification_pipe")
     nlp.add_pipe("test_language_ner_pipe")
@@ -168,82 +191,115 @@ def test_language_pipe_stream(nlp2, n_process, texts):
         assert_docs_equal(doc, expected_doc)
 
 
-def test_language_pipe_error_handler():
+@pytest.mark.parametrize("n_process", [1, 2])
+def test_language_pipe_error_handler(n_process):
     """Test that the error handling of nlp.pipe works well"""
-    nlp = English()
-    nlp.add_pipe("merge_subtokens")
-    nlp.initialize()
-    texts = ["Curious to see what will happen to this text.", "And this one."]
-    # the pipeline fails because there's no parser
-    with pytest.raises(ValueError):
+    ops = get_current_ops()
+    if isinstance(ops, NumpyOps) or n_process < 2:
+        nlp = English()
+        nlp.add_pipe("merge_subtokens")
+        nlp.initialize()
+        texts = ["Curious to see what will happen to this text.", "And this one."]
+        # the pipeline fails because there's no parser
+        with pytest.raises(ValueError):
+            nlp(texts[0])
+        with pytest.raises(ValueError):
+            list(nlp.pipe(texts, n_process=n_process))
+        nlp.set_error_handler(raise_error)
+        with pytest.raises(ValueError):
+            list(nlp.pipe(texts, n_process=n_process))
+        # set explicitly to ignoring
+        nlp.set_error_handler(ignore_error)
+        docs = list(nlp.pipe(texts, n_process=n_process))
+        assert len(docs) == 0
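+        # with ignore_error set, a direct nlp() call does not raise either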
         nlp(texts[0])
-    with pytest.raises(ValueError):
-        list(nlp.pipe(texts))
-    nlp.set_error_handler(raise_error)
-    with pytest.raises(ValueError):
-        list(nlp.pipe(texts))
-    # set explicitely to ignoring
-    nlp.set_error_handler(ignore_error)
-    docs = list(nlp.pipe(texts))
-    assert len(docs) == 0
-    nlp(texts[0])
 
 
-def test_language_pipe_error_handler_custom(en_vocab):
+@pytest.mark.parametrize("n_process", [1, 2])
+def test_language_pipe_error_handler_custom(en_vocab, n_process):
     """Test the error handling of a custom component that has no pipe method"""
+    Language.component("my_evil_component", func=evil_component)
+    ops = get_current_ops()
+    if isinstance(ops, NumpyOps) or n_process < 2:
+        nlp = English()
+        nlp.add_pipe("my_evil_component")
+        texts = ["TEXT 111", "TEXT 222", "TEXT 333", "TEXT 342", "TEXT 666"]
+        with pytest.raises(ValueError):
+            # the evil custom component throws an error
+            list(nlp.pipe(texts))
 
-    @Language.component("my_evil_component")
-    def evil_component(doc):
-        if "2" in doc.text:
-            raise ValueError("no dice")
-        return doc
-
-    def warn_error(proc_name, proc, docs, e):
-        from spacy.util import logger
-
-        logger.warning(f"Trouble with component {proc_name}.")
-
-    nlp = English()
-    nlp.add_pipe("my_evil_component")
-    nlp.initialize()
-    texts = ["TEXT 111", "TEXT 222", "TEXT 333", "TEXT 342", "TEXT 666"]
-    with pytest.raises(ValueError):
-        # the evil custom component throws an error
-        list(nlp.pipe(texts))
-
-    nlp.set_error_handler(warn_error)
-    logger = logging.getLogger("spacy")
-    with mock.patch.object(logger, "warning") as mock_warning:
-        # the errors by the evil custom component raise a warning for each bad batch
-        docs = list(nlp.pipe(texts))
-        mock_warning.assert_called()
-        assert mock_warning.call_count == 2
-        assert len(docs) + mock_warning.call_count == len(texts)
-        assert [doc.text for doc in docs] == ["TEXT 111", "TEXT 333", "TEXT 666"]
+        nlp.set_error_handler(warn_error)
+        logger = logging.getLogger("spacy")
+        with mock.patch.object(logger, "warning") as mock_warning:
+            # the errors raised by the evil custom component trigger a warning
+            # for each bad doc
+            docs = list(nlp.pipe(texts, n_process=n_process))
+            # HACK/TODO? the warnings in child processes don't seem to be
+            # detected by the mock logger
+            if n_process == 1:
+                mock_warning.assert_called()
+                assert mock_warning.call_count == 2
+                assert len(docs) + mock_warning.call_count == len(texts)
+            assert [doc.text for doc in docs] == ["TEXT 111", "TEXT 333", "TEXT 666"]
 
 
-def test_language_pipe_error_handler_pipe(en_vocab):
+@pytest.mark.parametrize("n_process", [1, 2])
+def test_language_pipe_error_handler_pipe(en_vocab, n_process):
     """Test the error handling of a component's pipe method"""
+    Language.component("my_perhaps_sentences", func=perhaps_set_sentences)
+    Language.component("assert_sents_error", func=assert_sents_error)
+    ops = get_current_ops()
+    if isinstance(ops, NumpyOps) or n_process < 2:
+        texts = [f"{str(i)} is enough. Done" for i in range(100)]
+        nlp = English()
+        nlp.add_pipe("my_perhaps_sentences")
+        nlp.add_pipe("assert_sents_error")
+        nlp.initialize()
+        with pytest.raises(ValueError):
+            # assert_sents_error requires sentence boundaries and will throw an error otherwise
+            docs = list(nlp.pipe(texts, n_process=n_process, batch_size=10))
+        nlp.set_error_handler(ignore_error)
+        docs = list(nlp.pipe(texts, n_process=n_process, batch_size=10))
+        # we lose/ignore the failing docs for texts 4 and 40-49
+        assert len(docs) == 89
 
-    @Language.component("my_sentences")
-    def perhaps_set_sentences(doc):
-        if not doc.text.startswith("4"):
-            doc[-1].is_sent_start = True
-        return doc
 
-    texts = [f"{str(i)} is enough. Done" for i in range(100)]
-    nlp = English()
-    nlp.add_pipe("my_sentences")
-    entity_linker = nlp.add_pipe("entity_linker", config={"entity_vector_length": 3})
-    entity_linker.kb.add_entity(entity="Q1", freq=12, entity_vector=[1, 2, 3])
-    nlp.initialize()
-    with pytest.raises(ValueError):
-        # the entity linker requires sentence boundaries, will throw an error otherwise
-        docs = list(nlp.pipe(texts, batch_size=10))
-    nlp.set_error_handler(ignore_error)
-    docs = list(nlp.pipe(texts, batch_size=10))
-    # we lose/ignore the failing 0-9 and 40-49 batches
-    assert len(docs) == 80
+@pytest.mark.parametrize("n_process", [1, 2])
+def test_language_pipe_error_handler_make_doc_actual(n_process):
+    """Test the error handling for make_doc"""
+    # TODO: fix so that the following test is the actual behavior
+
+    ops = get_current_ops()
+    if isinstance(ops, NumpyOps) or n_process < 2:
+        nlp = English()
+        nlp.max_length = 10
+        texts = ["12345678901234567890", "12345"] * 10
+        with pytest.raises(ValueError):
+            list(nlp.pipe(texts, n_process=n_process))
+        nlp.default_error_handler = ignore_error
+        if n_process == 1:
+            with pytest.raises(ValueError):
+                list(nlp.pipe(texts, n_process=n_process))
+        else:
+            docs = list(nlp.pipe(texts, n_process=n_process))
+            assert len(docs) == 0
+
+
+@pytest.mark.xfail
+@pytest.mark.parametrize("n_process", [1, 2])
+def test_language_pipe_error_handler_make_doc_preferred(n_process):
+    """Test the error handling for make_doc"""
+
+    ops = get_current_ops()
+    if isinstance(ops, NumpyOps) or n_process < 2:
+        nlp = English()
+        nlp.max_length = 10
+        texts = ["12345678901234567890", "12345"] * 10
+        with pytest.raises(ValueError):
+            list(nlp.pipe(texts, n_process=n_process))
+        nlp.default_error_handler = ignore_error
+        docs = list(nlp.pipe(texts, n_process=n_process))
+        assert len(docs) == 0
 
 
 def test_language_from_config_before_after_init():
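
Illustrative usage (editorial note, not part of the commit): a minimal sketch of the behavior this patch enables, mirroring the tests above. It assumes a spaCy build that includes this change, running on CPU ops (NumpyOps); on spawn-based platforms the snippet needs the __main__ guard shown.

import spacy
from spacy.util import ignore_error

if __name__ == "__main__":
    nlp = spacy.blank("en")
    # merge_subtokens fails without a parser, as in the tests above
    nlp.add_pipe("merge_subtokens")
    nlp.initialize()
    texts = ["Curious to see what will happen to this text.", "And this one."]
    try:
        # instead of hanging, the worker's traceback is re-raised in the
        # parent process as a ValueError wrapping E871
        list(nlp.pipe(texts, n_process=2))
    except ValueError as err:
        print(err)
    # an ignoring error handler drops the failing docs, so the
    # result comes back short (here: empty)
    nlp.set_error_handler(ignore_error)
    docs = list(nlp.pipe(texts, n_process=2))
    assert len(docs) == 0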