core: use an output deque rather than string to improve worst case perf
This probably worsens performance in the common case, but it prevents runaway producers (see e.g. issue #36) from spending all their CPU copying around huge strings. It's also a small step towards a solution to issue #6, which will replace the output buffer with some sort of fancier queue anyway. This reduces a particular 40 second run of rsync to 1.5 seconds.
This commit is contained in:
parent
eb060e1a0e
commit
01729b18a5
|
@ -2,6 +2,7 @@
|
||||||
import Queue
|
import Queue
|
||||||
import cPickle
|
import cPickle
|
||||||
import cStringIO
|
import cStringIO
|
||||||
|
import collections
|
||||||
import errno
|
import errno
|
||||||
import fcntl
|
import fcntl
|
||||||
import imp
|
import imp
|
||||||
|
@ -586,13 +587,13 @@ class Stream(BasicStream):
|
||||||
protocol <stream-protocol>`.
|
protocol <stream-protocol>`.
|
||||||
"""
|
"""
|
||||||
_input_buf = ''
|
_input_buf = ''
|
||||||
_output_buf = ''
|
|
||||||
|
|
||||||
def __init__(self, router, remote_id, **kwargs):
|
def __init__(self, router, remote_id, **kwargs):
|
||||||
self._router = router
|
self._router = router
|
||||||
self.remote_id = remote_id
|
self.remote_id = remote_id
|
||||||
self.name = 'default'
|
self.name = 'default'
|
||||||
self.construct(**kwargs)
|
self.construct(**kwargs)
|
||||||
|
self._output_buf = collections.deque()
|
||||||
|
|
||||||
def construct(self):
|
def construct(self):
|
||||||
pass
|
pass
|
||||||
|
@ -643,14 +644,19 @@ class Stream(BasicStream):
|
||||||
def on_transmit(self, broker):
|
def on_transmit(self, broker):
|
||||||
"""Transmit buffered messages."""
|
"""Transmit buffered messages."""
|
||||||
IOLOG.debug('%r.on_transmit()', self)
|
IOLOG.debug('%r.on_transmit()', self)
|
||||||
written = self.transmit_side.write(self._output_buf)
|
|
||||||
if written is None:
|
if self._output_buf:
|
||||||
|
buf = self._output_buf.popleft()
|
||||||
|
written = self.transmit_side.write(buf)
|
||||||
|
if not written:
|
||||||
LOG.debug('%r.on_transmit(): disconnection detected', self)
|
LOG.debug('%r.on_transmit(): disconnection detected', self)
|
||||||
self.on_disconnect(broker)
|
self.on_disconnect(broker)
|
||||||
return
|
return
|
||||||
|
elif written != len(buf):
|
||||||
|
self._output_buf.appendleft(buf[written:])
|
||||||
|
|
||||||
IOLOG.debug('%r.on_transmit() -> len %d', self, written)
|
IOLOG.debug('%r.on_transmit() -> len %d', self, written)
|
||||||
self._output_buf = self._output_buf[written:]
|
|
||||||
if not self._output_buf:
|
if not self._output_buf:
|
||||||
broker.stop_transmit(self)
|
broker.stop_transmit(self)
|
||||||
|
|
||||||
|
@ -659,7 +665,7 @@ class Stream(BasicStream):
|
||||||
pkt = struct.pack('>hhLLL', msg.dst_id, msg.src_id,
|
pkt = struct.pack('>hhLLL', msg.dst_id, msg.src_id,
|
||||||
msg.handle, msg.reply_to or 0, len(msg.data)
|
msg.handle, msg.reply_to or 0, len(msg.data)
|
||||||
) + msg.data
|
) + msg.data
|
||||||
self._output_buf += pkt
|
self._output_buf.append(pkt)
|
||||||
self._router.broker.start_transmit(self)
|
self._router.broker.start_transmit(self)
|
||||||
|
|
||||||
def send(self, msg):
|
def send(self, msg):
|
||||||
|
|
Loading…
Reference in New Issue