issue #535: service: support Pool.defer() like Broker.defer()

This commit is contained in:
David Wilson 2019-02-17 21:50:44 +00:00
parent eb9ec26622
commit 514d35fd10
1 changed files with 27 additions and 9 deletions

View File

@ -411,10 +411,12 @@ class Service(object):
def __repr__(self): def __repr__(self):
return '%s()' % (self.__class__.__name__,) return '%s()' % (self.__class__.__name__,)
def on_message(self, recv, msg): def on_message(self, event):
""" """
Called when a message arrives on any of :attr:`select`'s registered Called when a message arrives on any of :attr:`select`'s registered
receivers. receivers.
:param mitogen.select.Event event:
""" """
pass pass
@ -452,6 +454,7 @@ class Pool(object):
def __init__(self, router, services, size=1, overwrite=False): def __init__(self, router, services, size=1, overwrite=False):
self.router = router self.router = router
self._activator = self.activator_class() self._activator = self.activator_class()
self._ipc_latch = mitogen.core.Latch()
self._receiver = mitogen.core.Receiver( self._receiver = mitogen.core.Receiver(
router=router, router=router,
handle=mitogen.core.CALL_SERVICE, handle=mitogen.core.CALL_SERVICE,
@ -460,9 +463,13 @@ class Pool(object):
self._select = mitogen.select.Select(oneshot=False) self._select = mitogen.select.Select(oneshot=False)
self._select.add(self._receiver) self._select.add(self._receiver)
self._select.add(self._ipc_latch)
#: Serialize service construction. #: Serialize service construction.
self._lock = threading.Lock() self._lock = threading.Lock()
self._func_by_recv = {self._receiver: self._on_service_call} self._func_by_source = {
self._receiver: self._on_service_call,
self._ipc_latch: self._on_ipc_latch,
}
self._invoker_by_name = {} self._invoker_by_name = {}
for service in services: for service in services:
@ -488,10 +495,10 @@ class Pool(object):
name = service.name() name = service.name()
if name in self._invoker_by_name: if name in self._invoker_by_name:
raise Error('service named %r already registered' % (name,)) raise Error('service named %r already registered' % (name,))
assert service.select not in self._func_by_recv assert service.select not in self._func_by_source
invoker = service.invoker_class(service=service) invoker = service.invoker_class(service=service)
self._invoker_by_name[name] = invoker self._invoker_by_name[name] = invoker
self._func_by_recv[service.select] = service.on_message self._func_by_source[service.select] = service.on_message
closed = False closed = False
@ -534,7 +541,18 @@ class Pool(object):
isinstance(tup[2], dict)): isinstance(tup[2], dict)):
raise mitogen.core.CallError('Invalid message format.') raise mitogen.core.CallError('Invalid message format.')
def _on_service_call(self, recv, msg): def defer(self, func, *args, **kwargs):
"""
Arrange for `func(*args, **kwargs)` to be invoked in the context of a
service pool thread.
"""
self._ipc_latch.put(lambda: func(*args, **kwargs))
def _on_ipc_latch(self, event):
event.data()
def _on_service_call(self, event):
msg = event.data
service_name = None service_name = None
method_name = None method_name = None
try: try:
@ -555,17 +573,17 @@ class Pool(object):
def _worker_run(self): def _worker_run(self):
while not self.closed: while not self.closed:
try: try:
msg = self._select.get() event = self._select.get_event()
except (mitogen.core.ChannelError, mitogen.core.LatchError): except (mitogen.core.ChannelError, mitogen.core.LatchError):
e = sys.exc_info()[1] e = sys.exc_info()[1]
LOG.debug('%r: channel or latch closed, exitting: %s', self, e) LOG.debug('%r: channel or latch closed, exitting: %s', self, e)
return return
func = self._func_by_recv[msg.receiver] func = self._func_by_source[event.source]
try: try:
func(msg.receiver, msg) func(event)
except Exception: except Exception:
LOG.exception('While handling %r using %r', msg, func) LOG.exception('While handling %r using %r', event.data, func)
def _worker_main(self): def _worker_main(self):
try: try: