diff --git a/docs/api.rst b/docs/api.rst index 72a6b4db..52d5dcec 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1227,6 +1227,14 @@ Broker Class thread, or immediately if the current thread is the broker thread. Safe to call from any thread. + .. method:: defer_sync (func) + + Arrange for `func()` to execute on the broker thread, blocking the + current thread until a result or exception is available. + + :returns: + Call result. + .. method:: start_receive (stream) Mark the :attr:`receive_side ` on `stream` as diff --git a/mitogen/core.py b/mitogen/core.py index a5270ec7..01be1cfe 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1940,6 +1940,25 @@ class Broker(object): it = (side.keep_alive for (_, (side, _)) in self.poller.readers) return sum(it, 0) + def defer_sync(self, func): + """ + Block the calling thread while `func` runs on a broker thread. + + :returns: + Return value of `func()`. + """ + latch = Latch() + def wrapper(): + try: + latch.put(func()) + except Exception: + latch.put(sys.exc_info()[1]) + self.defer(wrapper) + res = latch.get() + if isinstance(res, Exception): + raise res + return res + def _call(self, stream, func): try: func(self) @@ -2100,11 +2119,6 @@ class ExternalContext(object): _v and LOG.debug('%r: parent stream is gone, dying.', self) self.broker.shutdown() - def _sync(self, func): - latch = Latch() - self.broker.defer(lambda: latch.put(func())) - return latch.get() - def detach(self): self.detached = True stream = self.router.stream_by_id(mitogen.parent_id) @@ -2113,7 +2127,7 @@ class ExternalContext(object): self.parent.send_await(Message(handle=DETACHING)) LOG.info('Detaching from %r; parent is %s', stream, self.parent) for x in range(20): - pending = self._sync(lambda: stream.pending_bytes()) + pending = self.broker.defer_sync(lambda: stream.pending_bytes()) if not pending: break time.sleep(0.05) diff --git a/tests/broker_test.py b/tests/broker_test.py new file mode 100644 index 00000000..7d070e3d --- /dev/null +++ b/tests/broker_test.py @@ -0,0 +1,32 @@ + +import threading + +import unittest2 + +import testlib + +import mitogen.core + + +class DeferSyncTest(testlib.TestCase): + klass = mitogen.core.Broker + + def test_okay(self): + broker = self.klass() + try: + th = broker.defer_sync(lambda: threading.currentThread()) + self.assertEquals(th, broker._thread) + finally: + broker.shutdown() + + def test_exception(self): + broker = self.klass() + try: + self.assertRaises(ValueError, + broker.defer_sync, lambda: int('dave')) + finally: + broker.shutdown() + + +if __name__ == '__main__': + unittest2.main()