From b158259c8654d9e89fcbe74819c495c3a8a3e490 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 12 Feb 2018 15:47:59 +0545 Subject: [PATCH] Split up parent and master modules Knocks 4kb off network footprint for a proxy connection. --- mitogen/fakessh.py | 6 +- mitogen/master.py | 553 ++++----------------------------------------- mitogen/parent.py | 504 +++++++++++++++++++++++++++++++++++++++++ mitogen/ssh.py | 10 +- mitogen/sudo.py | 10 +- preamble_size.py | 4 +- 6 files changed, 563 insertions(+), 524 deletions(-) create mode 100644 mitogen/parent.py diff --git a/mitogen/fakessh.py b/mitogen/fakessh.py index 96786b82..40d490c7 100644 --- a/mitogen/fakessh.py +++ b/mitogen/fakessh.py @@ -30,7 +30,6 @@ import inspect import logging import os import shutil -import signal import socket import subprocess import sys @@ -39,6 +38,7 @@ import threading import mitogen.core import mitogen.master +import mitogen.parent from mitogen.core import LOG, IOLOG @@ -123,7 +123,7 @@ class Process(object): mitogen.core.listen(self.pump, 'receive', self._on_pump_receive) if proc: - pmon = mitogen.master.ProcessMonitor.instance() + pmon = mitogen.parent.ProcessMonitor.instance() pmon.add(proc.pid, self._on_proc_exit) def __repr__(self): @@ -313,7 +313,7 @@ def _fakessh_main(dest_context_id, econtext): @mitogen.core.takes_router def run(dest, router, args, deadline=None, econtext=None): if econtext is not None: - mitogen.master.upgrade_router(econtext) + mitogen.parent.upgrade_router(econtext) context_id = router.allocate_id() fakessh = mitogen.master.Context(router, context_id) diff --git a/mitogen/master.py b/mitogen/master.py index e2deca90..86661979 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -26,24 +26,15 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import dis -import errno -import getpass import imp import inspect import itertools import logging import os import pkgutil -import pty import re -import select -import signal -import socket import sys -import termios -import textwrap import threading -import time import types import zlib @@ -58,188 +49,18 @@ if not hasattr(pkgutil, 'find_loader'): from mitogen.compat import pkgutil import mitogen.core +import mitogen.parent +from mitogen.core import LOG -LOG = logging.getLogger('mitogen') -IOLOG = logging.getLogger('mitogen.io') RLOG = logging.getLogger('mitogen.ctx') -DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S) -COMMENT_RE = re.compile(r'^[ ]*#[^\n]*$', re.M) -IOLOG_RE = re.compile(r'^[ ]*IOLOG.debug\(.+?\)$', re.M) - - -def minimize_source(source): - subber = lambda match: '""' + ('\n' * match.group(0).count('\n')) - source = DOCSTRING_RE.sub(subber, source) - source = COMMENT_RE.sub('', source) - return source.replace(' ', '\t') - def get_child_modules(path, fullname): it = pkgutil.iter_modules([os.path.dirname(path)]) return ['%s.%s' % (fullname, name) for _, name, _ in it] -class Argv(object): - def __init__(self, argv): - self.argv = argv - - def escape(self, x): - s = '"' - for c in x: - if c in '\\$"`': - s += '\\' - s += c - s += '"' - return s - - def __str__(self): - return ' '.join(map(self.escape, self.argv)) - - -def create_child(*args): - parentfp, childfp = socket.socketpair() - pid = os.fork() - if not pid: - mitogen.core.set_block(childfp.fileno()) - os.dup2(childfp.fileno(), 0) - os.dup2(childfp.fileno(), 1) - childfp.close() - parentfp.close() - os.execvp(args[0], args) - - childfp.close() - # Decouple the socket from the lifetime of the Python socket object. - fd = os.dup(parentfp.fileno()) - parentfp.close() - - LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s', - pid, fd, os.getpid(), Argv(args)) - return pid, fd - - -def flags(names): - """Return the result of ORing a set of (space separated) :py:mod:`termios` - module constants together.""" - return sum(getattr(termios, name) for name in names.split()) - - -def cfmakeraw(tflags): - """Given a list returned by :py:func:`termios.tcgetattr`, return a list - that has been modified in the same manner as the `cfmakeraw()` C library - function.""" - iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags - iflag &= ~flags('IGNBRK BRKINT PARMRK ISTRIP INLCR IGNCR ICRNL IXON') - oflag &= ~flags('OPOST IXOFF') - lflag &= ~flags('ECHO ECHOE ECHONL ICANON ISIG IEXTEN') - cflag &= ~flags('CSIZE PARENB') - cflag |= flags('CS8') - - # TODO: one or more of the above bit twiddles sets or omits a necessary - # flag. Forcing these fields to zero, as shown below, gets us what we want - # on Linux/OS X, but it is possibly broken on some other OS. - iflag = 0 - oflag = 0 - lflag = 0 - return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] - - -def disable_echo(fd): - old = termios.tcgetattr(fd) - new = cfmakeraw(old) - flags = ( - termios.TCSAFLUSH | - getattr(termios, 'TCSASOFT', 0) - ) - termios.tcsetattr(fd, flags, new) - - -def close_nonstandard_fds(): - for fd in xrange(3, 1024): - try: - os.close(fd) - except OSError: - pass - - -def tty_create_child(*args): - master_fd, slave_fd = os.openpty() - disable_echo(master_fd) - disable_echo(slave_fd) - - pid = os.fork() - if not pid: - mitogen.core.set_block(slave_fd) - os.dup2(slave_fd, 0) - os.dup2(slave_fd, 1) - os.dup2(slave_fd, 2) - close_nonstandard_fds() - os.setsid() - os.close(os.open(os.ttyname(1), os.O_RDWR)) - os.execvp(args[0], args) - - os.close(slave_fd) - LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s', - pid, master_fd, os.getpid(), Argv(args)) - return pid, master_fd - - -def write_all(fd, s, deadline=None): - timeout = None - written = 0 - - while written < len(s): - if deadline is not None: - timeout = max(0, deadline - time.time()) - if timeout == 0: - raise mitogen.core.TimeoutError('write timed out') - - _, wfds, _ = select.select([], [fd], [], timeout) - if not wfds: - continue - - n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written)) - if disconnected: - raise mitogen.core.StreamError('EOF on stream during write') - - written += n - - -def iter_read(fd, deadline=None): - bits = [] - timeout = None - - while True: - if deadline is not None: - timeout = max(0, deadline - time.time()) - if timeout == 0: - break - - rfds, _, _ = select.select([fd], [], [], timeout) - if not rfds: - continue - - s, disconnected = mitogen.core.io_op(os.read, fd, 4096) - IOLOG.debug('iter_read(%r) -> %r', fd, s) - if disconnected or not s: - raise mitogen.core.StreamError( - 'EOF on stream; last 300 bytes received: %r' % - (''.join(bits)[-300:],) - ) - - bits.append(s) - yield s - - raise mitogen.core.TimeoutError('read timed out') - - -def discard_until(fd, s, deadline): - for buf in iter_read(fd, deadline): - if buf.endswith(s): - return - - def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'), IMPORT_NAME=dis.opname.index('IMPORT_NAME')): """Given a code object `co`, scan its bytecode yielding any @@ -713,170 +534,6 @@ class ModuleResponder(object): ) -class ModuleForwarder(object): - """ - Respond to GET_MODULE requests in a slave by forwarding the request to our - parent context, or satisfying the request from our local Importer cache. - """ - def __init__(self, router, parent_context, importer): - self.router = router - self.parent_context = parent_context - self.importer = importer - router.add_handler(self._on_get_module, mitogen.core.GET_MODULE) - - def __repr__(self): - return 'ModuleForwarder(%r)' % (self.router,) - - def _on_get_module(self, msg): - LOG.debug('%r._on_get_module(%r)', self, msg) - if msg == mitogen.core._DEAD: - return - - fullname = msg.data - callback = lambda: self._on_cache_callback(msg, fullname) - self.importer._request_module(fullname, callback) - - def _send_one_module(self, msg, tup): - self.router.route( - mitogen.core.Message.pickled( - tup, - dst_id=msg.src_id, - handle=mitogen.core.LOAD_MODULE, - ) - ) - - def _on_cache_callback(self, msg, fullname): - LOG.debug('%r._on_get_module(): sending %r', self, fullname) - tup = self.importer._cache[fullname] - if tup is not None: - for related in tup[4]: - rtup = self.importer._cache[fullname] - self._send_one_module(msg, rtup) - - self._send_one_module(msg, tup) - - -class Stream(mitogen.core.Stream): - """ - Base for streams capable of starting new slaves. - """ - #: The path to the remote Python interpreter. - python_path = 'python2.7' - - #: True to cause context to write verbose /tmp/mitogen..log. - debug = False - - #: True to cause context to write /tmp/mitogen.stats...log. - profiling = False - - def __init__(self, *args, **kwargs): - super(Stream, self).__init__(*args, **kwargs) - self.sent_modules = set(['mitogen', 'mitogen.core']) - self.sent_packages = set(['mitogen']) - - def construct(self, remote_name=None, python_path=None, debug=False, - profiling=False, **kwargs): - """Get the named context running on the local machine, creating it if - it does not exist.""" - super(Stream, self).construct(**kwargs) - if python_path: - self.python_path = python_path - - if remote_name is None: - remote_name = '%s@%s:%d' - remote_name %= (getpass.getuser(), socket.gethostname(), os.getpid()) - self.remote_name = remote_name - self.debug = debug - self.profiling = profiling - - def on_shutdown(self, broker): - """Request the slave gracefully shut itself down.""" - LOG.debug('%r closing CALL_FUNCTION channel', self) - self.send( - mitogen.core.Message( - src_id=mitogen.context_id, - dst_id=self.remote_id, - handle=mitogen.core.SHUTDOWN, - ) - ) - - # base64'd and passed to 'python -c'. It forks, dups 0->100, creates a - # pipe, then execs a new interpreter with a custom argv. 'CONTEXT_NAME' is - # replaced with the context name. Optimized for size. - @staticmethod - def _first_stage(): - import os,sys,zlib - R,W=os.pipe() - r,w=os.pipe() - if os.fork(): - os.dup2(0,100) - os.dup2(R,0) - os.dup2(r,101) - for f in R,r,W,w:os.close(f) - os.environ['ARGV0']=e=sys.executable - os.execv(e,['mitogen:CONTEXT_NAME']) - os.write(1,'EC0\n') - C=zlib.decompress(sys.stdin.read(input())) - os.fdopen(W,'w',0).write(C) - os.fdopen(w,'w',0).write('%s\n'%len(C)+C) - os.write(1,'EC1\n') - sys.exit(0) - - def get_boot_command(self): - source = inspect.getsource(self._first_stage) - source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:])) - source = source.replace(' ', '\t') - source = source.replace('CONTEXT_NAME', self.remote_name) - encoded = source.encode('zlib').encode('base64').replace('\n', '') - # We can't use bytes.decode() in 3.x since it was restricted to always - # return unicode, so codecs.decode() is used instead. In 3.x - # codecs.decode() requires a bytes object. Since we must be compatible - # with 2.4 (no bytes literal), an extra .encode() either returns the - # same str (2.x) or an equivalent bytes (3.x). - return [ - self.python_path, '-c', - 'from codecs import decode as _;' - 'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,) - ] - - def get_preamble(self): - parent_ids = mitogen.parent_ids[:] - parent_ids.insert(0, mitogen.context_id) - source = inspect.getsource(mitogen.core) - source += '\nExternalContext().main%r\n' % (( - parent_ids, # parent_ids - self.remote_id, # context_id - self.debug, - self.profiling, - LOG.level or logging.getLogger().level or logging.INFO, - ),) - - compressed = zlib.compress(minimize_source(source)) - return str(len(compressed)) + '\n' + compressed - - create_child = staticmethod(create_child) - - def connect(self): - LOG.debug('%r.connect()', self) - pid, fd = self.create_child(*self.get_boot_command()) - self.name = 'local.%s' % (pid,) - self.receive_side = mitogen.core.Side(self, fd) - self.transmit_side = mitogen.core.Side(self, os.dup(fd)) - LOG.debug('%r.connect(): child process stdin/stdout=%r', - self, self.receive_side.fd) - - self._connect_bootstrap() - - def _ec0_received(self): - LOG.debug('%r._ec0_received()', self) - write_all(self.transmit_side.fd, self.get_preamble()) - discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0) - - def _connect_bootstrap(self): - discard_until(self.receive_side.fd, 'EC0\n', time.time() + 10.0) - self._ec0_received() - - class Broker(mitogen.core.Broker): shutdown_timeout = 5.0 @@ -919,43 +576,55 @@ class Context(mitogen.core.Context): return self.call_async(fn, *args, **kwargs).get_data() -def _local_method(): - return Stream +class Router(mitogen.parent.Router): + context_class = Context + broker_class = Broker + debug = False + profiling = False -def _ssh_method(): - import mitogen.ssh - return mitogen.ssh.Stream + def __init__(self, broker=None): + if broker is None: + broker = self.broker_class() + super(Router, self).__init__(broker) + self.id_allocator = IdAllocator(self) + self.responder = ModuleResponder(self) + self.log_forwarder = LogForwarder(self) -def _sudo_method(): - import mitogen.sudo - return mitogen.sudo.Stream + def enable_debug(self): + mitogen.core.enable_debug_logging() + self.debug = True + def __enter__(self): + return self -METHOD_NAMES = { - 'local': _local_method, - 'ssh': _ssh_method, - 'sudo': _sudo_method, -} + def __exit__(self, e_type, e_val, tb): + self.broker.shutdown() + self.broker.join() + def local(self, **kwargs): + return self.connect('local', **kwargs) -def upgrade_router(econtext): - if not isinstance(econtext.router, Router): # TODO - econtext.router.__class__ = Router # TODO - econtext.router.id_allocator = ChildIdAllocator(econtext.router) - LOG.debug('_proxy_connect(): constructing ModuleForwarder') - ModuleForwarder(econtext.router, econtext.parent, econtext.importer) + def sudo(self, **kwargs): + return self.connect('sudo', **kwargs) + def ssh(self, **kwargs): + return self.connect('ssh', **kwargs) -@mitogen.core.takes_econtext -def _proxy_connect(name, context_id, method_name, kwargs, econtext): - upgrade_router(econtext) - context = econtext.router._connect( - context_id, - METHOD_NAMES[method_name](), - name=name, - **kwargs - ) - return context.name + def propagate_route(self, target, via): + self.add_route(target.context_id, via.context_id) + child = via + parent = via.via + + while parent is not None: + LOG.debug('Adding route to %r for %r via %r', parent, target, child) + parent.send( + mitogen.core.Message( + data='%s\x00%s' % (target.context_id, child.context_id), + handle=mitogen.core.ADD_ROUTE, + ) + ) + child = parent + parent = parent.via class IdAllocator(object): @@ -994,139 +663,3 @@ class IdAllocator(object): LOG.debug('%r: publishing route to %r via %r', self, allocated, requestee) self.router.propagate_route(allocated, requestee) - - -class ChildIdAllocator(object): - def __init__(self, router): - self.router = router - - def allocate(self): - master = Context(self.router, 0) - return master.send_await( - mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID) - ) - - -class Router(mitogen.core.Router): - broker_class = Broker - debug = False - - profiling = False - - def __init__(self, broker=None): - if broker is None: - broker = self.broker_class() - super(Router, self).__init__(broker) - self.id_allocator = IdAllocator(self) - self.responder = ModuleResponder(self) - self.log_forwarder = LogForwarder(self) - - def enable_debug(self): - mitogen.core.enable_debug_logging() - self.debug = True - - def __enter__(self): - return self - - def __exit__(self, e_type, e_val, tb): - self.broker.shutdown() - self.broker.join() - - def allocate_id(self): - return self.id_allocator.allocate() - - def context_by_id(self, context_id, via_id=None): - context = self._context_by_id.get(context_id) - if context is None: - context = Context(self, context_id) - if via_id is not None: - context.via = self.context_by_id(via_id) - self._context_by_id[context_id] = context - return context - - def local(self, **kwargs): - return self.connect('local', **kwargs) - - def sudo(self, **kwargs): - return self.connect('sudo', **kwargs) - - def ssh(self, **kwargs): - return self.connect('ssh', **kwargs) - - def _connect(self, context_id, klass, name=None, **kwargs): - context = Context(self, context_id) - stream = klass(self, context.context_id, **kwargs) - if name is not None: - stream.name = name - stream.connect() - context.name = stream.name - self.register(context, stream) - return context - - def connect(self, method_name, name=None, **kwargs): - klass = METHOD_NAMES[method_name]() - kwargs.setdefault('debug', self.debug) - kwargs.setdefault('profiling', self.profiling) - - via = kwargs.pop('via', None) - if via is not None: - return self.proxy_connect(via, method_name, name=name, **kwargs) - context_id = self.allocate_id() - return self._connect(context_id, klass, name=name, **kwargs) - - def propagate_route(self, target, via): - self.add_route(target.context_id, via.context_id) - child = via - parent = via.via - - while parent is not None: - LOG.debug('Adding route to %r for %r via %r', parent, target, child) - parent.send( - mitogen.core.Message( - data='%s\x00%s' % (target.context_id, child.context_id), - handle=mitogen.core.ADD_ROUTE, - ) - ) - child = parent - parent = parent.via - - def proxy_connect(self, via_context, method_name, name=None, **kwargs): - context_id = self.allocate_id() - # Must be added prior to _proxy_connect() to avoid a race. - self.add_route(context_id, via_context.context_id) - name = via_context.call(_proxy_connect, - name, context_id, method_name, kwargs - ) - name = '%s.%s' % (via_context.name, name) - - context = Context(self, context_id, name=name) - context.via = via_context - self._context_by_id[context.context_id] = context - - self.propagate_route(context, via_context) - return context - - -class ProcessMonitor(object): - def __init__(self): - # pid -> callback() - self.callback_by_pid = {} - signal.signal(signal.SIGCHLD, self._on_sigchld) - - def _on_sigchld(self, _signum, _frame): - for pid, callback in self.callback_by_pid.items(): - pid, status = os.waitpid(pid, os.WNOHANG) - if pid: - callback(status) - del self.callback_by_pid[pid] - - def add(self, pid, callback): - self.callback_by_pid[pid] = callback - - _instance = None - - @classmethod - def instance(cls): - if cls._instance is None: - cls._instance = cls() - return cls._instance diff --git a/mitogen/parent.py b/mitogen/parent.py new file mode 100644 index 00000000..9c91f348 --- /dev/null +++ b/mitogen/parent.py @@ -0,0 +1,504 @@ +# Copyright 2017, David Wilson +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import re +import logging +import os +import termios +import signal +import select +import getpass +import time +import socket +import inspect +import textwrap +import zlib + +import mitogen.core +from mitogen.core import LOG, IOLOG + + +DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S) +COMMENT_RE = re.compile(r'^[ ]*#[^\n]*$', re.M) + + +class Argv(object): + def __init__(self, argv): + self.argv = argv + + def escape(self, x): + s = '"' + for c in x: + if c in '\\$"`': + s += '\\' + s += c + s += '"' + return s + + def __str__(self): + return ' '.join(map(self.escape, self.argv)) + + +def minimize_source(source): + subber = lambda match: '""' + ('\n' * match.group(0).count('\n')) + source = DOCSTRING_RE.sub(subber, source) + source = COMMENT_RE.sub('', source) + return source.replace(' ', '\t') + + +def flags(names): + """Return the result of ORing a set of (space separated) :py:mod:`termios` + module constants together.""" + return sum(getattr(termios, name) for name in names.split()) + + +def cfmakeraw(tflags): + """Given a list returned by :py:func:`termios.tcgetattr`, return a list + that has been modified in the same manner as the `cfmakeraw()` C library + function.""" + iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags + iflag &= ~flags('IGNBRK BRKINT PARMRK ISTRIP INLCR IGNCR ICRNL IXON') + oflag &= ~flags('OPOST IXOFF') + lflag &= ~flags('ECHO ECHOE ECHONL ICANON ISIG IEXTEN') + cflag &= ~flags('CSIZE PARENB') + cflag |= flags('CS8') + + # TODO: one or more of the above bit twiddles sets or omits a necessary + # flag. Forcing these fields to zero, as shown below, gets us what we want + # on Linux/OS X, but it is possibly broken on some other OS. + iflag = 0 + oflag = 0 + lflag = 0 + return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] + + +def disable_echo(fd): + old = termios.tcgetattr(fd) + new = cfmakeraw(old) + flags = ( + termios.TCSAFLUSH | + getattr(termios, 'TCSASOFT', 0) + ) + termios.tcsetattr(fd, flags, new) + + +def close_nonstandard_fds(): + for fd in xrange(3, 1024): + try: + os.close(fd) + except OSError: + pass + + +def create_child(*args): + parentfp, childfp = socket.socketpair() + pid = os.fork() + if not pid: + mitogen.core.set_block(childfp.fileno()) + os.dup2(childfp.fileno(), 0) + os.dup2(childfp.fileno(), 1) + childfp.close() + parentfp.close() + os.execvp(args[0], args) + + childfp.close() + # Decouple the socket from the lifetime of the Python socket object. + fd = os.dup(parentfp.fileno()) + parentfp.close() + + LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s', + pid, fd, os.getpid(), Argv(args)) + return pid, fd + + +def tty_create_child(*args): + master_fd, slave_fd = os.openpty() + disable_echo(master_fd) + disable_echo(slave_fd) + + pid = os.fork() + if not pid: + mitogen.core.set_block(slave_fd) + os.dup2(slave_fd, 0) + os.dup2(slave_fd, 1) + os.dup2(slave_fd, 2) + close_nonstandard_fds() + os.setsid() + os.close(os.open(os.ttyname(1), os.O_RDWR)) + os.execvp(args[0], args) + + os.close(slave_fd) + LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s', + pid, master_fd, os.getpid(), Argv(args)) + return pid, master_fd + + +def write_all(fd, s, deadline=None): + timeout = None + written = 0 + + while written < len(s): + if deadline is not None: + timeout = max(0, deadline - time.time()) + if timeout == 0: + raise mitogen.core.TimeoutError('write timed out') + + _, wfds, _ = select.select([], [fd], [], timeout) + if not wfds: + continue + + n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written)) + if disconnected: + raise mitogen.core.StreamError('EOF on stream during write') + + written += n + + +def iter_read(fd, deadline=None): + bits = [] + timeout = None + + while True: + if deadline is not None: + timeout = max(0, deadline - time.time()) + if timeout == 0: + break + + rfds, _, _ = select.select([fd], [], [], timeout) + if not rfds: + continue + + s, disconnected = mitogen.core.io_op(os.read, fd, 4096) + IOLOG.debug('iter_read(%r) -> %r', fd, s) + if disconnected or not s: + raise mitogen.core.StreamError( + 'EOF on stream; last 300 bytes received: %r' % + (''.join(bits)[-300:],) + ) + + bits.append(s) + yield s + + raise mitogen.core.TimeoutError('read timed out') + + +def discard_until(fd, s, deadline): + for buf in iter_read(fd, deadline): + if buf.endswith(s): + return + + +def upgrade_router(econtext): + if not isinstance(econtext.router, Router): # TODO + econtext.router.__class__ = Router # TODO + econtext.router.id_allocator = ChildIdAllocator(econtext.router) + LOG.debug('_proxy_connect(): constructing ModuleForwarder') + ModuleForwarder(econtext.router, econtext.parent, econtext.importer) + + +def _local_method(): + return mitogen.parent.Stream + +def _ssh_method(): + import mitogen.ssh + return mitogen.ssh.Stream + +def _sudo_method(): + import mitogen.sudo + return mitogen.sudo.Stream + + +METHOD_NAMES = { + 'local': _local_method, + 'ssh': _ssh_method, + 'sudo': _sudo_method, +} + + +@mitogen.core.takes_econtext +def _proxy_connect(name, context_id, method_name, kwargs, econtext): + mitogen.parent.upgrade_router(econtext) + context = econtext.router._connect( + context_id, + METHOD_NAMES[method_name](), + name=name, + **kwargs + ) + return context.name + + +class Stream(mitogen.core.Stream): + """ + Base for streams capable of starting new slaves. + """ + #: The path to the remote Python interpreter. + python_path = 'python2.7' + + #: True to cause context to write verbose /tmp/mitogen..log. + debug = False + + #: True to cause context to write /tmp/mitogen.stats...log. + profiling = False + + def __init__(self, *args, **kwargs): + super(Stream, self).__init__(*args, **kwargs) + self.sent_modules = set(['mitogen', 'mitogen.core']) + self.sent_packages = set(['mitogen']) + + def construct(self, remote_name=None, python_path=None, debug=False, + profiling=False, **kwargs): + """Get the named context running on the local machine, creating it if + it does not exist.""" + super(Stream, self).construct(**kwargs) + if python_path: + self.python_path = python_path + + if remote_name is None: + remote_name = '%s@%s:%d' + remote_name %= (getpass.getuser(), socket.gethostname(), os.getpid()) + self.remote_name = remote_name + self.debug = debug + self.profiling = profiling + + def on_shutdown(self, broker): + """Request the slave gracefully shut itself down.""" + LOG.debug('%r closing CALL_FUNCTION channel', self) + self.send( + mitogen.core.Message( + src_id=mitogen.context_id, + dst_id=self.remote_id, + handle=mitogen.core.SHUTDOWN, + ) + ) + + # base64'd and passed to 'python -c'. It forks, dups 0->100, creates a + # pipe, then execs a new interpreter with a custom argv. 'CONTEXT_NAME' is + # replaced with the context name. Optimized for size. + @staticmethod + def _first_stage(): + import os,sys,zlib + R,W=os.pipe() + r,w=os.pipe() + if os.fork(): + os.dup2(0,100) + os.dup2(R,0) + os.dup2(r,101) + for f in R,r,W,w:os.close(f) + os.environ['ARGV0']=e=sys.executable + os.execv(e,['mitogen:CONTEXT_NAME']) + os.write(1,'EC0\n') + C=zlib.decompress(sys.stdin.read(input())) + os.fdopen(W,'w',0).write(C) + os.fdopen(w,'w',0).write('%s\n'%len(C)+C) + os.write(1,'EC1\n') + sys.exit(0) + + def get_boot_command(self): + source = inspect.getsource(self._first_stage) + source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:])) + source = source.replace(' ', '\t') + source = source.replace('CONTEXT_NAME', self.remote_name) + encoded = source.encode('zlib').encode('base64').replace('\n', '') + # We can't use bytes.decode() in 3.x since it was restricted to always + # return unicode, so codecs.decode() is used instead. In 3.x + # codecs.decode() requires a bytes object. Since we must be compatible + # with 2.4 (no bytes literal), an extra .encode() either returns the + # same str (2.x) or an equivalent bytes (3.x). + return [ + self.python_path, '-c', + 'from codecs import decode as _;' + 'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,) + ] + + def get_preamble(self): + parent_ids = mitogen.parent_ids[:] + parent_ids.insert(0, mitogen.context_id) + source = inspect.getsource(mitogen.core) + source += '\nExternalContext().main%r\n' % (( + parent_ids, # parent_ids + self.remote_id, # context_id + self.debug, + self.profiling, + LOG.level or logging.getLogger().level or logging.INFO, + ),) + + compressed = zlib.compress(minimize_source(source)) + return str(len(compressed)) + '\n' + compressed + + create_child = staticmethod(create_child) + + def connect(self): + LOG.debug('%r.connect()', self) + pid, fd = self.create_child(*self.get_boot_command()) + self.name = 'local.%s' % (pid,) + self.receive_side = mitogen.core.Side(self, fd) + self.transmit_side = mitogen.core.Side(self, os.dup(fd)) + LOG.debug('%r.connect(): child process stdin/stdout=%r', + self, self.receive_side.fd) + + self._connect_bootstrap() + + def _ec0_received(self): + LOG.debug('%r._ec0_received()', self) + write_all(self.transmit_side.fd, self.get_preamble()) + discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0) + + def _connect_bootstrap(self): + discard_until(self.receive_side.fd, 'EC0\n', time.time() + 10.0) + self._ec0_received() + + +class ChildIdAllocator(object): + def __init__(self, router): + self.router = router + + def allocate(self): + master = mitogen.core.Context(self.router, 0) + return master.send_await( + mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID) + ) + + +class Router(mitogen.core.Router): + context_class = mitogen.core.Context + + def allocate_id(self): + return self.id_allocator.allocate() + + def context_by_id(self, context_id, via_id=None): + context = self._context_by_id.get(context_id) + if context is None: + context = self.context_class(self, context_id) + if via_id is not None: + context.via = self.context_by_id(via_id) + self._context_by_id[context_id] = context + return context + + def _connect(self, context_id, klass, name=None, **kwargs): + context = self.context_class(self, context_id) + stream = klass(self, context.context_id, **kwargs) + if name is not None: + stream.name = name + stream.connect() + context.name = stream.name + self.register(context, stream) + return context + + def connect(self, method_name, name=None, **kwargs): + klass = METHOD_NAMES[method_name]() + kwargs.setdefault('debug', self.debug) + kwargs.setdefault('profiling', self.profiling) + + via = kwargs.pop('via', None) + if via is not None: + return self.proxy_connect(via, method_name, name=name, **kwargs) + context_id = self.allocate_id() + return self._connect(context_id, klass, name=name, **kwargs) + + def proxy_connect(self, via_context, method_name, name=None, **kwargs): + context_id = self.allocate_id() + # Must be added prior to _proxy_connect() to avoid a race. + self.add_route(context_id, via_context.context_id) + name = via_context.call(_proxy_connect, + name, context_id, method_name, kwargs + ) + name = '%s.%s' % (via_context.name, name) + + context = self.context_class(self, context_id, name=name) + context.via = via_context + self._context_by_id[context.context_id] = context + + self.propagate_route(context, via_context) + return context + + +class ProcessMonitor(object): + def __init__(self): + # pid -> callback() + self.callback_by_pid = {} + signal.signal(signal.SIGCHLD, self._on_sigchld) + + def _on_sigchld(self, _signum, _frame): + for pid, callback in self.callback_by_pid.items(): + pid, status = os.waitpid(pid, os.WNOHANG) + if pid: + callback(status) + del self.callback_by_pid[pid] + + def add(self, pid, callback): + self.callback_by_pid[pid] = callback + + _instance = None + + @classmethod + def instance(cls): + if cls._instance is None: + cls._instance = cls() + return cls._instance + + +class ModuleForwarder(object): + """ + Respond to GET_MODULE requests in a slave by forwarding the request to our + parent context, or satisfying the request from our local Importer cache. + """ + def __init__(self, router, parent_context, importer): + self.router = router + self.parent_context = parent_context + self.importer = importer + router.add_handler(self._on_get_module, mitogen.core.GET_MODULE) + + def __repr__(self): + return 'ModuleForwarder(%r)' % (self.router,) + + def _on_get_module(self, msg): + LOG.debug('%r._on_get_module(%r)', self, msg) + if msg == mitogen.core._DEAD: + return + + fullname = msg.data + callback = lambda: self._on_cache_callback(msg, fullname) + self.importer._request_module(fullname, callback) + + def _send_one_module(self, msg, tup): + self.router.route( + mitogen.core.Message.pickled( + tup, + dst_id=msg.src_id, + handle=mitogen.core.LOAD_MODULE, + ) + ) + + def _on_cache_callback(self, msg, fullname): + LOG.debug('%r._on_get_module(): sending %r', self, fullname) + tup = self.importer._cache[fullname] + if tup is not None: + for related in tup[4]: + rtup = self.importer._cache[fullname] + self._send_one_module(msg, rtup) + + self._send_one_module(msg, tup) diff --git a/mitogen/ssh.py b/mitogen/ssh.py index faee347b..adab955d 100644 --- a/mitogen/ssh.py +++ b/mitogen/ssh.py @@ -33,7 +33,7 @@ import commands import logging import time -import mitogen.master +import mitogen.parent LOG = logging.getLogger('mitogen') @@ -46,8 +46,8 @@ class PasswordError(mitogen.core.Error): pass -class Stream(mitogen.master.Stream): - create_child = staticmethod(mitogen.master.tty_create_child) +class Stream(mitogen.parent.Stream): + create_child = staticmethod(mitogen.parent.tty_create_child) python_path = 'python2.7' #: The path to the SSH binary. @@ -103,8 +103,8 @@ class Stream(mitogen.master.Stream): def _connect_bootstrap(self): password_sent = False - for buf in mitogen.master.iter_read(self.receive_side.fd, - time.time() + 10.0): + for buf in mitogen.parent.iter_read(self.receive_side.fd, + time.time() + 10.0): LOG.debug('%r: received %r', self, buf) if buf.endswith('EC0\n'): self._ec0_received() diff --git a/mitogen/sudo.py b/mitogen/sudo.py index 76889066..808f459d 100644 --- a/mitogen/sudo.py +++ b/mitogen/sudo.py @@ -30,7 +30,7 @@ import os import time import mitogen.core -import mitogen.master +import mitogen.parent LOG = logging.getLogger(__name__) @@ -41,8 +41,8 @@ class PasswordError(mitogen.core.Error): pass -class Stream(mitogen.master.Stream): - create_child = staticmethod(mitogen.master.tty_create_child) +class Stream(mitogen.parent.Stream): + create_child = staticmethod(mitogen.parent.tty_create_child) sudo_path = 'sudo' password = None @@ -93,8 +93,8 @@ class Stream(mitogen.master.Stream): def _connect_bootstrap(self): password_sent = False - for buf in mitogen.master.iter_read(self.receive_side.fd, - time.time() + 10.0): + for buf in mitogen.parent.iter_read(self.receive_side.fd, + time.time() + 10.0): LOG.debug('%r: received %r', self, buf) if buf.endswith('EC0\n'): self._ec0_received() diff --git a/preamble_size.py b/preamble_size.py index fd6d9423..4f2334e5 100644 --- a/preamble_size.py +++ b/preamble_size.py @@ -8,6 +8,7 @@ import zlib import mitogen.fakessh import mitogen.master +import mitogen.parent import mitogen.ssh import mitogen.sudo @@ -26,9 +27,10 @@ print 'Preamble size: %s (%.2fKiB)' % ( for mod in ( mitogen.master, + mitogen.parent, mitogen.ssh, mitogen.sudo, mitogen.fakessh, ): - sz = len(zlib.compress(mitogen.master.minimize_source(inspect.getsource(mod)))) + sz = len(zlib.compress(mitogen.parent.minimize_source(inspect.getsource(mod)))) print '%s size: %s (%.2fKiB)' % (mod.__name__, sz, sz / 1024.0)