Initial pass at module preloading
* Don't implement the rules for when preloading occurs yet * Don't attempt to streamily preload modules downstream while this context hasn't yet received the final module. There is quite significant latency buried in here, but for now it's a lot of work to fix. This works well enough to handle at least the mitogen package, but it's likely broken for anything bigger.
This commit is contained in:
parent
e01632c431
commit
f44356af32
|
@ -56,6 +56,7 @@ FORWARD_LOG = 102
|
|||
ADD_ROUTE = 103
|
||||
ALLOCATE_ID = 104
|
||||
SHUTDOWN = 105
|
||||
LOAD_MODULE = 106
|
||||
|
||||
CHUNK_SIZE = 16384
|
||||
|
||||
|
@ -305,6 +306,8 @@ def _queue_interruptible_get(queue, timeout=None, block=True):
|
|||
if timeout is not None:
|
||||
timeout += time.time()
|
||||
|
||||
LOG.info('timeout = %r, block = %r', timeout, block)
|
||||
|
||||
msg = None
|
||||
while msg is None and (timeout is None or timeout > time.time()):
|
||||
try:
|
||||
|
@ -395,7 +398,7 @@ class Importer(object):
|
|||
|
||||
:param context: Context to communicate via.
|
||||
"""
|
||||
def __init__(self, context, core_src):
|
||||
def __init__(self, router, context, core_src):
|
||||
self._context = context
|
||||
self._present = {'mitogen': [
|
||||
'mitogen.ansible',
|
||||
|
@ -407,13 +410,19 @@ class Importer(object):
|
|||
'mitogen.sudo',
|
||||
'mitogen.utils',
|
||||
]}
|
||||
self._lock = threading.Lock()
|
||||
# Presence of an entry in this map indicates in-flight GET_MODULE.
|
||||
self._callbacks = {}
|
||||
self.tls = threading.local()
|
||||
router.add_handler(self._on_load_module, LOAD_MODULE)
|
||||
self._cache = {}
|
||||
if core_src:
|
||||
self._cache['mitogen.core'] = (
|
||||
'mitogen.core',
|
||||
None,
|
||||
'mitogen/core.py',
|
||||
zlib.compress(core_src),
|
||||
[],
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
|
@ -468,23 +477,53 @@ class Importer(object):
|
|||
# later.
|
||||
os.environ['PBR_VERSION'] = '0.0.0'
|
||||
|
||||
def _on_load_module(self, msg):
|
||||
tup = msg.unpickle()
|
||||
fullname = tup[0]
|
||||
LOG.debug('Importer._on_load_module(%r)', fullname)
|
||||
|
||||
self._lock.acquire()
|
||||
try:
|
||||
self._cache[fullname] = tup
|
||||
callbacks = self._callbacks.pop(fullname, [])
|
||||
finally:
|
||||
self._lock.release()
|
||||
|
||||
for callback in callbacks:
|
||||
callback()
|
||||
|
||||
def _request_module(self, fullname, callback):
|
||||
self._lock.acquire()
|
||||
try:
|
||||
present = fullname in self._cache
|
||||
if not present:
|
||||
funcs = self._callbacks.get(fullname)
|
||||
if funcs is not None:
|
||||
LOG.debug('_request_module(%r): in flight', fullname)
|
||||
funcs.append(callback)
|
||||
else:
|
||||
LOG.debug('_request_module(%r): new request', fullname)
|
||||
self._callbacks[fullname] = [callback]
|
||||
self._context.send(Message(data=fullname, handle=GET_MODULE))
|
||||
finally:
|
||||
self._lock.release()
|
||||
|
||||
if present:
|
||||
callback()
|
||||
|
||||
def load_module(self, fullname):
|
||||
LOG.debug('Importer.load_module(%r)', fullname)
|
||||
self._load_module_hacks(fullname)
|
||||
|
||||
try:
|
||||
ret = self._cache[fullname]
|
||||
except KeyError:
|
||||
self._cache[fullname] = ret = (
|
||||
self._context.send_await(
|
||||
Message(data=fullname, handle=GET_MODULE)
|
||||
)
|
||||
)
|
||||
event = threading.Event()
|
||||
self._request_module(fullname, event.set)
|
||||
event.wait()
|
||||
|
||||
ret = self._cache[fullname]
|
||||
if ret is None:
|
||||
raise ImportError('Master does not have %r' % (fullname,))
|
||||
|
||||
pkg_present = ret[0]
|
||||
pkg_present = ret[1]
|
||||
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
|
||||
mod.__file__ = self.get_filename(fullname)
|
||||
mod.__loader__ = self
|
||||
|
@ -500,11 +539,11 @@ class Importer(object):
|
|||
|
||||
def get_filename(self, fullname):
|
||||
if fullname in self._cache:
|
||||
return 'master:' + self._cache[fullname][1]
|
||||
return 'master:' + self._cache[fullname][2]
|
||||
|
||||
def get_source(self, fullname):
|
||||
if fullname in self._cache:
|
||||
return zlib.decompress(self._cache[fullname][2])
|
||||
return zlib.decompress(self._cache[fullname][3])
|
||||
|
||||
|
||||
class LogHandler(logging.Handler):
|
||||
|
@ -593,6 +632,7 @@ class Stream(BasicStream):
|
|||
self._router = router
|
||||
self.remote_id = remote_id
|
||||
self.name = 'default'
|
||||
self.modules_sent = set()
|
||||
self.construct(**kwargs)
|
||||
self._output_buf = collections.deque()
|
||||
|
||||
|
@ -853,6 +893,10 @@ class Router(object):
|
|||
def __repr__(self):
|
||||
return 'Router(%r)' % (self.broker,)
|
||||
|
||||
def stream_by_id(self, dst_id):
|
||||
return self._stream_by_id.get(dst_id,
|
||||
self._stream_by_id.get(mitogen.parent_id))
|
||||
|
||||
def on_disconnect(self, stream, broker):
|
||||
"""Invoked by Stream.on_disconnect()."""
|
||||
for context in self._context_by_id.itervalues():
|
||||
|
@ -1132,7 +1176,7 @@ class ExternalContext(object):
|
|||
else:
|
||||
core_src = None
|
||||
|
||||
self.importer = Importer(self.parent, core_src)
|
||||
self.importer = Importer(self.router, self.parent, core_src)
|
||||
sys.meta_path.append(self.importer)
|
||||
|
||||
def _setup_package(self, context_id, parent_ids):
|
||||
|
|
|
@ -599,13 +599,14 @@ class ModuleFinder(object):
|
|||
stack.extend(set(fullnames).difference(found, stack, [fullname]))
|
||||
found.update(fullnames)
|
||||
|
||||
return found
|
||||
return sorted(found)
|
||||
|
||||
|
||||
class ModuleResponder(object):
|
||||
def __init__(self, router):
|
||||
self._router = router
|
||||
self._finder = ModuleFinder()
|
||||
self._cache = {} # fullname -> pickled
|
||||
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
|
||||
|
||||
def __repr__(self):
|
||||
|
@ -622,6 +623,40 @@ class ModuleResponder(object):
|
|||
return src[:match.start()]
|
||||
return src
|
||||
|
||||
def _build_tuple(self, fullname):
|
||||
if fullname in self._cache:
|
||||
return self._cache[fullname]
|
||||
|
||||
path, source, is_pkg = self._finder.get_module_source(fullname)
|
||||
if source is None:
|
||||
raise ImportError('could not find %r' % (fullname,))
|
||||
|
||||
if is_pkg:
|
||||
pkg_present = get_child_modules(path, fullname)
|
||||
LOG.debug('_build_tuple(%r, %r) -> %r',
|
||||
path, fullname, pkg_present)
|
||||
else:
|
||||
pkg_present = None
|
||||
|
||||
if fullname == '__main__':
|
||||
source = self.neutralize_main(source)
|
||||
compressed = zlib.compress(source)
|
||||
related = list(self._finder.find_related(fullname))
|
||||
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
|
||||
return fullname, pkg_present, path, compressed, related
|
||||
|
||||
def _send_load_module(self, msg, fullname):
|
||||
stream = self._router.stream_by_id(msg.src_id)
|
||||
if fullname not in stream.sent_modules:
|
||||
self._router.route(
|
||||
mitogen.core.Message.pickled(
|
||||
self._build_tuple(fullname),
|
||||
dst_id=msg.src_id,
|
||||
handle=mitogen.core.LOAD_MODULE,
|
||||
)
|
||||
)
|
||||
stream.sent_modules.add(fullname)
|
||||
|
||||
def _on_get_module(self, msg):
|
||||
LOG.debug('%r.get_module(%r)', self, msg)
|
||||
if msg == mitogen.core._DEAD:
|
||||
|
@ -629,33 +664,16 @@ class ModuleResponder(object):
|
|||
|
||||
fullname = msg.data
|
||||
try:
|
||||
path, source, is_pkg = self._finder.get_module_source(fullname)
|
||||
if source is None:
|
||||
raise ImportError('could not find %r' % (fullname,))
|
||||
|
||||
if is_pkg:
|
||||
pkg_present = get_child_modules(path, fullname)
|
||||
LOG.debug('get_child_modules(%r, %r) -> %r',
|
||||
path, fullname, pkg_present)
|
||||
else:
|
||||
pkg_present = None
|
||||
|
||||
if fullname == '__main__':
|
||||
source = self.neutralize_main(source)
|
||||
compressed = zlib.compress(source)
|
||||
related = list(self._finder.find_related(fullname))
|
||||
self._router.route(
|
||||
mitogen.core.Message.pickled(
|
||||
(pkg_present, path, compressed, related),
|
||||
dst_id=msg.src_id,
|
||||
handle=msg.reply_to,
|
||||
)
|
||||
)
|
||||
tup = self._build_tuple(fullname)
|
||||
related = tup[4]
|
||||
for name in related + [fullname]:
|
||||
if name not in ('mitogen', 'mitogen.core'):
|
||||
self._send_load_module(msg, name)
|
||||
except Exception:
|
||||
LOG.debug('While importing %r', fullname, exc_info=True)
|
||||
self._router.route(
|
||||
mitogen.core.Message.pickled(
|
||||
None,
|
||||
(fullname, None, None, None, []),
|
||||
dst_id=msg.src_id,
|
||||
handle=msg.reply_to,
|
||||
)
|
||||
|
@ -682,41 +700,28 @@ class ModuleForwarder(object):
|
|||
return
|
||||
|
||||
fullname = msg.data
|
||||
cached = self.importer._cache.get(fullname)
|
||||
if cached:
|
||||
LOG.debug('%r._on_get_module(): using cached %r', self, fullname)
|
||||
self.router.route(
|
||||
mitogen.core.Message.pickled(
|
||||
cached,
|
||||
dst_id=msg.src_id,
|
||||
handle=msg.reply_to,
|
||||
)
|
||||
)
|
||||
else:
|
||||
LOG.debug('%r._on_get_module(): requesting %r', self, fullname)
|
||||
self.parent_context.send(
|
||||
mitogen.core.Message(
|
||||
data=msg.data,
|
||||
handle=mitogen.core.GET_MODULE,
|
||||
reply_to=self.router.add_handler(
|
||||
lambda m: self._on_got_source(m, msg),
|
||||
persist=False
|
||||
)
|
||||
)
|
||||
)
|
||||
callback = lambda: self._on_cache_callback(msg, fullname)
|
||||
self.importer._request_module(fullname, callback)
|
||||
|
||||
def _on_got_source(self, msg, original_msg):
|
||||
LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg)
|
||||
fullname = original_msg.data
|
||||
self.importer._cache[fullname] = msg.unpickle()
|
||||
def _send_one_module(self, msg, tup):
|
||||
self.router.route(
|
||||
mitogen.core.Message(
|
||||
data=msg.data,
|
||||
dst_id=original_msg.src_id,
|
||||
handle=original_msg.reply_to,
|
||||
mitogen.core.Message.pickled(
|
||||
self.importer._cache[fullname],
|
||||
dst_id=msg.src_id,
|
||||
handle=mitogen.core.LOAD_MODULE,
|
||||
)
|
||||
)
|
||||
|
||||
def _on_cache_callback(self, msg, fullname):
|
||||
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
|
||||
tup = self.importer._cache[fullname]
|
||||
if tup is not None:
|
||||
for related in tup[4]:
|
||||
rtup = self.importer._cache[fullname]
|
||||
self._send_one_module(rtup)
|
||||
|
||||
self._send_one_module(tup)
|
||||
|
||||
|
||||
class Stream(mitogen.core.Stream):
|
||||
"""
|
||||
|
@ -731,6 +736,10 @@ class Stream(mitogen.core.Stream):
|
|||
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
|
||||
profiling = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Stream, self).__init__(*args, **kwargs)
|
||||
self.sent_modules = set()
|
||||
|
||||
def construct(self, remote_name=None, python_path=None, debug=False,
|
||||
profiling=False, **kwargs):
|
||||
"""Get the named context running on the local machine, creating it if
|
||||
|
|
Loading…
Reference in New Issue