core: send dead message if max message size exceeded; closes #405
This commit is contained in:
parent
1eae594e32
commit
1cbff1011e
|
@ -1832,11 +1832,17 @@ class Router(object):
|
|||
except Exception:
|
||||
LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
|
||||
|
||||
def _maybe_send_dead(self, msg):
|
||||
if msg.reply_to and not msg.is_dead:
|
||||
msg.reply(Message.dead(), router=self)
|
||||
|
||||
def _async_route(self, msg, in_stream=None):
|
||||
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
|
||||
|
||||
if len(msg.data) > self.max_message_size:
|
||||
LOG.error('message too large (max %d bytes): %r',
|
||||
self.max_message_size, msg)
|
||||
self._maybe_send_dead(msg)
|
||||
return
|
||||
|
||||
# Perform source verification.
|
||||
|
@ -1868,22 +1874,18 @@ class Router(object):
|
|||
if out_stream is None:
|
||||
out_stream = self._stream_by_id.get(mitogen.parent_id)
|
||||
|
||||
dead = False
|
||||
if out_stream is None:
|
||||
if msg.reply_to not in (0, IS_DEAD):
|
||||
LOG.error('%r: no route for %r, my ID is %r',
|
||||
self, msg, mitogen.context_id)
|
||||
dead = True
|
||||
self._maybe_send_dead(msg)
|
||||
return
|
||||
|
||||
if in_stream and self.unidirectional and not dead and \
|
||||
not (in_stream.is_privileged or out_stream.is_privileged):
|
||||
if in_stream and self.unidirectional and not \
|
||||
(in_stream.is_privileged or out_stream.is_privileged):
|
||||
LOG.error('routing mode prevents forward of %r from %r -> %r',
|
||||
msg, in_stream, out_stream)
|
||||
dead = True
|
||||
|
||||
if dead:
|
||||
if msg.reply_to and not msg.is_dead:
|
||||
msg.reply(Message.dead(), router=self)
|
||||
self._maybe_send_dead(msg)
|
||||
return
|
||||
|
||||
out_stream._send(msg)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import logging
|
||||
import subprocess
|
||||
import time
|
||||
import zlib
|
||||
|
||||
import unittest2
|
||||
|
||||
|
@ -189,20 +190,35 @@ class AddHandlerTest(unittest2.TestCase):
|
|||
self.assertTrue(queue.get(timeout=5).is_dead)
|
||||
|
||||
|
||||
class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase):
|
||||
class MessageSizeTest(testlib.BrokerMixin, testlib.TestCase):
|
||||
klass = mitogen.master.Router
|
||||
|
||||
def test_local_exceeded(self):
|
||||
router = self.klass(broker=self.broker, max_message_size=4096)
|
||||
recv = mitogen.core.Receiver(router)
|
||||
|
||||
logs = testlib.LogCapturer()
|
||||
logs.start()
|
||||
|
||||
sem = mitogen.core.Latch()
|
||||
# Send message and block for one IO loop, so _async_route can run.
|
||||
router.route(mitogen.core.Message.pickled(' '*8192))
|
||||
router.broker.defer(sem.put, ' ') # wlil always run after _async_route
|
||||
sem.get()
|
||||
router.broker.defer_sync(lambda: None)
|
||||
|
||||
expect = 'message too large (max 4096 bytes)'
|
||||
self.assertTrue(expect in logs.stop())
|
||||
|
||||
def test_local_dead_message(self):
|
||||
# Local router should generate dead message when reply_to is set.
|
||||
router = self.klass(broker=self.broker, max_message_size=4096)
|
||||
|
||||
logs = testlib.LogCapturer()
|
||||
logs.start()
|
||||
|
||||
# Try function call. Receiver should be woken by a dead message sent by
|
||||
# router due to message size exceeded.
|
||||
child = router.fork()
|
||||
e = self.assertRaises(mitogen.core.ChannelError,
|
||||
lambda: child.call(zlib.crc32, ' '*8192))
|
||||
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
|
||||
|
||||
expect = 'message too large (max 4096 bytes)'
|
||||
self.assertTrue(expect in logs.stop())
|
||||
|
|
Loading…
Reference in New Issue