diff --git a/mitogen/core.py b/mitogen/core.py index 3990819f..1f5ad0a9 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1949,15 +1949,60 @@ class Broker(object): return 'Broker(%#x)' % (id(self),) +class Dispatcher(object): + def __init__(self, econtext): + self.econtext = econtext + self.recv = Receiver(router=econtext.router, + handle=CALL_FUNCTION, + policy=has_parent_authority) + listen(econtext.broker, 'shutdown', self._on_broker_shutdown) + + def _on_broker_shutdown(self): + self.recv.close() + + def _dispatch_one(self, msg): + data = msg.unpickle(throw=False) + _v and LOG.debug('_dispatch_one(%r)', data) + + modname, klass, func, args, kwargs = data + obj = import_module(modname) + if klass: + obj = getattr(obj, klass) + fn = getattr(obj, func) + if getattr(fn, 'mitogen_takes_econtext', None): + kwargs.setdefault('econtext', self.econtext) + if getattr(fn, 'mitogen_takes_router', None): + kwargs.setdefault('router', self.econtext.router) + return fn(*args, **kwargs) + + def _dispatch_calls(self): + for msg in self.recv: + try: + ret = self._dispatch_one(msg) + _v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret) + if msg.reply_to: + msg.reply(ret) + except Exception: + e = sys.exc_info()[1] + if msg.reply_to: + _v and LOG.debug('_dispatch_calls: %s', e) + msg.reply(CallError(e)) + else: + LOG.exception('_dispatch_calls: %r', msg) + + def run(self): + if self.econtext.config.get('on_start'): + self.econtext.config['on_start'](self) + + _profile_hook('main', self._dispatch_calls) + + class ExternalContext(object): detached = False def __init__(self, config): self.config = config - def _on_broker_shutdown(self): - self.recv.close() - def _on_broker_exit(self): if not self.config['profiling']: os.kill(os.getpid(), signal.SIGTERM) @@ -2041,16 +2086,12 @@ class ExternalContext(object): in_fd = self.config.get('in_fd', 100) out_fd = self.config.get('out_fd', 1) - self.recv = Receiver(router=self.router, - handle=CALL_FUNCTION, - policy=has_parent_authority) self.stream = Stream(self.router, parent_id) self.stream.name = 'parent' self.stream.accept(in_fd, out_fd) self.stream.receive_side.keep_alive = False listen(self.stream, 'disconnect', self._on_parent_disconnect) - listen(self.broker, 'shutdown', self._on_broker_shutdown) listen(self.broker, 'exit', self._on_broker_exit) os.close(in_fd) @@ -2148,40 +2189,6 @@ class ExternalContext(object): # Reopen with line buffering. sys.stdout = os.fdopen(1, 'w', 1) - def _dispatch_one(self, msg): - data = msg.unpickle(throw=False) - _v and LOG.debug('_dispatch_calls(%r)', data) - - modname, klass, func, args, kwargs = data - obj = import_module(modname) - if klass: - obj = getattr(obj, klass) - fn = getattr(obj, func) - if getattr(fn, 'mitogen_takes_econtext', None): - kwargs.setdefault('econtext', self) - if getattr(fn, 'mitogen_takes_router', None): - kwargs.setdefault('router', self.router) - return fn(*args, **kwargs) - - def _dispatch_calls(self): - if self.config.get('on_start'): - self.config['on_start'](self) - - for msg in self.recv: - try: - ret = self._dispatch_one(msg) - _v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret) - if msg.reply_to: - msg.reply(ret) - except Exception: - e = sys.exc_info()[1] - if msg.reply_to: - _v and LOG.debug('_dispatch_calls: %s', e) - msg.reply(CallError(e)) - else: - LOG.exception('_dispatch_calls: %r', msg) - self.dispatch_stopped = True - def main(self): self._setup_master() try: @@ -2203,7 +2210,8 @@ class ExternalContext(object): self.parent, mitogen.context_id, os.getpid()) _v and LOG.debug('Recovered sys.executable: %r', sys.executable) - _profile_hook('main', self._dispatch_calls) + self.dispatcher = Dispatcher(self) + self.dispatcher.run() _v and LOG.debug('ExternalContext.main() normal exit') except KeyboardInterrupt: LOG.debug('KeyboardInterrupt received, exiting gracefully.')