From 1cbff1011e9353b8a1b125efbfebe210426921b1 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 30 Oct 2018 10:53:35 +0000 Subject: [PATCH] core: send dead message if max message size exceeded; closes #405 --- mitogen/core.py | 20 +++++++++++--------- tests/router_test.py | 26 +++++++++++++++++++++----- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 01be1cfe..642540a5 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1832,11 +1832,17 @@ class Router(object): except Exception: LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) + def _maybe_send_dead(self, msg): + if msg.reply_to and not msg.is_dead: + msg.reply(Message.dead(), router=self) + def _async_route(self, msg, in_stream=None): _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) + if len(msg.data) > self.max_message_size: LOG.error('message too large (max %d bytes): %r', self.max_message_size, msg) + self._maybe_send_dead(msg) return # Perform source verification. @@ -1868,22 +1874,18 @@ class Router(object): if out_stream is None: out_stream = self._stream_by_id.get(mitogen.parent_id) - dead = False if out_stream is None: if msg.reply_to not in (0, IS_DEAD): LOG.error('%r: no route for %r, my ID is %r', self, msg, mitogen.context_id) - dead = True + self._maybe_send_dead(msg) + return - if in_stream and self.unidirectional and not dead and \ - not (in_stream.is_privileged or out_stream.is_privileged): + if in_stream and self.unidirectional and not \ + (in_stream.is_privileged or out_stream.is_privileged): LOG.error('routing mode prevents forward of %r from %r -> %r', msg, in_stream, out_stream) - dead = True - - if dead: - if msg.reply_to and not msg.is_dead: - msg.reply(Message.dead(), router=self) + self._maybe_send_dead(msg) return out_stream._send(msg) diff --git a/tests/router_test.py b/tests/router_test.py index d0e4f539..7b7e2896 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -1,6 +1,7 @@ import logging import subprocess import time +import zlib import unittest2 @@ -189,20 +190,35 @@ class AddHandlerTest(unittest2.TestCase): self.assertTrue(queue.get(timeout=5).is_dead) -class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): +class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase): klass = mitogen.master.Router def test_local_exceeded(self): router = self.klass(broker=self.broker, max_message_size=4096) - recv = mitogen.core.Receiver(router) logs = testlib.LogCapturer() logs.start() - sem = mitogen.core.Latch() + # Send message and block for one IO loop, so _async_route can run. router.route(mitogen.core.Message.pickled(' '*8192)) - router.broker.defer(sem.put, ' ') # wlil always run after _async_route - sem.get() + router.broker.defer_sync(lambda: None) + + expect = 'message too large (max 4096 bytes)' + self.assertTrue(expect in logs.stop()) + + def test_local_dead_message(self): + # Local router should generate dead message when reply_to is set. + router = self.klass(broker=self.broker, max_message_size=4096) + + logs = testlib.LogCapturer() + logs.start() + + # Try function call. Receiver should be woken by a dead message sent by + # router due to message size exceeded. + child = router.fork() + e = self.assertRaises(mitogen.core.ChannelError, + lambda: child.call(zlib.crc32, ' '*8192)) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) expect = 'message too large (max 4096 bytes)' self.assertTrue(expect in logs.stop())