diff --git a/mitogen/core.py b/mitogen/core.py index f99da96a..334166c1 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -2,6 +2,7 @@ import Queue import cPickle import cStringIO +import collections import errno import fcntl import imp @@ -586,13 +587,13 @@ class Stream(BasicStream): protocol `. """ _input_buf = '' - _output_buf = '' def __init__(self, router, remote_id, **kwargs): self._router = router self.remote_id = remote_id self.name = 'default' self.construct(**kwargs) + self._output_buf = collections.deque() def construct(self): pass @@ -643,14 +644,19 @@ class Stream(BasicStream): def on_transmit(self, broker): """Transmit buffered messages.""" IOLOG.debug('%r.on_transmit()', self) - written = self.transmit_side.write(self._output_buf) - if written is None: - LOG.debug('%r.on_transmit(): disconnection detected', self) - self.on_disconnect(broker) - return - IOLOG.debug('%r.on_transmit() -> len %d', self, written) - self._output_buf = self._output_buf[written:] + if self._output_buf: + buf = self._output_buf.popleft() + written = self.transmit_side.write(buf) + if not written: + LOG.debug('%r.on_transmit(): disconnection detected', self) + self.on_disconnect(broker) + return + elif written != len(buf): + self._output_buf.appendleft(buf[written:]) + + IOLOG.debug('%r.on_transmit() -> len %d', self, written) + if not self._output_buf: broker.stop_transmit(self) @@ -659,7 +665,7 @@ class Stream(BasicStream): pkt = struct.pack('>hhLLL', msg.dst_id, msg.src_id, msg.handle, msg.reply_to or 0, len(msg.data) ) + msg.data - self._output_buf += pkt + self._output_buf.append(pkt) self._router.broker.start_transmit(self) def send(self, msg):