From b656969b157f9cf4158afc92219cf41be0b409ad Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Jul 2018 19:27:48 -0700 Subject: [PATCH] issue #260: core: don't defer stream writes. --- mitogen/core.py | 63 +++++++++++++++++++++++++++++++++-------------- mitogen/parent.py | 4 +-- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 58289553..7c2670ef 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -370,6 +370,8 @@ def set_block(fd): fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) +_IO_ERRORS = (select.error, OSError, IOError) + def io_op(func, *args): """Wrap `func(*args)` that may raise :class:`select.error`, :class:`IOError`, or :class:`OSError`, trapping UNIX error codes relating @@ -393,7 +395,7 @@ def io_op(func, *args): while True: try: return func(*args), False - except (select.error, OSError, IOError): + except _IO_ERRORS: e = sys.exc_info()[1] _vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e) if e.args[0] == errno.EINTR: @@ -1352,6 +1354,7 @@ class Stream(BasicStream): self.name = u'default' self.sent_modules = set(['mitogen', 'mitogen.core']) self.construct(**kwargs) + self._lock = threading.Lock() self._input_buf = collections.deque() self._output_buf = collections.deque() self._input_buf_len = 0 @@ -1462,20 +1465,40 @@ class Stream(BasicStream): if not self._output_buf: broker._stop_transmit(self) - def _send(self, msg): - _vv and IOLOG.debug('%r._send(%r)', self, msg) - pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id, - msg.auth_id, msg.handle, msg.reply_to or 0, - len(msg.data)) + msg.data - if not self._output_buf_len: - self._router.broker._start_transmit(self) - self._output_buf.append(pkt) - self._output_buf_len += len(pkt) - def send(self, msg): """Send `data` to `handle`, and tell the broker we have output. May be called from any thread.""" - self._router.broker.defer(self._send, msg) + _vv and IOLOG.debug('%r.send(%r)', self, msg) + pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id, + msg.auth_id, msg.handle, msg.reply_to or 0, + len(msg.data)) + msg.data + pktlen = len(pkt) + + self._lock.acquire() + try: + if self._output_buf_len: + self._output_buf.append(pkt) + self._output_buf_len += pktlen + return + + written = None + try: + written = self.transmit_side.write(pkt) + except _IO_ERRORS: + e = sys.exc_info()[1] + if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): + raise + + if not written: + self._output_buf.append(pkt) + self._output_buf_len += pktlen + elif written != pktlen: + self._output_buf.append(buffer(pkt, written)) + self._output_buf_len += pktlen - written + broker = self._router.broker + broker.defer(broker._start_transmit, self) + finally: + self._lock.release() def on_shutdown(self, broker): """Override BasicStream behaviour of immediately disconnecting.""" @@ -1778,8 +1801,6 @@ class Latch(object): self._cls_all_sockets.extend((rsock, wsock)) return rsock, wsock - COOKIE_SIZE = 33 - def _make_cookie(self): """ Return a 33-byte string encoding the ID of the instance and the current @@ -1787,7 +1808,12 @@ class Latch(object): the FD, and buggy internal FD sharing. """ ident = threading.currentThread().ident - return b(u'%016x-%016x' % (int(id(self)), ident)) + return b( + (u'%-8d-%-16x-%-16x' % (os.getpid(), int(id(self)), ident)) + .replace(' ', '-') + ) + + COOKIE_SIZE = len(_make_cookie(None)) def get(self, timeout=None, block=True): """ @@ -2360,7 +2386,7 @@ class Router(object): in_stream.remote_id, out_stream.remote_id) return - out_stream._send(msg) + out_stream.send(msg) def route(self, msg): """ @@ -2372,7 +2398,7 @@ class Router(object): This may be called from any thread. """ - self.broker.defer(self._async_route, msg) + self._async_route(msg) class Broker(object): @@ -2498,7 +2524,8 @@ class Broker(object): def _broker_exit(self): for _, (side, _) in self.poller.readers + self.poller.writers: - LOG.error('_broker_main() force disconnecting %r', side) + if side.keep_alive: + LOG.error('_broker_main() force disconnecting %r', side) side.stream.on_disconnect(self) self.poller.close() diff --git a/mitogen/parent.py b/mitogen/parent.py index 04f7784e..3e961566 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -995,7 +995,7 @@ class Stream(mitogen.core.Stream): def on_shutdown(self, broker): """Request the slave gracefully shut itself down.""" LOG.debug('%r closing CALL_FUNCTION channel', self) - self._send( + self.send( mitogen.core.Message( src_id=mitogen.context_id, dst_id=self.remote_id, @@ -1962,7 +1962,7 @@ class ModuleForwarder(object): self, fullname, context_id, stream.remote_id) self._send_module_and_related(stream, fullname) if stream.remote_id != context_id: - stream._send( + stream.send( mitogen.core.Message( data=msg.data, handle=mitogen.core.FORWARD_MODULE,