mitogen/tests/router_test.py

316 lines
10 KiB
Python
Raw Normal View History

import Queue
import StringIO
import logging
import subprocess
import time
import unittest2
import testlib
import mitogen.master
import mitogen.parent
import mitogen.utils
def ping():
    """Trivial callable used as a liveness probe; always answers ``True``."""
    return True
@mitogen.core.takes_router
def ping_context(other, router):
    """Invoke :func:`ping` in the context identified by ``other.context_id``.

    A fresh :class:`mitogen.parent.Context` is constructed on `router` for
    that ID, and the call is made through it. Nothing is returned.
    """
    target = mitogen.parent.Context(router, other.context_id)
    target.call(ping)
@mitogen.core.takes_router
def return_router_max_message_size(router):
    """Report the ``max_message_size`` attribute of `router`."""
    limit = router.max_message_size
    return limit
def send_n_sized_reply(sender, n):
    """Send a string of `n` spaces via ``sender.send()`` and return 123."""
    payload = ' ' * n
    sender.send(payload)
    return 123
class SourceVerifyTest(testlib.RouterMixin, unittest2.TestCase):
    # Checks that messages whose auth_id / src_id do not match the stream they
    # are delivered on are logged and dropped instead of being routed.

    def setUp(self):
        super(SourceVerifyTest, self).setUp()
        # Create some children, ping them, and store what their messages look
        # like so we can mess with them later.
        self.child1 = self.router.fork()
        self.child1_msg = self.child1.call_async(ping).get()
        self.child1_stream = self.router._stream_by_id[self.child1.context_id]

        self.child2 = self.router.fork()
        self.child2_msg = self.child2.call_async(ping).get()
        self.child2_stream = self.router._stream_by_id[self.child2.context_id]

    def test_bad_auth_id(self):
        # Deliver a message locally from child2, but using child1's stream.
        log = testlib.LogCapturer()
        log.start()
        try:
            # Used to ensure the message was dropped rather than routed after
            # the error is logged.
            recv = mitogen.core.Receiver(self.router)
            self.child2_msg.handle = recv.handle

            self.broker.defer(self.router._async_route,
                              self.child2_msg,
                              in_stream=self.child1_stream)

            # Wait for IO loop to finish everything above.
            self.sync_with_broker()
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = log.stop()

        # Ensure message wasn't forwarded.
        self.assertTrue(recv.empty())

        # Ensure error was logged.
        expect = 'bad auth_id: got %d via' % (self.child2_msg.auth_id,)
        self.assertIn(expect, logged)

    def test_bad_src_id(self):
        # Deliver a message locally from child2 with the correct auth_id, but
        # the wrong src_id.
        log = testlib.LogCapturer()
        log.start()
        try:
            # Used to ensure the message was dropped rather than routed after
            # the error is logged.
            recv = mitogen.core.Receiver(self.router)
            self.child2_msg.handle = recv.handle
            self.child2_msg.src_id = self.child1.context_id

            self.broker.defer(self.router._async_route,
                              self.child2_msg,
                              in_stream=self.child2_stream)

            # Wait for IO loop to finish everything above.
            self.sync_with_broker()
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = log.stop()

        # Ensure message wasn't forwarded.
        self.assertTrue(recv.empty())

        # Ensure error was logged.
        expect = 'bad src_id: got %d via' % (self.child1_msg.src_id,)
        self.assertIn(expect, logged)
