diff --git a/spacy/language.py b/spacy/language.py index 26152b90a..0287549db 100644 --- a/spacy/language.py +++ b/spacy/language.py @@ -1683,6 +1683,12 @@ class Language: for proc in procs: proc.start() + # Close writing-end of channels. This is needed to avoid that reading + # from the channel blocks indefinitely when the worker closes the + # channel. + for tx in bytedocs_send_ch: + tx.close() + # Cycle channels not to break the order of docs. # The received object is a batch of byte-encoded docs, so flatten them with chain.from_iterable. byte_tuples = chain.from_iterable( @@ -1705,8 +1711,23 @@ class Language: # tell `sender` that one batch was consumed. sender.step() finally: + # If we are stopping in an orderly fashion, the workers' queues + # are empty. Put the sentinel in their queues to signal that work + # is done, so that they can exit gracefully. + for q in texts_q: + q.put(_WORK_DONE_SENTINEL) + + # Otherwise, we are stopping because the error handler raised an + # exception. The sentinel will be last to go out of the queue. + # To avoid doing unnecessary work or hanging on platforms that + # block on sending (Windows), we'll close our end of the channel. + # This signals to the worker that it can exit the next time it + # attempts to send data down the channel. + for r in bytedocs_recv_ch: + r.close() + for proc in procs: - proc.terminate() + proc.join() def _link_components(self) -> None: """Register 'listeners' within pipeline components, to allow them to @@ -2323,6 +2344,11 @@ def _apply_pipes( while True: try: texts_with_ctx = receiver.get() + + # Stop working if we encounter the end-of-work sentinel. + if isinstance(texts_with_ctx, _WorkDoneSentinel): + return + docs = ( ensure_doc(doc_like, context) for doc_like, context in texts_with_ctx ) @@ -2331,11 +2357,21 @@ def _apply_pipes( # Connection does not accept unpickable objects, so send list. byte_docs = [(doc.to_bytes(), doc._context, None) for doc in docs] padding = [(None, None, None)] * (len(texts_with_ctx) - len(byte_docs)) - sender.send(byte_docs + padding) # type: ignore[operator] + data: Sequence[Tuple[Optional[bytes], Optional[Any], Optional[bytes]]] = ( + byte_docs + padding # type: ignore[operator] + ) except Exception: error_msg = [(None, None, srsly.msgpack_dumps(traceback.format_exc()))] padding = [(None, None, None)] * (len(texts_with_ctx) - 1) - sender.send(error_msg + padding) + data = error_msg + padding + + try: + sender.send(data) + except BrokenPipeError: + # Parent has closed the pipe prematurely. This happens when a + # worker encounters an error and the error handler is set to + # stop processing. + return class _Sender: @@ -2365,3 +2401,10 @@ class _Sender: if self.count >= self.chunk_size: self.count = 0 self.send() + + +class _WorkDoneSentinel: + pass + + +_WORK_DONE_SENTINEL = _WorkDoneSentinel()