Properly clean up pipe multiprocessing workers (#13259)

Before this change, the worker processes of a `pipe` call with
`n_process != 1` were stopped by calling `terminate` on them. However,
terminating a process can leave queues, pipes, and other concurrent data
structures in an invalid state.

With this change, we stop using `terminate` and take the following
approach instead (a minimal sketch of the pattern follows the list):

* When all documents have been processed, the parent process puts a
  sentinel in the queue of each worker.
* The parent process then calls `join` on each worker process to
  let them finish up gracefully.
* Worker processes break from the queue-processing loop when they
  encounter the sentinel, and then exit.
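
This is the standard sentinel-based shutdown pattern for multiprocessing.
Below is a minimal, self-contained sketch with illustrative names; unlike
spaCy, which gives each worker its own queue, it shares one queue and puts
one sentinel per worker:

    import multiprocessing as mp


    class _WorkDoneSentinel:
        pass


    _WORK_DONE_SENTINEL = _WorkDoneSentinel()


    def worker(queue, results):
        while True:
            item = queue.get()
            # An isinstance check (rather than identity) also recognizes a
            # sentinel that was re-created by unpickling in a spawned child.
            if isinstance(item, _WorkDoneSentinel):
                return
            results.put(item * 2)


    if __name__ == "__main__":
        queue = mp.Queue()
        results = mp.Queue()
        procs = [mp.Process(target=worker, args=(queue, results)) for _ in range(2)]
        for proc in procs:
            proc.start()
        for item in range(10):
            queue.put(item)
        print(sorted(results.get() for _ in range(10)))
        # Orderly shutdown: each worker consumes exactly one sentinel, breaks
        # from its processing loop, and exits, so join() returns promptly.
        for _ in procs:
            queue.put(_WORK_DONE_SENTINEL)
        for proc in procs:
            proc.join()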

We need special handling when one of the workers encounters an error and
the error handler is set to raise an exception. In this case, we cannot
rely on the sentinel to stop all workers -- the queue is a FIFO queue,
and other work may be queued up ahead of the sentinel. We use the
following approach to handle error scenarios (sketched after the list):

* The parent puts the end-of-work sentinel in the queue of each worker.
* The parent closes the reading-end of the channel of each worker.
* Then:
  - If the worker was waiting for work, it will encounter the sentinel
    and break from the processing loop.
  - If the worker was processing a batch, it will attempt to write
    results to the channel. This will fail because the channel was
    closed by the parent and the worker will break from the processing
    loop.
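
A minimal sketch of this error path, again with illustrative names rather
than spaCy's actual internals. It uses the spawn start method so that the
worker only holds the connection ends explicitly passed to it; under fork,
the child would also inherit the parent's read end, and closing the
parent's copy alone would not break the pipe:

    import multiprocessing as mp


    def worker(queue, sender):
        while True:
            item = queue.get()
            try:
                sender.send(item * 2)
            except BrokenPipeError:
                # The parent closed the read end of the channel; exit the
                # processing loop instead of hanging or doing useless work.
                return


    if __name__ == "__main__":
        ctx = mp.get_context("spawn")
        queue = ctx.Queue()
        receiver, sender = ctx.Pipe(duplex=False)
        proc = ctx.Process(target=worker, args=(queue, sender))
        proc.start()
        sender.close()  # the parent only reads; the worker keeps its own copy
        queue.put(1)
        print(receiver.recv())  # 2
        # Simulate the error path: close the read end, then hand the worker
        # more work. Its next send() raises BrokenPipeError and it exits.
        receiver.close()
        queue.put(2)
        proc.join()

Closing the parent's end rather than draining the channel matters because,
as the commit notes, sending can block on Windows once the pipe buffer
fills.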
Author: Daniël de Kok
Date: 2024-01-23 18:33:04 +01:00 (committed via GitHub)
Commit: 128197a5fc (parent: 3b3b5cdc63)
1 changed file with 46 additions and 3 deletions

spacy/language.py

@@ -1683,6 +1683,12 @@ class Language:
         for proc in procs:
             proc.start()

+        # Close writing-end of channels. This is needed to avoid that reading
+        # from the channel blocks indefinitely when the worker closes the
+        # channel.
+        for tx in bytedocs_send_ch:
+            tx.close()
+
         # Cycle channels not to break the order of docs.
         # The received object is a batch of byte-encoded docs, so flatten them with chain.from_iterable.
         byte_tuples = chain.from_iterable(
@@ -1705,8 +1711,23 @@ class Language:
                     # tell `sender` that one batch was consumed.
                     sender.step()
         finally:
+            # If we are stopping in an orderly fashion, the workers' queues
+            # are empty. Put the sentinel in their queues to signal that work
+            # is done, so that they can exit gracefully.
+            for q in texts_q:
+                q.put(_WORK_DONE_SENTINEL)
+
+            # Otherwise, we are stopping because the error handler raised an
+            # exception. The sentinel will be last to go out of the queue.
+            # To avoid doing unnecessary work or hanging on platforms that
+            # block on sending (Windows), we'll close our end of the channel.
+            # This signals to the worker that it can exit the next time it
+            # attempts to send data down the channel.
+            for r in bytedocs_recv_ch:
+                r.close()
+
             for proc in procs:
-                proc.terminate()
+                proc.join()

     def _link_components(self) -> None:
         """Register 'listeners' within pipeline components, to allow them to
@@ -2323,6 +2344,11 @@ def _apply_pipes(
     while True:
         try:
             texts_with_ctx = receiver.get()
+
+            # Stop working if we encounter the end-of-work sentinel.
+            if isinstance(texts_with_ctx, _WorkDoneSentinel):
+                return
+
             docs = (
                 ensure_doc(doc_like, context) for doc_like, context in texts_with_ctx
             )
@@ -2331,11 +2357,21 @@ def _apply_pipes(
             # Connection does not accept unpickable objects, so send list.
             byte_docs = [(doc.to_bytes(), doc._context, None) for doc in docs]
             padding = [(None, None, None)] * (len(texts_with_ctx) - len(byte_docs))
-            sender.send(byte_docs + padding)  # type: ignore[operator]
+            data: Sequence[Tuple[Optional[bytes], Optional[Any], Optional[bytes]]] = (
+                byte_docs + padding  # type: ignore[operator]
+            )
         except Exception:
             error_msg = [(None, None, srsly.msgpack_dumps(traceback.format_exc()))]
             padding = [(None, None, None)] * (len(texts_with_ctx) - 1)
-            sender.send(error_msg + padding)
+            data = error_msg + padding
+
+        try:
+            sender.send(data)
+        except BrokenPipeError:
+            # Parent has closed the pipe prematurely. This happens when a
+            # worker encounters an error and the error handler is set to
+            # stop processing.
+            return


 class _Sender:
@@ -2365,3 +2401,10 @@ class _Sender:
         if self.count >= self.chunk_size:
             self.count = 0
             self.send()
+
+
+class _WorkDoneSentinel:
+    pass
+
+
+_WORK_DONE_SENTINEL = _WorkDoneSentinel()
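
For context, the shutdown changes affect the multiprocessing path of
`Language.pipe`, which is exercised like this (any installed pipeline
works; `en_core_web_sm` is just an example):

    import spacy

    nlp = spacy.load("en_core_web_sm")  # any installed pipeline works
    texts = ["This is the first text.", "This is the second text."]
    # n_process != 1 starts worker processes; they are now shut down via the
    # sentinel + join() protocol instead of terminate().
    for doc in nlp.pipe(texts, n_process=2):
        print(doc.text)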