mitogen/examples/the_basics.py

280 lines
12 KiB
Python

#
# This program is a stand-in for good intro docs. It just documents various
# basics of using Mitogen.
#
from __future__ import absolute_import
from __future__ import print_function
import hashlib
import io
import os
import mitogen.core
import mitogen.master
import mitogen.service
import mitogen.utils
def get_file_contents(path):
    """
    Read the file at `path` in binary mode and return its entire contents
    wrapped in a :class:`mitogen.core.Blob`.
    """
    with open(path, 'rb') as handle:
        raw = handle.read()

    # Blob is a bytes subclass whose repr() is a short summary of the data
    # instead of the data itself, which keeps logging output readable. Unlike
    # most custom types, blobs can be serialized.
    return mitogen.core.Blob(raw)
def put_file_contents(path, s):
    """
    Overwrite the file at `path` with the bytes `s`.

    The file is opened in binary write mode, so any existing content is
    replaced. Nothing is returned.
    """
    with open(path, 'wb') as out:
        out.write(s)
def streamy_download_file(context, path):
    """
    Request `path` from the FileService hosted by `context`, collecting the
    transfer in an in-memory buffer.

    Returns a dict with the keys 'success' and 'metadata' (the two values
    returned by FileService.get()) and 'size' (the length of the buffer once
    the call has finished).
    """
    sink = io.BytesIO()

    # FileService.get() is a classmethod, not an exposed service method. It
    # exists to wrap up the complicated dance of implementing the transfer.
    ok, metadata = mitogen.service.FileService.get(context, path, sink)

    received = sink.getvalue()
    return dict(success=ok, metadata=metadata, size=len(received))
