From 42b1b3d2867a6c6026b59ccc4f99e6248b783248 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 8 Sep 2018 19:19:14 +0100 Subject: [PATCH] core: support mitogen_chain dispatcher option. --- docs/api.rst | 52 +++++++++++++++++++++++++++++++++++-- mitogen/core.py | 48 +++++++++++++++++++++------------- tests/call_function_test.py | 34 +++++++++++++++++++++--- 3 files changed, 110 insertions(+), 24 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index e823b877..395147b2 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -892,6 +892,52 @@ Context Class :param dict kwargs: Function keyword arguments, if any. See :ref:`serialization-rules` for permitted types. + :param str mitogen_chain: + Optional cancellation key for threading unrelated asynchronous + requests to one context. If any prior call in the chain raised an + exception, subsequent calls with the same key immediately produce + the same exception. + + This permits a sequence of :meth:`no-reply ` or + pipelined asynchronous calls to be made without wasting network + round-trips to discover if prior calls succeeded, while allowing + such chains to overlap concurrently from multiple unrelated source + contexts. The chain is cancelled on first exception, enabling + patterns like:: + + # Must be distinct for each overlapping sequence, and cannot be + # reused. + chain = 'make-dirs-and-do-stuff-%s-%s-%s-%s' % ( + socket.gethostname(), + os.getpid(), + threading.currentThread().id, + time.time(), + ) + context.call_no_reply(os.mkdir, '/tmp/foo', + mitogen_chain=chain) + + # If os.mkdir() fails, this never runs: + context.call_no_reply(os.mkdir, '/tmp/foo/bar', + mitogen_chain=chain) + + # If either os.mkdir() fails, this never runs, and returns the + # exception. + recv = context.call_async(subprocess.check_output, '/tmp/foo', + mitogen_chain=chain) + + # If os.mkdir() or check_call() failed, this never runs, and + # the exception that occurred is raised. + context.call(do_something, mitogen_chain=chain) + + # The receiver also got a copy of the exception, so if this + # code was executed, the exception would also be raised. + if recv.get().unpickle() == 'baz': + pass + + Note that for long-lived programs, there is presently no mechanism + for clearing the chain history on a target. This will be addressed + in future. + :returns: :class:`mitogen.core.Receiver` configured to receive the result of the invocation: @@ -923,8 +969,10 @@ Context Class .. method:: call_no_reply (fn, \*args, \*\*kwargs) - Send a function call, but expect no return value. If the call fails, - the full exception will be logged to the target context's logging framework. + Like :meth:`call_async`, but do not wait for a return value, and inform + the target context no such reply is expected. If the call fails, the + full exception will be logged to the target context's logging + framework, unless the `mitogen_chain` argument was present. :raises mitogen.core.CallError: An exception was raised in the remote context during execution. diff --git a/mitogen/core.py b/mitogen/core.py index 1f5ad0a9..4092db4a 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1952,15 +1952,14 @@ class Broker(object): class Dispatcher(object): def __init__(self, econtext): self.econtext = econtext + #: Chain ID -> CallError if prior call failed. + self._error_by_chain_id = {} self.recv = Receiver(router=econtext.router, handle=CALL_FUNCTION, policy=has_parent_authority) - listen(econtext.broker, 'shutdown', self._on_broker_shutdown) + listen(econtext.broker, 'shutdown', self.recv.close) - def _on_broker_shutdown(self): - self.recv.close() - - def _dispatch_one(self, msg): + def _parse_request(self, msg): data = msg.unpickle(throw=False) _v and LOG.debug('_dispatch_one(%r)', data) @@ -1973,22 +1972,35 @@ class Dispatcher(object): kwargs.setdefault('econtext', self.econtext) if getattr(fn, 'mitogen_takes_router', None): kwargs.setdefault('router', self.econtext.router) - return fn(*args, **kwargs) + + return fn, args, kwargs + + def _dispatch_one(self, msg): + try: + fn, args, kwargs = self._parse_request(msg) + except Exception: + return None, CallError(sys.exc_info()[1]) + + chain_id = kwargs.pop('mitogen_chain', None) + if chain_id in self._error_by_chain_id: + return chain_id, self._error_by_chain_id[chain_id] + + try: + return chain_id, fn(*args, **kwargs) + except Exception: + e = CallError(sys.exc_info()[1]) + if chain_id is not None: + self._error_by_chain_id[chain_id] = e + return chain_id, e def _dispatch_calls(self): for msg in self.recv: - try: - ret = self._dispatch_one(msg) - _v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret) - if msg.reply_to: - msg.reply(ret) - except Exception: - e = sys.exc_info()[1] - if msg.reply_to: - _v and LOG.debug('_dispatch_calls: %s', e) - msg.reply(CallError(e)) - else: - LOG.exception('_dispatch_calls: %r', msg) + chain_id, ret = self._dispatch_one(msg) + _v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret) + if msg.reply_to: + msg.reply(ret) + elif isinstance(ret, CallError) and chain_id is None: + LOG.error('No-reply function call failed: %s', ret) def run(self): if self.econtext.config.get('on_start'): diff --git a/tests/call_function_test.py b/tests/call_function_test.py index eb83dff5..b8b07283 100644 --- a/tests/call_function_test.py +++ b/tests/call_function_test.py @@ -18,15 +18,15 @@ def function_that_adds_numbers(x, y): return x + y -def function_that_fails(): - raise plain_old_module.MyError('exception text') +def function_that_fails(s=''): + raise plain_old_module.MyError('exception text'+s) def func_with_bad_return_value(): return CrazyType() -def func_accepts_returns_context(context): +def func_returns_arg(context): return context @@ -101,7 +101,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg) def test_accepts_returns_context(self): - context = self.local.call(func_accepts_returns_context, self.local) + context = self.local.call(func_returns_arg, self.local) self.assertIsNot(context, self.local) self.assertEqual(context.context_id, self.local.context_id) self.assertEqual(context.name, self.local.name) @@ -118,5 +118,31 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): lambda: recv.get().unpickle()) +class ChainTest(testlib.RouterMixin, testlib.TestCase): + # Verify mitogen_chain functionality. + + def setUp(self): + super(ChainTest, self).setUp() + self.local = self.router.fork() + + def test_subsequent_calls_produce_same_error(self): + self.assertEquals('xx', + self.local.call(func_returns_arg, 'xx', mitogen_chain='c1')) + self.local.call_no_reply(function_that_fails, 'x1', mitogen_chain='c1') + e1 = self.assertRaises(mitogen.core.CallError, + lambda: self.local.call(function_that_fails, 'x2', mitogen_chain='c1')) + e2 = self.assertRaises(mitogen.core.CallError, + lambda: self.local.call(func_returns_arg, 'x3', mitogen_chain='c1')) + self.assertEquals(str(e1), str(e2)) + + def test_unrelated_overlapping_failed_chains(self): + self.local.call_no_reply(function_that_fails, 'c1', mitogen_chain='c1') + self.assertEquals('yes', + self.local.call(func_returns_arg, 'yes', mitogen_chain='c2')) + self.assertRaises(mitogen.core.CallError, + lambda: self.local.call(func_returns_arg, 'yes', mitogen_chain='c1')) + self.local.call_no_reply(function_that_fails, 'c2', mitogen_chain='c2') + + if __name__ == '__main__': unittest2.main()