core: detect stream corruption. Closes #438.
This commit is contained in:
parent
9868e4ea3a
commit
300cb41e2e
|
@ -273,6 +273,9 @@ Core Library
|
||||||
fail to start if :data:`sys.stdout` was opened in block buffered mode, and
|
fail to start if :data:`sys.stdout` was opened in block buffered mode, and
|
||||||
buffered data was pending in the parent prior to fork.
|
buffered data was pending in the parent prior to fork.
|
||||||
|
|
||||||
|
* `#438 <https://github.com/dw/mitogen/issues/438>`_: a descriptive error is
|
||||||
|
logged when stream corruption is detected.
|
||||||
|
|
||||||
* `#439 <https://github.com/dw/mitogen/issues/439>`_: descriptive errors are
|
* `#439 <https://github.com/dw/mitogen/issues/439>`_: descriptive errors are
|
||||||
raised when attempting to invoke unsupported function types.
|
raised when attempting to invoke unsupported function types.
|
||||||
|
|
||||||
|
|
|
@ -273,6 +273,10 @@ parent and child. Integers use big endian in their encoded form.
|
||||||
- Size
|
- Size
|
||||||
- Description
|
- Description
|
||||||
|
|
||||||
|
* - `magic`
|
||||||
|
- 2
|
||||||
|
- Integer 0x4d49 (``MI``), used to detect stream corruption.
|
||||||
|
|
||||||
* - `dst_id`
|
* - `dst_id`
|
||||||
- 4
|
- 4
|
||||||
- Integer target context ID. :py:class:`Router` delivers messages
|
- Integer target context ID. :py:class:`Router` delivers messages
|
||||||
|
|
|
@ -1393,8 +1393,16 @@ class Stream(BasicStream):
|
||||||
|
|
||||||
self._internal_receive(broker, buf)
|
self._internal_receive(broker, buf)
|
||||||
|
|
||||||
HEADER_FMT = '>LLLLLL'
|
HEADER_FMT = '>hLLLLLL'
|
||||||
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
||||||
|
HEADER_MAGIC = 0x4d49 # 'MI'
|
||||||
|
|
||||||
|
corrupt_msg = (
|
||||||
|
'Corruption detected: frame signature incorrect. This likely means '
|
||||||
|
'some external process is interfering with the connection. Received:'
|
||||||
|
'\n\n'
|
||||||
|
'%r'
|
||||||
|
)
|
||||||
|
|
||||||
def _receive_one(self, broker):
|
def _receive_one(self, broker):
|
||||||
if self._input_buf_len < self.HEADER_LEN:
|
if self._input_buf_len < self.HEADER_LEN:
|
||||||
|
@ -1402,12 +1410,17 @@ class Stream(BasicStream):
|
||||||
|
|
||||||
msg = Message()
|
msg = Message()
|
||||||
msg.router = self._router
|
msg.router = self._router
|
||||||
(msg.dst_id, msg.src_id, msg.auth_id,
|
(magic, msg.dst_id, msg.src_id, msg.auth_id,
|
||||||
msg.handle, msg.reply_to, msg_len) = struct.unpack(
|
msg.handle, msg.reply_to, msg_len) = struct.unpack(
|
||||||
self.HEADER_FMT,
|
self.HEADER_FMT,
|
||||||
self._input_buf[0][:self.HEADER_LEN],
|
self._input_buf[0][:self.HEADER_LEN],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if magic != self.HEADER_MAGIC:
|
||||||
|
LOG.error(self.corrupt_msg, self._input_buf[0][:128])
|
||||||
|
self.on_disconnect(broker)
|
||||||
|
return False
|
||||||
|
|
||||||
if msg_len > self._router.max_message_size:
|
if msg_len > self._router.max_message_size:
|
||||||
LOG.error('Maximum message size exceeded (got %d, max %d)',
|
LOG.error('Maximum message size exceeded (got %d, max %d)',
|
||||||
msg_len, self._router.max_message_size)
|
msg_len, self._router.max_message_size)
|
||||||
|
@ -1473,9 +1486,9 @@ class Stream(BasicStream):
|
||||||
|
|
||||||
def _send(self, msg):
|
def _send(self, msg):
|
||||||
_vv and IOLOG.debug('%r._send(%r)', self, msg)
|
_vv and IOLOG.debug('%r._send(%r)', self, msg)
|
||||||
pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id,
|
pkt = struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, msg.dst_id,
|
||||||
msg.auth_id, msg.handle, msg.reply_to or 0,
|
msg.src_id, msg.auth_id, msg.handle,
|
||||||
len(msg.data)) + msg.data
|
msg.reply_to or 0, len(msg.data)) + msg.data
|
||||||
if not self._output_buf_len:
|
if not self._output_buf_len:
|
||||||
self._router.broker._start_transmit(self)
|
self._router.broker._start_transmit(self)
|
||||||
self._output_buf.append(pkt)
|
self._output_buf.append(pkt)
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
|
||||||
|
import unittest2
|
||||||
|
import mock
|
||||||
|
|
||||||
|
import mitogen.core
|
||||||
|
|
||||||
|
import testlib
|
||||||
|
|
||||||
|
|
||||||
|
class ReceiveOneTest(testlib.TestCase):
|
||||||
|
klass = mitogen.core.Stream
|
||||||
|
|
||||||
|
def test_corruption(self):
|
||||||
|
broker = mock.Mock()
|
||||||
|
router = mock.Mock()
|
||||||
|
|
||||||
|
stream = self.klass(router, 1)
|
||||||
|
junk = mitogen.core.b('x') * stream.HEADER_LEN
|
||||||
|
stream._input_buf = [junk]
|
||||||
|
stream._input_buf_len = len(junk)
|
||||||
|
|
||||||
|
capture = testlib.LogCapturer()
|
||||||
|
capture.start()
|
||||||
|
ret = stream._receive_one(broker)
|
||||||
|
#self.assertEquals(1, broker.stop_receive.mock_calls)
|
||||||
|
capture.stop()
|
||||||
|
|
||||||
|
self.assertFalse(ret)
|
||||||
|
self.assertTrue((self.klass.corrupt_msg % (junk,)) in capture.raw())
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest2.main()
|
Loading…
Reference in New Issue