mitogen/econtext/master.py

325 lines
12 KiB
Python
Raw Normal View History

2016-08-12 17:45:26 +00:00
"""
This module implements functionality required by master processes, such as
starting new contexts via SSH. Its size is also restricted, since it must be
sent to any context that will be used to establish additional child contexts.
"""
2016-08-11 00:57:39 +00:00
import commands
import getpass
import inspect
import logging
import os
import pkgutil
import re
import socket
import sys
import textwrap
import types
2016-08-11 00:57:39 +00:00
import zlib
if not hasattr(pkgutil, 'find_loader'):
# find_loader() was new in >=2.5, but the modern pkgutil.py syntax has
# been kept intentionally 2.3 compatible so we can reuse it.
from econtext.compat import pkgutil
2016-08-11 00:57:39 +00:00
import econtext.core
LOG = logging.getLogger('econtext')
IOLOG = logging.getLogger('econtext.io')
RLOG = logging.getLogger('econtext.ctx')

DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S)
COMMENT_RE = re.compile(r'^[ ]*#[^\n]*$', re.M)
# Not used by minimize_source().
IOLOG_RE = re.compile(r'^[ ]*IOLOG.debug\(.+?\)$', re.M)

#: PEP 263 source encoding declaration. Python only honours it on line 1 or
#: 2, and dropping it breaks modules containing non-ASCII bytes.
CODING_RE = re.compile(r'^[ \t\f]*#.*?coding[:=][ \t]*[-_.a-zA-Z0-9]+')

#: One or more 4-space indentation steps at the very start of a line.
INDENT_RE = re.compile(r'^(?:    )+', re.M)


