[stream-refactor] don't abort Connection until all buffers are empty

This commit is contained in:
David Wilson 2019-07-28 02:51:56 +00:00
parent 93342ba60c
commit c02358698b
2 changed files with 64 additions and 34 deletions

View File

@ -1554,7 +1554,7 @@ class Stream(object):
"""
buf = self.receive_side.read(self.protocol.read_size)
if not buf:
LOG.debug('%r: empty read, disconnecting', self)
LOG.debug('%r: empty read, disconnecting', self.receive_side)
return self.on_disconnect(broker)
self.protocol.on_receive(broker, buf)

View File

@ -1227,8 +1227,8 @@ class Connection(object):
#: :class:`Process`
proc = None
#: :class:`mitogen.core.Stream`
stream = None
#: :class:`mitogen.core.Stream` with sides connected to stdin/stdout.
stdio_stream = None
#: If `proc.stderr` is set, referencing either a plain pipe or the
#: controlling TTY, this references the corresponding
@ -1264,7 +1264,7 @@ class Connection(object):
self._router = router
def __repr__(self):
return 'Connection(%r)' % (self.stream,)
return 'Connection(%r)' % (self.stdio_stream,)
# Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
# file descriptor 0 as 100, creates a pipe, then execs a new interpreter
@ -1405,8 +1405,8 @@ class Connection(object):
def _complete_connection(self):
self.timer.cancel()
if not self.exception:
self._router.register(self.context, self.stream)
self.stream.set_protocol(
self._router.register(self.context, self.stdio_stream)
self.stdio_stream.set_protocol(
mitogen.core.MitogenProtocol(
router=self._router,
remote_id=self.context.context_id,
@ -1419,11 +1419,11 @@ class Connection(object):
Fail the connection attempt.
"""
LOG.debug('%s: failing connection due to %r',
self.stream.name, exc)
self.stdio_stream.name, exc)
if self.exception is None:
self._adorn_eof_error(exc)
self.exception = exc
for stream in self.stream, self.stderr_stream:
for stream in self.stdio_stream, self.stderr_stream:
if stream and not stream.receive_side.closed:
stream.on_disconnect(self._router.broker)
self._complete_connection()
@ -1433,24 +1433,52 @@ class Connection(object):
Request the slave gracefully shut itself down.
"""
LOG.debug('%r: requesting child shutdown', self)
self.stream.protocol._send(
self.stdio_stream.protocol._send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.stream.protocol.remote_id,
dst_id=self.stdio_stream.protocol.remote_id,
handle=mitogen.core.SHUTDOWN,
)
)
eof_error_msg = 'EOF on stream; last 100 lines received:\n'
def on_stream_disconnect(self):
if self.stderr_stream is not None:
self.stderr_stream.on_disconnect(self._router.broker)
def on_stdio_disconnect(self):
"""
Handle stdio stream disconnection by failing the Connection if the
stderr stream has already been closed. Otherwise, wait for it to close
(or timeout), to allow buffered diagnostic logs to be consumed.
It is normal that when a subprocess aborts, stdio has nothing buffered
when it is closed, thus signalling readability, causing an empty read
(interpreted as indicating disconnection) on the next loop iteration,
even if its stderr pipe has lots of diagnostic logs still buffered in
the kernel. Therefore we must wait for both pipes to indicate they are
empty before triggering connection failure.
"""
stderr = self.stderr_stream
if stderr is None or stderr.receive_side.closed:
self._on_streams_disconnected()
def on_stderr_disconnect(self):
"""
Inverse of :func:`on_stdio_disconnect`.
"""
if self.stdio_stream.receive_side.closed:
self._on_streams_disconnected()
def _on_streams_disconnected(self):
"""
When disconnection has been detected for both our streams, cancel the
connection timer, mark the connection failed, and reap the child
process. Do nothing if the timer has already been cancelled, indicating
some existing failure has already been noticed.
"""
if not self.timer.cancelled:
self.timer.cancel()
self._fail_connection(EofError(
self.eof_error_msg + get_history(
[self.stream, self.stderr_stream]
[self.stdio_stream, self.stderr_stream]
)
))
self.proc._async_reap(self, self._router)
@ -1477,33 +1505,35 @@ class Connection(object):
def stderr_stream_factory(self):
return self.diag_protocol_class.build_stream()
def _setup_stream(self):
self.stream = self.stream_factory()
self.stream.conn = self
self.stream.name = self.options.name or self._get_name()
self.stream.accept(self.proc.stdout, self.proc.stdin)
def _setup_stdio_stream(self):
stream = self.stream_factory()
stream.conn = self
stream.name = self.options.name or self._get_name()
stream.accept(self.proc.stdout, self.proc.stdin)
mitogen.core.listen(self.stream, 'shutdown',
self.on_stream_shutdown)
mitogen.core.listen(self.stream, 'disconnect',
self.on_stream_disconnect)
self._router.broker.start_receive(self.stream)
mitogen.core.listen(stream, 'shutdown', self.on_stream_shutdown)
mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
self._router.broker.start_receive(stream)
return stream
def _setup_stderr_stream(self):
self.stderr_stream = self.stderr_stream_factory()
self.stderr_stream.conn = self
self.stderr_stream.name = self.options.name or self._get_name()
self.stderr_stream.accept(self.proc.stderr, self.proc.stderr)
self._router.broker.start_receive(self.stderr_stream)
stream = self.stderr_stream_factory()
stream.conn = self
stream.name = self.options.name or self._get_name()
stream.accept(self.proc.stderr, self.proc.stderr)
mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect)
self._router.broker.start_receive(stream)
return stream
def _async_connect(self):
self._start_timer()
self._setup_stream()
self.stdio_stream = self._setup_stdio_stream()
if self.context.name is None:
self.context.name = self.stream.name
self.proc.name = self.stream.name
self.context.name = self.stdio_stream.name
self.proc.name = self.stdio_stream.name
if self.proc.stderr:
self._setup_stderr_stream()
self.stderr_stream = self._setup_stderr_stream()
def connect(self, context):
LOG.debug('%r.connect()', self)
@ -2181,7 +2211,7 @@ class Router(mitogen.core.Router):
except mitogen.core.TimeoutError:
raise mitogen.core.StreamError(self.connection_timeout_msg)
self.route_monitor.notice_stream(conn.stream)
self.route_monitor.notice_stream(conn.stdio_stream)
return context
def connect(self, method_name, name=None, **kwargs):