def md5sum(path):
    """
    Return the MD5 checksum of the file at `path` as a hexadecimal string.

    The file is hashed in fixed-size chunks rather than being read into
    memory in one piece, so memory use stays constant however large the file
    is.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as fp:
        # Two-argument iter() keeps calling fp.read() until it returns the
        # sentinel b'', which is what read() yields at end-of-file.
        for chunk in iter(lambda: fp.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()
def work_on_machine(context):
    """
    Do stuff to a remote context.

    Runs a series of demonstrations against `context` and prints the result
    of each one: plain function calls, small file transfers implemented as
    function calls, and streamy file transfers in both directions using
    mitogen.service.FileService.

    :param context:
        The context to exercise. This function uses its `.context_id` and
        `.router` attributes and its `.call()` and `.call_service()` methods.
    """
    print("Created context. Context ID is", context.context_id)

    # You don't need to understand any/all of this, but it's helpful to grok
    # the whole chain:

    # - Context.call() is a light wrapper around .call_async(), the wrapper
    #   simply blocks the caller until a reply arrives.
    # - .call_async() serializes the call signature into a message and passes
    #   it to .send_async()
    # - .send_async() creates a mitogen.core.Receiver() on the local router.
    #   The receiver constructor uses Router.add_handle() to allocate a
    #   'reply_to' handle and install a callback function that wakes the
    #   receiver when a reply message arrives.
    # - .send_async() puts the reply handle in Message.reply_to field and
    #   passes it to .send()
    # - Context.send() stamps the destination context ID into the
    #   Message.dst_id field and passes it to Router.route()
    # - Router.route() uses Broker.defer() to schedule _async_route(msg)
    #   on the Broker thread.
    # [broker thread]
    # - The broker thread wakes and calls _async_route(msg)
    # - Router._async_route() notices 'dst_id' is for a remote context and
    #   looks up the stream on which messages for dst_id should be sent (may be
    #   direct connection or not), and calls Stream.send()
    # - Stream.send() packs the message into a bytestring, appends it to
    #   Stream._output_buf, and calls Broker.start_transmit()
    # - Broker finishes work, reenters IO loop. IO loop wakes due to writeable
    #   stream.
    # - Stream.on_transmit() writes the full/partial buffer to SSH, calls
    #   stop_transmit() to mark the stream unwriteable once _output_buf is
    #   empty.
    # - Broker IO loop sleeps, no readers/writers.
    # - Broker wakes due to SSH stream readable.
    # - Stream.on_receive() called, reads the reply message, converts it to a
    #   Message and passes it to Router._async_route().
    # - Router._async_route() notices message is for local context, looks up
    #   target handle in the .add_handle() registry.
    # - Receiver._on_receive() called, appends message to receiver queue.
    # [main thread]
    # - Receiver.get() used to block the original Context.call() wakes and pops
    #   the message from the queue.
    # - Message data (pickled return value) is deserialized and returned to the
    #   caller.
    print("It's running on the local machine. Its PID is",
          context.call(os.getpid))

    # Now let's call a function defined in this module. On receiving the
    # function call request, the child attempts to import __main__, which is
    # initially missing, causing the importer in the child to request it from
    # its parent. That causes _this script_ to be sent as the module source
    # over the wire.
    print("Calling md5sum(/etc/passwd) in the child:",
          context.call(md5sum, '/etc/passwd'))

    # Now let's "transfer" a file. The simplest way to do this is calling a
    # function that returns the file data, which is totally fine for small
    # files.
    print("Download /etc/passwd via function call: %d bytes" % (
        len(context.call(get_file_contents, '/etc/passwd'))
    ))

    # And using function calls, in the other direction:
    print("Upload /tmp/blah via function call: %s" % (
        context.call(put_file_contents, '/tmp/blah', b'blah!'),
    ))

    # Now let's transfer what might be a big file. The problem with big files
    # is that they may not fit in RAM. This uses mitogen.service.FileService
    # to implement streamy file transfer instead. The sender must have a
    # 'service pool' running that will host FileService. First let's do the
    # 'upload' direction, where the master hosts FileService.

    # Steals the 'Router' reference from the context object. In a real app the
    # pool would be constructed once at startup, this is just demo code.
    file_service = mitogen.service.FileService(context.router)

    # Start the pool.
    pool = mitogen.service.Pool(context.router, services=[file_service])
    try:
        # Grant access to a file on the local disk from unprivileged contexts.
        # .register() is also exposed as a service method -- you can call it
        # on a child context from any more privileged context.
        file_service.register('/etc/passwd')

        # Now call our wrapper function that knows how to handle the transfer.
        # In a real app, this wrapper might also set ownership/modes or do any
        # other app-specific stuff relating to the file that was transferred.
        print("Streamy upload /etc/passwd: remote result: %s" % (
            context.call(
                streamy_download_file,
                # To avoid hard-wiring streamy_download_file(), we want to
                # pass it a Context object that hosts the file service it
                # should request files from. Router.myself() returns a Context
                # referring to this process. The router is reached through
                # `context` because no `router` name exists in this function.
                context=context.router.myself(),
                path='/etc/passwd',
            ),
        ))
    finally:
        # Shut down the pool now we're done with it, else app will hang at
        # exit. Doing it in a `finally` means a failed transfer cannot leave
        # the pool running. Once again, this should only happen once at app
        # startup/exit, not for every file transfer!
        pool.stop(join=True)

    # Now let's do the same thing but in reverse: we use FileService on the
    # remote to download a file. This uses context.call_service(), which
    # invokes a special code path that causes auto-initialization of a thread
    # pool in the target, and auto-construction of the target service, but
    # only if the service call was made by a more privileged context. We could
    # write a helper function that runs in the remote to do all that by hand,
    # but the library handles it for us.

    # Make the file accessible. A future FileService could avoid the need for
    # this for privileged contexts.
    context.call_service(
        service_name=mitogen.service.FileService,
        method_name='register',
        path='/etc/passwd'
    )

    # Now we can use our streamy_download_file() function in reverse -- running
    # it from this process and having it fetch from the remote process:
    print("Streamy download /etc/passwd: result: %s" % (
        streamy_download_file(context, '/etc/passwd'),
    ))
def main():
    """
    Run the demo: set up logging, a Broker and a Router, then exercise a
    local child context followed by a second context created via the first.
    """
    # Logging comes first, because Mitogen produces a LOT of it. Over the
    # course of the stable series its loggers will be carved up to allow more
    # selective / user-friendly logging. log_to_file() only sets up something
    # basic that defaults to INFO level; override it from the command line
    # with MITOGEN_LOG_LEVEL=debug or MITOGEN_LOG_LEVEL=io. IO logging is
    # sometimes useful for hangs, but it often creates more confusion than it
    # solves.
    mitogen.utils.log_to_file()

    # The Broker is a thread managing an async IO loop that listens for reads
    # from any active connection, or wakes from any non-Broker thread.
    #
    # Since Mitogen relies on this background worker thread, be extremely
    # careful with UNIX fork in your code: forking snapshots the state of
    # every lock in the program, including those in the logging module, and
    # so can produce code that appears to work for a long time before
    # deadlocking randomly. Forking in a Mitogen app requires significant
    # upfront planning!
    io_broker = mitogen.master.Broker()

    # The Router accepts messages (mitogen.core.Message). Locally addressed
    # ones are dispatched on the broker thread to local handlers (added via
    # Router.add_handle()); anything else is forwarded towards its target
    # context.
    #
    # It doubles as an uglyish God object for creating new connections. That
    # was a design mistake; really those methods should be imported directly
    # from e.g. 'mitogen.ssh'.
    router = mitogen.master.Router(io_broker)

    # Used as a context manager, the Router simply ensures Broker.shutdown()
    # is called on exception / exit, which stops the app hanging because of a
    # forgotten background thread. Throwaway scripts can get the same effect
    # with less typing from the decorators "@mitogen.main()" and
    # "@mitogen.utils.with_router".
    with router:
        # Build a context. '.local()' is the simplest possible case: it just
        # creates the context as a subprocess.
        first_hop = router.local()
        print("Created a context:", first_hop)
        print()

        # Standard IO redirection demo: calling print in the remote context
        # should cause a log message to be emitted. Subprocesses started by
        # the remote get the same treatment, so otherwise discarded
        # errors/etc. from remote tools are very easy to spot.
        first_hop.call(print, "Hello from child.")

        # Context objects make it semi-convenient to treat the local machine
        # the same as a remote machine.
        work_on_machine(first_hop)

        # Next, a proxied context: .local() again, but constructed via the
        # first context, so in effect a sub-sub-process. .sudo(), .ssh() or
        # anything else could have been used here in place of .local().
        second_hop = router.local(via=first_hop)
        for _ in range(3):
            print()
        print("Created a context as a child of another context:", second_hop)

        # Do everything again with the new child.
        work_on_machine(second_hop)

        # Individual children can be shut down selectively if we want:
        second_hop.shutdown(wait=True)

        # Alternatively, simply falling off the end of the scope effectively
        # calls Broker.shutdown(), which causes all children to die as part
        # of shutdown.
# The child module importer detects the execution guard below and removes any
# code appearing after it, and refuses to execute "__main__" if the guard is
# absent. This is necessary to prevent a common problem where people try to
# call functions defined in __main__ without first wrapping it up to be
# importable as a module, which previously hung the target, or caused bizarre
# recursive script runs.
if __name__ == '__main__':
    main()