def minimize_source(source):
    """Remove comments and docstrings from Python `source`, preserving line
    numbers and syntax of empty blocks.

    Whole-line comments are blanked, except a coding declaration on line 1
    or 2. Every leading group of 4 spaces is then turned into a tab to save
    bytes. Spaces elsewhere (notably inside string literals) are left
    untouched.

    NOTE: DOCSTRING_RE matches every triple-double-quoted string, not only
    docstrings, so string constants written that way are blanked to ``""``
    as well.

    :param source: module source text.
    :returns: the minimized source, with the same number of lines."""
    # Keep newlines so line numbers in tracebacks still match the original.
    subber = lambda match: '""' + ('\n' * match.group(0).count('\n'))
    source = DOCSTRING_RE.sub(subber, source)

    # Offset just past the end of line 2; coding cookies are only valid
    # before it.
    head_end = 0
    for _ in range(2):
        newline = source.find('\n', head_end)
        if newline == -1:
            head_end = len(source)
            break
        head_end = newline + 1

    def strip_comment(match):
        if match.start() < head_end and CODING_RE.match(match.group(0)):
            return match.group(0)
        return ''

    source = COMMENT_RE.sub(strip_comment, source)
    return INDENT_RE.sub(lambda m: '\t' * (len(m.group(0)) // 4), source)
2016-08-11 00:57:39 +00:00
def get_child_modules(path, fullname):
    """Return the dotted names of the modules and subpackages found directly
    inside a package.

    :param path: filesystem path of a file inside the package directory
        (normally its ``__init__`` module); only its directory is scanned.
    :param fullname: dotted name of the package, used as the prefix.
    :returns: list of ``'<fullname>.<child>'`` strings."""
    package_dir = os.path.dirname(path)
    names = []
    for module_info in pkgutil.iter_modules([package_dir]):
        # Element 1 of each (finder, name, ispkg) entry is the bare name.
        names.append('%s.%s' % (fullname, module_info[1]))
    return names
2016-08-11 00:57:39 +00:00
2016-08-11 17:15:53 +00:00
def create_child(*args):
    """Create a child process whose stdin/stdout is connected to a socket,
    returning `(pid, socket_obj)`.

    :param args: argv for the child; ``args[0]`` is looked up on ``PATH``
        by :py:func:`os.execvp`.
    :returns: ``(pid, socket_obj)``, where `socket_obj` is the parent's end
        of a :py:func:`socket.socketpair` whose other end is the child's
        fds 0 and 1."""
    parentfp, childfp = socket.socketpair()
    pid = os.fork()
    if not pid:
        # In the forked child. Never return or raise into the caller's
        # stack: an exception unwinding from here would run the parent's
        # code (and its atexit handlers/buffer flushes) a second time.
        try:
            try:
                os.dup2(childfp.fileno(), 0)
                os.dup2(childfp.fileno(), 1)
                childfp.close()
                parentfp.close()
                os.execvp(args[0], args)
            except Exception:
                sys.stderr.write('create_child(): exec of %r failed: %s\n'
                                 % (args, sys.exc_info()[1]))
        finally:
            # Only reached if exec failed; the parent sees EOF on its socket.
            os._exit(1)
    childfp.close()
    LOG.debug('create_child() child %d fd %d, parent %d, args %r',
              pid, parentfp.fileno(), os.getpid(), args)
    return pid, parentfp
class Listener(econtext.core.BasicStream):
    """Listen on a TCP socket and attach a new :py:class:`Context` and
    :py:class:`econtext.core.Stream` to each accepted connection.

    :param broker: broker the listening socket is registered with.
    :param address: ``(host, port)`` to bind; defaults to
        ``('0.0.0.0', 0)`` (all interfaces, kernel-chosen port).
    :param backlog: backlog passed to ``listen()``."""

    def __init__(self, broker, address=None, backlog=30):
        self._broker = broker
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind(address or ('0.0.0.0', 0))
        self._sock.listen(backlog)
        econtext.core.set_cloexec(self._sock.fileno())
        # Actual bound address (useful when port 0 was requested).
        self._listen_addr = self._sock.getsockname()
        self.receive_side = econtext.core.Side(self, self._sock.fileno())
        broker.update_stream(self)

    def on_receive(self, broker):
        """Accept one pending connection and attach a Stream to it.

        NOTE(review): the new context is neither assigned to
        ``context.stream`` nor registered with the broker here; confirm
        whether ``Stream.accept()`` takes care of that."""
        sock, addr = self._sock.accept()
        # AF_INET peer address is (host, port); use a string name like the
        # other contexts created by this module.
        context = Context(self._broker, name='%s:%s' % addr)
        stream = econtext.core.Stream(context)
        # Give the stream descriptors of its own, then close the accepted
        # socket object. Passing bare `sock.fileno()` values would leave them
        # to be closed as soon as `sock` is garbage-collected on return.
        stream.accept(os.dup(sock.fileno()), os.dup(sock.fileno()))
        sock.close()
2016-08-11 00:57:39 +00:00
class LogForwarder(object):
    """Re-emit log records that a child context sends on its FORWARD_LOG
    handle.

    Records are logged on the logger named ``RLOG.name + '.' +
    context.name``."""

    def __init__(self, context):
        self._context = context
        context.add_handle_cb(self.forward_log,
                              handle=econtext.core.FORWARD_LOG)
        logger_name = '%s.%s' % (RLOG.name, context.name)
        self._log = logging.getLogger(logger_name)

    def forward_log(self, data):
        """Log one forwarded record.

        :param data: ``(name, level, message)`` tuple, or the
            ``econtext.core._DEAD`` marker, which is ignored."""
        if data == econtext.core._DEAD:
            return
        origin, level, message = data
        self._log.log(level, '%s: %s', origin, message)
class ModuleResponder(object):
    """Serve Python module source to a child context on request.

    Registers :py:meth:`get_module` as the callback for the context's
    GET_MODULE handle."""

    def __init__(self, context):
        self._context = context
        self._context.add_handle_cb(self.get_module,
                                    handle=econtext.core.GET_MODULE)

    def __repr__(self):
        return 'ModuleResponder(%r)' % (self._context,)

    def get_module(self, data):
        """Answer one GET_MODULE request.

        :param data: ``(reply_to, fullname)``, or the
            ``econtext.core._DEAD`` marker, which is ignored.

        Replies on `reply_to` with ``(pkg_present, path, compressed)``.
        `pkg_present` is the list of child module names for a package, or
        ``None`` otherwise. `compressed` is the zlib-compressed output of
        :py:func:`minimize_source`. Replies ``None`` if the module cannot
        be located or its source read."""
        if data == econtext.core._DEAD:
            return

        reply_to, fullname = data
        LOG.debug('%r.get_module(%r, %r)', self, reply_to, fullname)
        try:
            loader = pkgutil.find_loader(fullname)
            LOG.debug('pkgutil.find_loader(%r) -> %r', fullname, loader)
            if loader is None:
                raise ImportError('pkgutil provides no loader for %r' %
                                  (fullname,))

            path = loader.get_filename(fullname)
            LOG.debug('%r.get_filename(%r) -> %r', loader, fullname, path)
            # Handle __main__ specially.
            if path is None and fullname in sys.modules:
                module = sys.modules[fullname]
                path = module.__file__
                # Map compiled files back to their source; rstrip('co')
                # would also eat trailing c/o letters of other names.
                if path[-4:] in ('.pyc', '.pyo'):
                    path = path[:-1]
                source = inspect.getsource(module)
                is_pkg = hasattr(module, '__path__')
            else:
                source = loader.get_source(fullname)
                is_pkg = loader.is_package(fullname)

            if source is None:
                raise ImportError('no source available for %r' % (fullname,))

            if is_pkg:
                pkg_present = get_child_modules(path, fullname)
                LOG.debug('get_child_modules(%r, %r) -> %r',
                          path, fullname, pkg_present)
            else:
                pkg_present = None

            compressed = zlib.compress(minimize_source(source))
        except Exception:
            LOG.debug('While importing %r', fullname, exc_info=True)
            self._context.enqueue(reply_to, None)
        else:
            # Sent outside the try so a failure while replying is not
            # mistaken for an import failure and answered a second time.
            self._context.enqueue(reply_to, (pkg_present, path, compressed))
2016-08-11 00:57:39 +00:00
class LocalStream(econtext.core.Stream):
"""
Base for streams capable of starting new slaves.
"""
#: The path to the remote Python interpreter.
python_path = 'python'
2016-08-11 00:57:39 +00:00
def __init__(self, context):
super(LocalStream, self).__init__(context)
self._permitted_classes = set([
('econtext.core', 'CallError'),
('econtext.core', 'Dead'),
])
2016-08-11 00:57:39 +00:00
def on_shutdown(self, broker):
2016-08-12 03:42:14 +00:00
"""Request the slave gracefully shut itself down."""
2016-08-14 15:11:20 +00:00
LOG.debug('%r closing CALL_FUNCTION channel', self)
self.enqueue(econtext.core.CALL_FUNCTION, econtext.core._DEAD)
2016-08-11 15:46:51 +00:00
2016-08-11 17:15:53 +00:00
def _find_global(self, module_name, class_name):
2016-08-11 00:57:39 +00:00
"""Return the class implementing `module_name.class_name` or raise
`StreamError` if the module is not whitelisted."""
if (module_name, class_name) not in self._permitted_classes:
raise econtext.core.StreamError(
'%r attempted to unpickle %r in module %r',
self._context, class_name, module_name)
return getattr(sys.modules[module_name], class_name)
2016-08-11 17:15:53 +00:00
def allow_class(self, module_name, class_name):
2016-08-11 00:57:39 +00:00
"""Add `module_name` to the list of permitted modules."""
self._permitted_modules.add((module_name, class_name))
# base64'd and passed to 'python -c'. It forks, dups 0->100, creates a
# pipe, then execs a new interpreter with a custom argv. CONTEXT_NAME is
# replaced with the context name. Optimized for size.
2016-08-11 17:15:53 +00:00
def _first_stage():
2016-08-11 00:57:39 +00:00
import os,sys,zlib
R,W=os.pipe()
if os.fork():
os.dup2(0,100)
os.dup2(R,0)
os.close(R)
os.close(W)
os.execv(sys.executable,('econtext:'+CONTEXT_NAME,))
else:
os.fdopen(W,'wb',0).write(zlib.decompress(sys.stdin.read(input())))
print 'OK'
sys.exit(0)
2016-08-11 17:15:53 +00:00
def get_boot_command(self):
2016-08-11 00:57:39 +00:00
name = self._context.remote_name
if name is None:
name = '%s@%s:%d'
name %= (getpass.getuser(), socket.gethostname(), os.getpid())
2016-08-11 17:15:53 +00:00
source = inspect.getsource(self._first_stage)
2016-08-11 00:57:39 +00:00
source = textwrap.dedent('\n'.join(source.strip().split('\n')[1:]))
source = source.replace(' ', '\t')
source = source.replace('CONTEXT_NAME', repr(name))
encoded = source.encode('base64').replace('\n', '')
return [self.python_path, '-c',
'exec "%s".decode("base64")' % (encoded,)]
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._context)
2016-08-11 17:15:53 +00:00
def get_preamble(self):
2016-08-11 02:02:23 +00:00
source = inspect.getsource(econtext.core)
source += '\nExternalContext().main%r\n' % ((
self._context.key,
LOG.level or logging.getLogger().level or logging.INFO,
),)
2016-08-11 17:15:53 +00:00
compressed = zlib.compress(minimize_source(source))
2016-08-11 02:02:23 +00:00
return str(len(compressed)) + '\n' + compressed
2016-08-11 17:15:53 +00:00
def connect(self):
LOG.debug('%r.connect()', self)
pid, sock = create_child(*self.get_boot_command())
self.receive_side = econtext.core.Side(self, os.dup(sock.fileno()))
self.transmit_side = econtext.core.Side(self, os.dup(sock.fileno()))
2016-08-11 00:57:39 +00:00
sock.close()
2016-08-11 17:15:53 +00:00
LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd)
2016-08-11 00:57:39 +00:00
econtext.core.write_all(self.transmit_side.fd, self.get_preamble())
s = os.read(self.receive_side.fd, 4096)
if s != 'OK\n':
raise econtext.core.StreamError('Bootstrap failed; stdout: %r', s)
2016-08-11 00:57:39 +00:00
2016-08-14 00:17:03 +00:00
class SshStream(LocalStream):
    """LocalStream variant that starts the slave on another host via SSH."""

    #: The path to the SSH binary.
    ssh_path = 'ssh'

    def get_boot_command(self):
        """Return ``ssh [-l user] host`` followed by the LocalStream boot
        command, each argument shell-quoted with ``commands.mkarg``."""
        argv = [self.ssh_path]
        username = self._context.username
        if username:
            argv.extend(['-l', username])
        argv.append(self._context.hostname)

        # The remote sshd hands the command to a shell, so quote every word.
        inner = super(SshStream, self).get_boot_command()
        quoted = [commands.mkarg(word) for word in inner]
        return argv + quoted
