mitogen/examples/mitop.py

249 lines
7.6 KiB
Python

"""
mitop.py is a version of the UNIX top command that knows how to display process
lists from multiple machines in a single listing.
This is a basic, initial version showing overall program layout. A future
version will extend it to:
* Only notify the master of changed processes, rather than all processes.
* Runtime-reconfigurable filters and aggregations handled on the remote
machines rather than forcing a bottleneck in the master.
"""
import curses
import subprocess
import sys
import time
import mitogen.core
import mitogen.master
import mitogen.select
import mitogen.utils
class Host(object):
    """
    Master-side record for one monitored target machine.

    The class-level attributes are placeholders that the code constructing a
    Host is expected to overwrite; only ``procs`` is created per instance.
    """

    #: Hostname of the target, as a string.
    name = None

    #: mitogen.parent.Context through which functions are invoked on the host.
    context = None

    #: mitogen.core.Receiver to which the target delivers its state updates.
    recv = None

    def __init__(self):
        #: pid -> Process() mapping holding every process that was described
        #: by this host's most recent status update.
        self.procs = dict()
class Process(object):
    """
    A single process running on a target host.

    Every field is declared at class level as a default; real values are
    assigned per instance when a status update is parsed. Because the defaults
    live on the class, ``vars(instance)`` only contains fields that have
    actually been assigned.
    """

    #: Unused legacy field, kept so existing readers of ``.host`` keep working.
    host = None

    #: Name of the host the process runs on.
    hostname = None

    #: True when the process first appeared in the most recent status update.
    new = False

    #: Owning user, as reported by 'ps'.
    user = None

    #: Process ID.
    pid = None

    #: Parent process ID.
    ppid = None

    #: Process group ID.
    pgid = None

    #: Full command line.
    command = None

    #: CPU usage percentage, as reported by 'ps'.
    pcpu = None

    #: Resident set size (the 'ps' rss value after scaling by the parser).
    rss = None
def child_main(sender, delay):
    """
    Executed on the main thread of the Python interpreter running on each
    target machine, Context.call() from the master. It simply sends the output
    of the UNIX 'ps' command at regular intervals toward a Receiver on master.

    This function never returns; it loops until the interpreter is torn down
    or an exception (e.g. 'ps' failing) propagates out of it.

    :param mitogen.core.Sender sender:
        The Sender to use for delivering our result. This could target
        anywhere, but the sender supplied by the master simply causes results
        to be delivered to the master's associated per-host Receiver.
    :param float delay:
        Seconds to sleep between successive 'ps' invocations.
    """
    args = ['ps', '-axwwo', 'user,pid,ppid,pgid,%cpu,rss,command']
    while True:
        output = subprocess.check_output(args)
        # On Python 3 check_output() returns bytes; the master expects text so
        # that parsed fields render as plain strings. On Python 2 the result
        # is already a str and is passed through untouched. Undecodable bytes
        # in a command line are replaced rather than aborting the loop.
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        sender.send(output)
        time.sleep(delay)
def parse_output(host, s):
    """
    Merge one 'ps' listing into ``host.procs``.

    Each data line updates (or creates) the Process for its PID. Any PID that
    was present before the call but is absent from the listing is removed.

    :param Host host:
        Host whose ``procs`` mapping (pid -> Process) is updated in place.
    :param str s:
        Output of ``ps -axwwo user,pid,ppid,pgid,%cpu,rss,command``; the
        first line is treated as the column header and ignored.
    """
    # A Python 3 peer may hand us raw bytes; normalise to text so the parsed
    # fields are ordinary strings. (On Python 2, bytes is str: leave it alone.)
    if bytes is not str and isinstance(s, bytes):
        s = s.decode('utf-8', 'replace')

    prev_pids = set(host.procs)
    for line in s.splitlines()[1:]:
        # The command is the last column and may itself contain whitespace,
        # hence the split limit.
        bits = line.split(None, 6)
        if len(bits) < 7:
            # Blank or truncated line: nothing usable to record.
            continue

        pid = int(bits[1])
        new = pid not in prev_pids
        prev_pids.discard(pid)
        try:
            proc = host.procs[pid]
        except KeyError:
            host.procs[pid] = proc = Process()
            proc.hostname = host.name

        proc.new = new
        proc.user = bits[0]
        proc.pid = pid
        proc.ppid = int(bits[2])
        proc.pgid = int(bits[3])
        proc.pcpu = float(bits[4])
        # Floor division keeps this an integer on Python 3 as well as on
        # Python 2. NOTE(review): 'ps' presumably reports rss in KiB, making
        # this MiB -- confirm against the target platform's ps.
        proc.rss = int(bits[5]) // 1024
        proc.command = bits[6]

    # These PIDs had no update, so probably they are dead now.
    for pid in prev_pids:
        del host.procs[pid]
