Make `Language.pipe` workers exit cleanly (#13321)

Also warn when any worker exited with a non-zero exit code and modify
test to ensure that workers exit cleanly by default.
This commit is contained in:
Daniël de Kok 2024-02-12 14:39:38 +01:00 committed by GitHub
parent 14bd9d89a3
commit fdfdbcd9f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 14 additions and 3 deletions

View File

@ -220,6 +220,7 @@ class Warnings(metaclass=ErrorsWithCodes):
"key attribute for vectors, configure it through Vectors(attr=) or " "key attribute for vectors, configure it through Vectors(attr=) or "
"'spacy init vectors --attr'") "'spacy init vectors --attr'")
W126 = ("These keys are unsupported: {unsupported}") W126 = ("These keys are unsupported: {unsupported}")
W127 = ("Not all `Language.pipe` worker processes completed successfully")
class Errors(metaclass=ErrorsWithCodes): class Errors(metaclass=ErrorsWithCodes):

View File

@ -1730,6 +1730,9 @@ class Language:
for proc in procs: for proc in procs:
proc.join() proc.join()
if not all(proc.exitcode == 0 for proc in procs):
warnings.warn(Warnings.W127)
def _link_components(self) -> None: def _link_components(self) -> None:
"""Register 'listeners' within pipeline components, to allow them to """Register 'listeners' within pipeline components, to allow them to
effectively share weights. effectively share weights.
@ -2350,6 +2353,7 @@ def _apply_pipes(
if isinstance(texts_with_ctx, _WorkDoneSentinel): if isinstance(texts_with_ctx, _WorkDoneSentinel):
sender.close() sender.close()
receiver.close() receiver.close()
return
docs = ( docs = (
ensure_doc(doc_like, context) for doc_like, context in texts_with_ctx ensure_doc(doc_like, context) for doc_like, context in texts_with_ctx
@ -2375,6 +2379,7 @@ def _apply_pipes(
# stop processing. # stop processing.
sender.close() sender.close()
receiver.close() receiver.close()
return
class _Sender: class _Sender:

View File

@ -1,5 +1,6 @@
import itertools import itertools
import logging import logging
import warnings
from unittest import mock from unittest import mock
import pytest import pytest
@ -738,9 +739,13 @@ def test_pass_doc_to_pipeline(nlp, n_process):
assert doc.text == texts[0] assert doc.text == texts[0]
assert len(doc.cats) > 0 assert len(doc.cats) > 0
if isinstance(get_current_ops(), NumpyOps) or n_process < 2: if isinstance(get_current_ops(), NumpyOps) or n_process < 2:
docs = nlp.pipe(docs, n_process=n_process) # Catch warnings to ensure that all worker processes exited
assert [doc.text for doc in docs] == texts # succesfully.
assert all(len(doc.cats) for doc in docs) with warnings.catch_warnings():
warnings.simplefilter("error")
docs = nlp.pipe(docs, n_process=n_process)
assert [doc.text for doc in docs] == texts
assert all(len(doc.cats) for doc in docs)
def test_invalid_arg_to_pipeline(nlp): def test_invalid_arg_to_pipeline(nlp):