core: split out & extend Broker.sync_call()

This commit is contained in:
David Wilson 2018-10-26 13:55:25 +01:00
parent 592d6fc8d3
commit 9ec360c26d
3 changed files with 60 additions and 6 deletions

View File

@ -1227,6 +1227,14 @@ Broker Class
thread, or immediately if the current thread is the broker thread. Safe thread, or immediately if the current thread is the broker thread. Safe
to call from any thread. to call from any thread.
.. method:: defer_sync (func)
Arrange for `func()` to execute on the broker thread, blocking the
current thread until a result or exception is available.
:returns:
Call result.
.. method:: start_receive (stream) .. method:: start_receive (stream)
Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as

View File

@ -1940,6 +1940,25 @@ class Broker(object):
it = (side.keep_alive for (_, (side, _)) in self.poller.readers) it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
return sum(it, 0) return sum(it, 0)
def defer_sync(self, func):
"""
Block the calling thread while `func` runs on a broker thread.
:returns:
Return value of `func()`.
"""
latch = Latch()
def wrapper():
try:
latch.put(func())
except Exception:
latch.put(sys.exc_info()[1])
self.defer(wrapper)
res = latch.get()
if isinstance(res, Exception):
raise res
return res
def _call(self, stream, func): def _call(self, stream, func):
try: try:
func(self) func(self)
@ -2100,11 +2119,6 @@ class ExternalContext(object):
_v and LOG.debug('%r: parent stream is gone, dying.', self) _v and LOG.debug('%r: parent stream is gone, dying.', self)
self.broker.shutdown() self.broker.shutdown()
def _sync(self, func):
latch = Latch()
self.broker.defer(lambda: latch.put(func()))
return latch.get()
def detach(self): def detach(self):
self.detached = True self.detached = True
stream = self.router.stream_by_id(mitogen.parent_id) stream = self.router.stream_by_id(mitogen.parent_id)
@ -2113,7 +2127,7 @@ class ExternalContext(object):
self.parent.send_await(Message(handle=DETACHING)) self.parent.send_await(Message(handle=DETACHING))
LOG.info('Detaching from %r; parent is %s', stream, self.parent) LOG.info('Detaching from %r; parent is %s', stream, self.parent)
for x in range(20): for x in range(20):
pending = self._sync(lambda: stream.pending_bytes()) pending = self.broker.defer_sync(lambda: stream.pending_bytes())
if not pending: if not pending:
break break
time.sleep(0.05) time.sleep(0.05)

32
tests/broker_test.py Normal file
View File

@ -0,0 +1,32 @@
import threading
import unittest2
import testlib
import mitogen.core
class DeferSyncTest(testlib.TestCase):
klass = mitogen.core.Broker
def test_okay(self):
broker = self.klass()
try:
th = broker.defer_sync(lambda: threading.currentThread())
self.assertEquals(th, broker._thread)
finally:
broker.shutdown()
def test_exception(self):
broker = self.klass()
try:
self.assertRaises(ValueError,
broker.defer_sync, lambda: int('dave'))
finally:
broker.shutdown()
if __name__ == '__main__':
unittest2.main()