class Painter(object):
    """
    Curses-based renderer for the merged, multi-host process table.

    Constructing a Painter switches the terminal into curses mode; call
    :meth:`close` to restore it.

    :param list hosts:
        Host objects whose ``procs`` mappings are read on every
        :meth:`paint` call.
    """
    def __init__(self, hosts):
        self.stdscr = curses.initscr()
        curses.start_color()
        # The screen size is sampled once, at start-up.
        self.height, self.width = self.stdscr.getmaxyx()
        curses.cbreak()
        curses.noecho()
        self.stdscr.keypad(1)
        self.hosts = hosts
        # Row template: five fixed-width columns followed by the command.
        self.format = (
            '%(hostname)10.10s '
            '%(pid)7.7s '
            '%(ppid)7.7s '
            '%(pcpu)6.6s '
            '%(rss)5.5s '
            '%(command)20s'
        )

    def close(self):
        """
        Leave curses mode and restore the terminal.
        """
        curses.endwin()

    def _clip(self, text):
        """
        Trim `text` so it fits on one screen row without wrapping. One column
        is left spare so the cursor never has to advance past the right edge.
        """
        return text[:max(self.width - 1, 0)]

    def paint(self):
        """
        Redraw the whole screen: a timestamp, the column header, then one row
        per process across all hosts, ordered by descending CPU usage. Rows
        that do not fit on the screen are dropped.
        """
        self.stdscr.erase()
        self.stdscr.addstr(0, 0, self._clip(time.ctime()))

        all_procs = []
        for host in self.hosts:
            # .values() works on both Python 2 and 3 (itervalues() is 2-only).
            all_procs.extend(host.procs.values())
        all_procs.sort(key=(lambda proc: -proc.pcpu))

        self.stdscr.addstr(1, 0, self._clip(self.format % {
            'hostname': 'HOST',
            'pid': 'PID',
            'ppid': 'PPID',
            'pcpu': '%CPU',
            'rss': 'RSS',
            'command': 'COMMAND',
        }))

        for i, proc in enumerate(all_procs):
            if (i+3) >= self.height:
                break
            # Pass the attribute per call instead of toggling window state,
            # so bold never leaks into the timestamp/header of a later repaint.
            if proc.new:
                attr = curses.A_BOLD
            else:
                attr = curses.A_NORMAL
            line = self._clip(self.format % vars(proc))
            self.stdscr.addstr(2+i, 0, line, attr)

        self.stdscr.refresh()
def master_main(painter, router, select, delay):
    """
    Pump messages out of the Select forever (in practice, until CTRL+C or an
    exception interrupts the loop). Every message carries one host's 'ps'
    output, which is fed through parse_output() to refresh that host's process
    table. The screen is repainted afterwards, but only once the repaint delay
    has elapsed since the previous repaint.
    """
    repaint_after = 0
    while True:
        message = select.get()
        source_host = message.receiver.host
        parse_output(source_host, message.unpickle())

        # Too soon since the last repaint: go straight back to waiting.
        if time.time() <= repaint_after:
            continue

        repaint_after = time.time() + delay
        painter.paint()
@mitogen.main()
def main(router):
    """
    Main program entry point. @mitogen.main() is just a helper to handle
    reliable setup/destruction of Broker, Router and the logging package.

    Hostnames are taken from the command line; the process exits with status
    1 if none are given.
    """
    argv = sys.argv[1:]
    if not argv:
        print('mitop: Need a list of SSH hosts to connect to.')
        sys.exit(1)

    delay = 2.0
    select = mitogen.select.Select(oneshot=False)
    hosts = []

    # For each hostname on the command line, create a Host instance, a Mitogen
    # connection, a Receiver to accept messages from the host, and finally
    # start child_main() on the host to pump messages into the receiver.
    for hostname in argv:
        # Format explicitly: print() with two arguments would emit a tuple
        # repr when run under Python 2.
        print('Starting on %s' % (hostname,))
        host = Host()
        host.name = hostname

        if host.name == 'localhost':
            host.context = router.local()
        else:
            host.context = router.ssh(hostname=host.name)

        # A receiver wires up a handle (via Router.add_handler()) to an
        # internal thread-safe queue object, which can be drained through calls
        # to recv.get().
        host.recv = mitogen.core.Receiver(router)
        host.recv.host = host

        # But we don't want to receive data from just one receiver, we want to
        # receive data from many. In this case we can use a Select(). It knows
        # how to efficiently sleep while waiting for the first message sent to
        # many receivers.
        select.add(host.recv)

        # The inverse of a Receiver is a Sender. Unlike receivers, senders are
        # serializable, so we can call the .to_sender() helper method to create
        # one equivalent to our host's receiver, and pass it directly to the
        # host as a function parameter.
        sender = host.recv.to_sender()

        # Finally invoke the function in the remote target. Since child_main()
        # is an infinite loop, using .call() would block the parent, since
        # child_main() never returns. Instead use .call_async(), which returns
        # another Receiver. We also want to wait for results from it --
        # although child_main() never returns, if it crashes the exception will
        # be delivered instead.
        call_recv = host.context.call_async(child_main, sender, delay)
        call_recv.host = host

        # Adding call_recv to the select will cause mitogen.core.CallError to
        # be thrown by .get() if startup of any context fails, causing halt of
        # master_main(), and the exception to be printed.
        select.add(call_recv)
        hosts.append(host)

    # Painter just wraps up all the prehistory ncurses code and keeps it out of
    # master_main().
    painter = Painter(hosts)
    try:
        master_main(painter, router, select, delay)
    except KeyboardInterrupt:
        # Shut down gracefully when the user presses CTRL+C.
        pass
    finally:
        # Always restore the terminal, whatever ended the main loop.
        painter.close()