core: synchronize Stream._output_buf by deferring send()
Previously _output_buf was racy. This may or may not be cheaper than simply using a lock, but it requires much less code, so I prefer it for now.
This commit is contained in:
parent
ead67de883
commit
e4c832685d
|
@ -624,16 +624,19 @@ class Stream(BasicStream):
|
|||
if not self._output_buf:
|
||||
broker.stop_transmit(self)
|
||||
|
||||
def send(self, msg):
|
||||
"""Send `data` to `handle`, and tell the broker we have output. May
|
||||
be called from any thread."""
|
||||
def _send(self, msg):
|
||||
IOLOG.debug('%r._send(%r)', self, msg)
|
||||
pkt = struct.pack('>hhLLL', msg.dst_id, msg.src_id,
|
||||
msg.handle, msg.reply_to or 0, len(msg.data)
|
||||
) + msg.data
|
||||
self._output_buf += pkt # TODO: It's actually possible for this to race
|
||||
self._output_buf += pkt
|
||||
self._router.broker.start_transmit(self)
|
||||
|
||||
def send(self, msg):
|
||||
"""Send `data` to `handle`, and tell the broker we have output. May
|
||||
be called from any thread."""
|
||||
self._router.broker.defer(self._send, msg)
|
||||
|
||||
def on_disconnect(self, broker):
|
||||
super(Stream, self).on_disconnect(broker)
|
||||
self._router.on_disconnect(self, broker)
|
||||
|
|
Loading…
Reference in New Issue