class PolicyTest(testlib.RouterMixin, testlib.TestCase):
    # Exercises the optional `policy` callable accepted by Receiver.

    def test_allow_any(self):
        # A receiver created without a policy gets everything sent to it.
        recv = mitogen.core.Receiver(self.router)
        recv.to_sender().send(123)
        self.sync_with_broker()
        self.assertFalse(recv.empty())
        self.assertEquals(123, recv.get().unpickle())

    def test_refuse_all(self):
        # Route a message to a receiver whose policy rejects everything, and
        # verify the refusal is logged, the message is not delivered, and the
        # reply_to target is told about the refusal.
        log = testlib.LogCapturer()
        log.start()
        try:
            # This guy never gets anything.
            recv = mitogen.core.Receiver(
                router=self.router,
                policy=(lambda msg, stream: False),
            )

            # This guy becomes the reply_to of our refused message.
            reply_target = mitogen.core.Receiver(self.router)

            # Send the message.
            self.router.route(
                mitogen.core.Message(
                    dst_id=mitogen.context_id,
                    handle=recv.handle,
                    reply_to=reply_target.handle,
                )
            )

            # Wait for IO loop.
            self.sync_with_broker()
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = log.stop()

        # Verify log.
        expect = '%r: policy refused message: ' % (self.router,)
        self.assertIn(expect, logged)

        # Verify message was not delivered.
        self.assertTrue(recv.empty())

        # Verify CallError received by reply_to target.
        e = self.assertRaises(mitogen.core.CallError,
                              lambda: reply_target.get().unpickle())
        # Use .args rather than indexing the exception object: indexing an
        # exception only works on Python 2, and .args matches NoRouteTest.
        self.assertEquals(e.args[0], self.router.refused_msg)
class CrashTest(testlib.BrokerMixin, unittest2.TestCase):
    # This is testing both Broker's ability to crash nicely, and Router's
    # ability to respond to the crash event.
    klass = mitogen.master.Router

    def _naughty(self):
        # Stand-in for Broker._loop_once that always fails.
        raise ValueError('eek')

    def test_shutdown(self):
        router = self.klass(self.broker)

        sem = mitogen.core.Latch()
        router.add_handler(sem.put)

        log = testlib.LogCapturer('mitogen')
        log.start()
        try:
            # Force a crash and ensure it wakes up.
            self.broker._loop_once = self._naughty
            self.broker.defer(lambda: None)

            # Block until the handler registered above is handed a message.
            msg = sem.get()
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = log.stop()

        # sem should have received dead message.
        self.assertTrue(msg.is_dead)

        # Ensure it was logged.
        expect = '_broker_main() crashed'
        self.assertIn(expect, logged)
class AddHandlerTest(unittest2.TestCase):
    # A handler registered through Router.add_handler() is expected to be
    # called with a dead message once the broker is shut down.
    klass = mitogen.master.Router

    def test_invoked_at_shutdown(self):
        router = self.klass()
        inbox = Queue.Queue()
        router.add_handler(inbox.put)

        router.broker.shutdown()

        # Bounded wait so a missing notification fails instead of hanging.
        final_msg = inbox.get(timeout=5)
        self.assertTrue(final_msg.is_dead)
class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase):
    # Exercises the `max_message_size` option of the router.
    klass = mitogen.master.Router

    def test_local_exceeded(self):
        router = self.klass(broker=self.broker, max_message_size=4096)
        # NOTE(review): `recv` is never read in this test. It is kept because
        # constructing a Receiver may register state on the router -- confirm
        # before removing.
        recv = mitogen.core.Receiver(router)

        logs = testlib.LogCapturer()
        logs.start()
        try:
            sem = mitogen.core.Latch()
            router.route(mitogen.core.Message.pickled(' '*8192))
            # will always run after _async_route
            router.broker.defer(sem.put, ' ')
            sem.get()
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = logs.stop()

        expect = 'message too large (max 4096 bytes)'
        self.assertIn(expect, logged)

    def test_remote_configured(self):
        # Ensure new contexts receive a router with the same value.
        router = self.klass(broker=self.broker, max_message_size=4096)
        remote = router.fork()
        size = remote.call(return_router_max_message_size)
        self.assertEquals(size, 4096)

    def test_remote_exceeded(self):
        # Have a child send 8192 bytes to a receiver on a router limited to
        # 4096 bytes, and ensure the oversized message is logged.
        router = self.klass(broker=self.broker, max_message_size=4096)
        recv = mitogen.core.Receiver(router)

        logs = testlib.LogCapturer()
        logs.start()
        try:
            remote = router.fork()
            remote.call(send_n_sized_reply, recv.to_sender(), 8192)
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = logs.stop()

        expect = 'message too large (max 4096 bytes)'
        self.assertIn(expect, logged)