class Broker(econtext.core.Broker):
    """Master-side broker able to spawn new local and SSH child contexts."""

    # NOTE(review): presumably seconds that econtext.core.Broker waits during
    # shutdown; confirm against the base class.
    shutdown_timeout = 5.0

    def create_listener(self, address=None, backlog=30):
        """Listen on `address` for connections from newly spawned contexts."""
        self._listener = Listener(self, address, backlog)

    def _connect_new_context(self, context, stream_class, python_path):
        """Attach a `stream_class` stream to `context`, start the child and
        return the result of registering `context`."""
        context.stream = stream_class(context)
        if python_path:
            context.stream.python_path = python_path
        context.stream.connect()
        return self.register(context)

    def get_local(self, name='default', python_path=None):
        """Start a new child interpreter on the local machine.

        Every call creates a new Context and process; no existing context
        with the same `name` is reused.

        :param name: context name.
        :param python_path: interpreter to run instead of
            ``LocalStream.python_path``."""
        context = Context(self, name)
        return self._connect_new_context(context, LocalStream, python_path)

    def get_remote(self, hostname, username=None, name=None, python_path=None):
        """Start a new child interpreter on `hostname` via SSH.

        Every call creates a new Context and SSH process; no existing context
        with the same `name` is reused.

        :param hostname: host passed to ssh.
        :param username: optional ``ssh -l`` user.
        :param name: context name; defaults to `hostname`.
        :param python_path: remote interpreter to run instead of
            ``LocalStream.python_path``."""
        if name is None:
            name = hostname
        context = Context(self, name, hostname, username)
        return self._connect_new_context(context, SshStream, python_path)
