From 01729b18a58c7269b3e86c0ba233db7bc90966ff Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 22 Sep 2017 02:02:38 +0530 Subject: [PATCH] core: use an output deque rather than string to improve worst case perf This probably worsens performance in the common case, but it prevents runaway producers (see e.g. issue #36) from spending all their CPU copying around huge strings. It's also a small step towards a solution to issue #6, which will replace the output buffer with some sort of fancier queue anyway. This reduces a particular 40 second run of rsync to 1.5 seconds. --- mitogen/core.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index f99da96a..334166c1 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2,6 +2,7 @@ import Queue import cPickle import cStringIO +import collections import errno import fcntl import imp @@ -586,13 +587,13 @@ class Stream(BasicStream): protocol `. """ _input_buf = '' - _output_buf = '' def __init__(self, router, remote_id, **kwargs): self._router = router self.remote_id = remote_id self.name = 'default' self.construct(**kwargs) + self._output_buf = collections.deque() def construct(self): pass @@ -643,14 +644,19 @@ class Stream(BasicStream): def on_transmit(self, broker): """Transmit buffered messages.""" IOLOG.debug('%r.on_transmit()', self) - written = self.transmit_side.write(self._output_buf) - if written is None: - LOG.debug('%r.on_transmit(): disconnection detected', self) - self.on_disconnect(broker) - return - IOLOG.debug('%r.on_transmit() -> len %d', self, written) - self._output_buf = self._output_buf[written:] + if self._output_buf: + buf = self._output_buf.popleft() + written = self.transmit_side.write(buf) + if not written: + LOG.debug('%r.on_transmit(): disconnection detected', self) + self.on_disconnect(broker) + return + elif written != len(buf): + self._output_buf.appendleft(buf[written:]) + + IOLOG.debug('%r.on_transmit() -> len %d', self, written) + if not self._output_buf: broker.stop_transmit(self) @@ -659,7 +665,7 @@ class Stream(BasicStream): pkt = struct.pack('>hhLLL', msg.dst_id, msg.src_id, msg.handle, msg.reply_to or 0, len(msg.data) ) + msg.data - self._output_buf += pkt + self._output_buf.append(pkt) self._router.broker.start_transmit(self) def send(self, msg):