class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
    # Messages that cannot be delivered should come back to the sender as
    # dead messages.

    def _assert_locally_unroutable(self, msg, recv):
        # Route `msg` twice. The first reply is fetched without raising so the
        # dead message itself can be inspected; the second is fetched normally
        # to check the exception raised for it.
        self.router.route(msg)
        dead = recv.get(throw_dead=False)
        self.assertEquals(dead.is_dead, True)
        self.assertEquals(dead.src_id, mitogen.context_id)

        self.router.route(msg)
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: recv.get())
        self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)

    def test_invalid_handle_returns_dead(self):
        # Verify sending a message to an invalid handle yields a dead message
        # from the target context.
        child = self.router.fork()

        first = child.send_async(mitogen.core.Message(handle=999))
        dead = first.get(throw_dead=False)
        self.assertEquals(dead.is_dead, True)
        self.assertEquals(dead.src_id, child.context_id)

        second = child.send_async(mitogen.core.Message(handle=999))
        e = self.assertRaises(mitogen.core.ChannelError,
                              lambda: second.get())
        self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg)

    def test_totally_invalid_context_returns_dead(self):
        # Context ID 1234 was never created by this test.
        recv = mitogen.core.Receiver(self.router)
        unroutable = mitogen.core.Message(
            dst_id=1234,
            handle=1234,
            reply_to=recv.handle,
        )
        self._assert_locally_unroutable(unroutable, recv)

    def test_previously_alive_context_returns_dead(self):
        # Address a context that existed but has already been shut down.
        child = self.router.fork()
        child.shutdown(wait=True)

        recv = mitogen.core.Receiver(self.router)
        unroutable = mitogen.core.Message(
            dst_id=child.context_id,
            handle=mitogen.core.CALL_FUNCTION,
            reply_to=recv.handle,
        )
        self._assert_locally_unroutable(unroutable, recv)
class UnidirectionalTest(testlib.RouterMixin, testlib.TestCase):
    # Exercises the router with its `unidirectional` flag set.

    def test_siblings_cant_talk(self):
        self.router.unidirectional = True
        l1 = self.router.fork()
        l2 = self.router.fork()

        logs = testlib.LogCapturer()
        logs.start()
        try:
            e = self.assertRaises(mitogen.core.CallError,
                                  lambda: l2.call(ping_context, l1))
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = logs.stop()

        msg = 'mitogen.core.ChannelError: Channel closed by remote end.'
        self.assertIn(msg, str(e))
        self.assertIn('routing mode prevents forward of ', logged)

    def test_auth_id_can_talk(self):
        self.router.unidirectional = True
        # One stream has auth_id stamped to that of the master, so it should be
        # treated like a parent.
        l1 = self.router.fork()
        l1s = self.router.stream_by_id(l1.context_id)
        l1s.auth_id = mitogen.context_id
        l1s.is_privileged = True

        l2 = self.router.fork()

        logs = testlib.LogCapturer()
        logs.start()
        try:
            e = self.assertRaises(mitogen.core.CallError,
                                  lambda: l2.call(ping_context, l1))
        finally:
            # Stop capturing even if something above raised, so a failing test
            # does not leave the capturer active for the tests that follow.
            logged = logs.stop()

        # NOTE(review): despite the test name, the call is still expected to
        # fail. Unlike test_siblings_cant_talk, the expected failure here is a
        # policy refusal rather than a routing-mode refusal.
        msg = 'mitogen.core.CallError: Refused by policy.'
        self.assertIn(msg, str(e))
        self.assertIn('policy refused message: ', logged)
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest2.main()