2016-08-11 00:57:39 +00:00
class Context(econtext.core.Context):
    """Master-side context that serves module source to, and re-logs
    records forwarded by, the child it represents."""

    def __init__(self, *args, **kwargs):
        super(Context, self).__init__(*args, **kwargs)
        # Handlers for the child's GET_MODULE requests and FORWARD_LOG data.
        self.responder = ModuleResponder(self)
        self.log_forwarder = LogForwarder(self)

    def on_disconnect(self, broker):
        """Drop the reference to the stream; `broker` is unused."""
        self.stream = None

    def call_with_deadline(self, deadline, with_context, fn, *args, **kwargs):
        """Invoke `fn([context,] *args, **kwargs)` in the external context.

        If `with_context` is True, pass its
        :py:class:`econtext.core.ExternalContext` instance as first parameter.
        If `deadline` is not ``None``, expire the call after `deadline`
        seconds. If `deadline` is ``None``, the invocation may block
        indefinitely.

        `fn` is sent by name: its module, its name and, if it is a method
        bound to a class, that class's name.

        NOTE(review): any other method (e.g. one bound to an instance) is
        sent with no class name, i.e. as if it were a module-level function
        of the same name. Confirm this is intended.

        :raises econtext.core.CallError: if the remote call failed."""
        LOG.debug('%r.call_with_deadline(%r, %r, %r, *%r, **%r)',
                  self, deadline, with_context, fn, args, kwargs)

        class_name = None
        if isinstance(fn, types.MethodType):
            bound_to = fn.im_self
            if isinstance(bound_to, (type, types.ClassType)):
                class_name = bound_to.__name__

        call = (with_context, fn.__module__, class_name, fn.__name__,
                args, kwargs)
        reply = self.enqueue_await_reply(econtext.core.CALL_FUNCTION,
                                         deadline, call)
        if not isinstance(reply, econtext.core.CallError):
            return reply
        raise reply

    def call(self, fn, *args, **kwargs):
        """Invoke `fn(*args, **kwargs)` in the external context."""
        return self.call_with_deadline(None, False, fn, *args, **kwargs)