From 514d35fd10ccc72ad583f0b1267e2110b8ef013f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 17 Feb 2019 21:50:44 +0000 Subject: [PATCH] issue #535: service: support Pool.defer() like Broker.defer() --- mitogen/service.py | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/mitogen/service.py b/mitogen/service.py index 3254e69a..6f50275d 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -411,10 +411,12 @@ class Service(object): def __repr__(self): return '%s()' % (self.__class__.__name__,) - def on_message(self, recv, msg): + def on_message(self, event): """ Called when a message arrives on any of :attr:`select`'s registered receivers. + + :param mitogen.select.Event event: """ pass @@ -452,6 +454,7 @@ class Pool(object): def __init__(self, router, services, size=1, overwrite=False): self.router = router self._activator = self.activator_class() + self._ipc_latch = mitogen.core.Latch() self._receiver = mitogen.core.Receiver( router=router, handle=mitogen.core.CALL_SERVICE, @@ -460,9 +463,13 @@ class Pool(object): self._select = mitogen.select.Select(oneshot=False) self._select.add(self._receiver) + self._select.add(self._ipc_latch) #: Serialize service construction. self._lock = threading.Lock() - self._func_by_recv = {self._receiver: self._on_service_call} + self._func_by_source = { + self._receiver: self._on_service_call, + self._ipc_latch: self._on_ipc_latch, + } self._invoker_by_name = {} for service in services: @@ -488,10 +495,10 @@ class Pool(object): name = service.name() if name in self._invoker_by_name: raise Error('service named %r already registered' % (name,)) - assert service.select not in self._func_by_recv + assert service.select not in self._func_by_source invoker = service.invoker_class(service=service) self._invoker_by_name[name] = invoker - self._func_by_recv[service.select] = service.on_message + self._func_by_source[service.select] = service.on_message closed = False @@ -534,7 +541,18 @@ class Pool(object): isinstance(tup[2], dict)): raise mitogen.core.CallError('Invalid message format.') - def _on_service_call(self, recv, msg): + def defer(self, func, *args, **kwargs): + """ + Arrange for `func(*args, **kwargs)` to be invoked in the context of a + service pool thread. + """ + self._ipc_latch.put(lambda: func(*args, **kwargs)) + + def _on_ipc_latch(self, event): + event.data() + + def _on_service_call(self, event): + msg = event.data service_name = None method_name = None try: @@ -555,17 +573,17 @@ class Pool(object): def _worker_run(self): while not self.closed: try: - msg = self._select.get() + event = self._select.get_event() except (mitogen.core.ChannelError, mitogen.core.LatchError): e = sys.exc_info()[1] LOG.debug('%r: channel or latch closed, exitting: %s', self, e) return - func = self._func_by_recv[msg.receiver] + func = self._func_by_source[event.source] try: - func(msg.receiver, msg) + func(event) except Exception: - LOG.exception('While handling %r using %r', msg, func) + LOG.exception('While handling %r using %r', event.data, func) def _worker_main(self): try: