boinc/sched/start

562 lines
18 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
## $Id$
'''
A cron-like program to start/stop BOINC server daemons, and run utility tasks
START parses config.xml and runs <daemon> and <task> entries. A config.xml
file looks like this:
<boinc>
<config>
...
<!-- optional; defaults as indicated: -->
<project_dir>../</project_dir> <!-- relative to location of "start" -->
<bin_dir>bin</bin_dir> <!-- relative to project_dir -->
<cgi_bin_dir>cgi-bin</cgi_bin_dir>
<log_dir>log</log_dir>
<pid_dir>pid</pid_dir>
...
</config>
<daemons>
<daemon>
<cmd>feeder -d 3</cmd>
</daemon>
</daemons>
<tasks>
<task>
<cmd>get_load</cmd>
<output>get_load.out</output>
<period>5 min</period>
</task>
<task>
<cmd>echo "HI" | mail quarl</cmd>
<output>/dev/null</output>
<period>1 day</period>
</task>
</tasks>
</boinc>
CMD and PERIOD are required. OUTPUT specifies the file to output and by
default is COMMAND_BASE_NAME.out. Commands are run in the <bin_dir> directory
which is a path relative to <project_dir> and output to <log_dir>.
Invocation methods:
--enable (default if invoked as "start")
Set BOINC in ENABLED mode and start daemons
--cron
If BOINC is in ENABLED mode, start daemons and run tasks; otherwise do
nothing. This command is intended to be run as a real cron job
every five minutes.
--disable (default if invoked as "stop")
Set BOINC in DISABLED mode and stop daemons.
--status Show status.
See "start --help" for options.
A daemon is a task for which a process ID is recorded in the <pid_dir>
directory and is sent a SIGINT in a DISABLE operation.
IMPLEMENTATION:
Daemons:
Writes a PID to pid_dir/command.pid.
Non-daemon tasks:
Writes a timestamp to run_state.xml to remember when the task was last run.
Both:
A lock file (pid_dir/command.lock) prevents tasks and daemons from being run
again when they are currently running.
Start/stop:
The main script is "start"; sym-link or hard-link "start" to "stop".
'''
import sys, os, getopt, time, glob, fcntl, signal
from boinc_config import *
# Chatty output defaults to on only when stdout is a terminal; the
# --quiet / --verbose options override this.
verbose = os.isatty(sys.stdout.fileno())
# Set to 1 by the cron command so that daemon starts are announced even when
# not verbose.
verbose_daemon_run = 0
# how long (in seconds) parent should wait before continuing after a
# fork. this is just a safety measure in case anything doesn't play nice if
# starting simultaneously. also it keeps output in sequence.
fork_delay = 0.1
# When true, tasks are run regardless of their recorded last-run time
# (set by --ignore-timestamps).
ignore_timestamps = False
def get_dir(name):
    '''Return the configured <NAME_dir> value, or project_dir/NAME when unset.'''
    configured = config.config.__dict__.get(name + '_dir')
    if configured:
        return configured
    return os.path.join(project_dir, name)
def ensure_get_dir(name):
    '''Resolve directory NAME via get_dir(), try to create it, and return its path.'''
    path = get_dir(name)
    ensure_dir(path)
    return path
def is_daemon(task):
    '''Tell whether TASK is a daemon entry, i.e. its node name is "daemon".'''
    node_name = task._name
    return node_name == 'daemon'
def get_task_command_basename(task):
    '''Return the file name (no directory) of the first word of TASK's command.'''
    program = task.cmd.split()[0]
    return os.path.basename(program)
def get_task_output_name(task):
    '''Return the output path for a task: its <output> value, or
    COMMAND_BASENAME.out when none is given, joined onto log_dir.'''
    name = task.__dict__.get('output')
    if not name:
        name = get_task_command_basename(task) + '.out'
    return os.path.join(log_dir, name)
def get_daemon_output_name(task):
    '''Return the log path for a daemon: its <output> value, or
    COMMAND_BASENAME.log when none is given, joined onto log_dir.'''
    name = task.__dict__.get('output')
    if not name:
        name = get_task_command_basename(task) + '.log'
    return os.path.join(log_dir, name)
def get_daemon_pid_name(task):
    '''Return the pid-file path for a daemon: its <pid_file> value, or
    COMMAND_BASENAME.pid when none is given, joined onto pid_dir.'''
    name = task.__dict__.get('pid_file')
    if not name:
        name = get_task_command_basename(task) + '.pid'
    return os.path.join(pid_dir, name)
def output_is_file(filename):
    '''Return a true value when FILENAME is non-empty and not under /dev/.

    A false FILENAME (None or '') is returned unchanged.'''
    if not filename:
        return filename
    return not filename.startswith('/dev/')
def get_task_lock_name(task):
    '''Return the lock-file path for a task or daemon, joined onto pid_dir.

    Preference order: explicit <lock_file>; else OUTPUT.lock when <output>
    names a real file (not a /dev/ device); else COMMAND_BASENAME.lock.'''
    name = task.__dict__.get('lock_file')
    if not name:
        output = task.__dict__.get('output')
        if output_is_file(output):
            name = output + '.lock'
        else:
            name = get_task_command_basename(task) + '.lock'
    return os.path.join(pid_dir, name)
def ensure_dir(filename):
    '''Try to create directory FILENAME; any OSError (such as the directory
    already existing) is ignored.'''
    try:
        os.mkdir(filename)
    except OSError:
        pass
def timestamp(t = None):
    '''Format T (seconds since the epoch) as local "YYYY/MM/DD HH:MM:SS".

    A false T (None or 0) means the current time.'''
    if not t:
        t = time.time()
    return time.strftime('%Y/%m/%d %H:%M:%S', time.localtime(t))
def safe_read_int(filename):
    '''Return the integer on the first line of FILENAME, or 0 on failure.

    Failure means the file cannot be opened or read, or its first line is
    not an integer. The file is always closed before returning.'''
    try:
        f = open(filename)
        try:
            return int(f.readline().strip())
        finally:
            f.close()
    except (IOError, OSError, ValueError):
        # Only I/O and parse problems are expected here; anything else is a
        # programming error and should not be hidden.
        return 0
def get_stop_trigger_filename():
    '''Return the path of the "stop_servers" trigger file inside project_dir.'''
    trigger_name = 'stop_servers'
    return os.path.join(project_dir, trigger_name)
def write_stop_trigger():
    '''Create (or overwrite) the stop trigger file containing "<stop/>".

    The file is closed explicitly so the content is on disk before any
    daemon is signalled, rather than relying on garbage collection.'''
    f = open(get_stop_trigger_filename(), 'w')
    try:
        f.write('<stop/>\n')
    finally:
        f.close()
def remove_stop_trigger():
    '''Delete the stop trigger file if it is present.'''
    if not os.path.exists(get_stop_trigger_filename()):
        return
    os.unlink(get_stop_trigger_filename())
def safe_unlink(filename):
    '''Remove FILENAME; on OSError report the problem on stdout and carry on.'''
    try:
        os.unlink(filename)
    except OSError as e:
        sys.stdout.write("Couldn't unlink %s: %s\n" % (filename, e))
def redirect(stdout='/dev/null', stderr=None, stdin='/dev/null'):
    '''
    Redirect the process's standard file descriptors.

    STDIN, STDOUT and STDERR are file names. STDIN is opened for reading and
    the other two for appending; the resulting descriptors are duplicated
    onto those of sys.stdin, sys.stdout and sys.stderr. STDIN and STDOUT
    default to /dev/null; STDERR defaults to whatever STDOUT is, in which
    case both share a single unbuffered file object.
    '''
    if not stderr:
        stderr = stdout
    new_in = open(stdin, 'r')
    new_err = open(stderr, 'a+', 0)
    if stdout != stderr:
        new_out = open(stdout, 'a+')
    else:
        new_out = new_err
    # Redirect standard file descriptors.
    for replacement, standard in ((new_in, sys.stdin),
                                  (new_out, sys.stdout),
                                  (new_err, sys.stderr)):
        os.dup2(replacement.fileno(), standard.fileno())
def fork():
    ''' os.fork() wrapper: the parent sleeps fork_delay seconds before
    returning. Returns the os.fork() result (child pid in the parent, 0 in
    the child). '''
    child_pid = os.fork()
    if child_pid != 0:
        time.sleep(fork_delay)
    return child_pid
def double_fork():
    '''
    Fork the current process into a daemon using a double-fork.

    Returns 1 in the original (parent) process and 0 in the detached
    grandchild. The intermediate child exits immediately with os._exit so
    that it neither runs cleanup handlers nor flushes inherited buffers.
    If the first fork fails the process exits with status 1.
    See: http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
    '''
    # this is necessary because otherwise any buffered output would get
    # printed twice after the fork!
    sys.stdout.flush()
    # Do first fork.
    try:
        pid = fork()
        if pid > 0:
            return 1
    except OSError as e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    # Decouple from parent environment.
    os.chdir("/")
    os.umask(0)
    os.setsid()
    # Do second fork.
    try:
        pid = os.fork()
        if pid > 0:
            # Exit second parent. This must be os._exit: the sys module has
            # no _exit, so the previous call raised AttributeError here.
            os._exit(0)
    except OSError as e:
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
        os._exit(1)
    return 0
def write_pid_file(pidfile):
    '''Write the current process ID, followed by a newline, to PIDFILE.

    The file is closed explicitly so the pid is on disk before the caller
    goes on to exec the daemon.'''
    f = open(pidfile, 'w')
    try:
        f.write('%d\n' % os.getpid())
    finally:
        f.close()
def is_pid_running(pid):
    '''Return True if a process with id PID exists, else False.

    Probes with signal 0, which performs the existence and permission checks
    without delivering a signal. EPERM means the process exists but belongs
    to someone else, so it still counts as running.'''
    import errno
    try:
        os.kill(pid, 0)
    except OSError as e:
        return e.errno == errno.EPERM
    return True
# if we ever want to use this on windows see:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65203
# Every file opened by lock_file() is kept here so that it stays open (and
# any lock taken on it stays held) for the life of the process.
locks = []
def lock_file(filename):
    '''Try to take an exclusive, non-blocking flock on FILENAME.

    The file is opened for writing (created or truncated) and kept in the
    module-level `locks` list. Returns 0 on success, -1 if the lock could
    not be acquired.'''
    lock_handle = open(filename, 'w')
    locks.append(lock_handle)
    try:
        fcntl.flock(lock_handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        return -1
    # flock() itself returns None; report success explicitly as documented.
    return 0
def is_lock_file_locked(filename):
    '''Return True if FILENAME is currently locked by someone else, else False.

    The check works by trying to take the lock with lock_file(). When that
    succeeds nobody held the lock, so the lock file is stale and is removed.
    Note that the probe therefore has side effects: it creates the file if
    it was missing and deletes it again when it turns out to be unlocked.'''
    if lock_file(filename):
        return True
    os.unlink(filename)
    return False
def contains_shell_characters(command):
    '''Return True if COMMAND contains a double quote, single quote,
    backslash, pipe or ">" character; False otherwise.'''
    for special in ('"', "'", '\\', '|', '>'):
        if special in command:
            return True
    return False
def exec_command_string(command):
    '''Replace the current process with COMMAND; never returns.

    The first word is resolved relative to bin_dir, the working directory is
    changed to log_dir, and the command is run through /bin/sh when it
    contains shell characters or exec'd directly otherwise. If the exec
    fails, a message is written to stderr and the process exits with
    status 1.'''
    argv = command.strip().split()
    # set default path for program to <bin_dir>:
    program = os.path.realpath(os.path.join(bin_dir, argv[0]))
    argv[0] = program
    os.chdir(log_dir)
    try:
        if not contains_shell_characters(command):
            os.execv(program, argv)
        else:
            os.execl('/bin/sh', 'sh', '-c', ' '.join(argv))
        # on success we don't reach here
        sys.stderr.write("Couldn't exec '%s'\n" % command)
    except OSError as e:
        sys.stderr.write("Couldn't execute '%s': %s\n" % (command, e))
    os._exit(1)
def lookup_task_run_state(task):
    '''Return the run_state entry whose cmd equals TASK's cmd.

    When none exists, a new 'task' node is appended to run_state.tasks with
    that cmd and last_run = 0, and returned.'''
    for known in run_state.tasks:
        if known.cmd == task.cmd:
            return known
    fresh = run_state.tasks.make_node_and_append('task')
    fresh.cmd = task.cmd
    fresh.last_run = 0
    return fresh
def interpret_period(str):
    ''' "5 min" -> 5*60 ; "1 hour" -> 1*60*60; "2" -> 2*60

    Convert a period string "NUM [UNIT]" to seconds. UNIT is matched
    case-insensitively by prefix: s(econds), m(inutes), h(ours), d(ays),
    w(eeks), mo(nths, counted as 30 days). With no unit, NUM is in minutes.
    Raises SystemExit for anything that does not parse.
    '''
    invalid = SystemExit('Invalid task period "%s"' % str)
    fields = str.strip().split()
    try:
        num = int(fields[0])
    except (ValueError, IndexError):
        # IndexError: the period was empty or only whitespace.
        raise invalid
    if len(fields) == 1:
        return num * 60
    if len(fields) == 2:
        unit = fields[1].lower()
        # 'mo' has to be tried before 'm', otherwise months would be taken
        # for minutes.
        multipliers = (
            ('mo', 60 * 60 * 24 * 30),
            ('s', 1),
            ('m', 60),
            ('h', 60 * 60),
            ('d', 60 * 60 * 24),
            ('w', 60 * 60 * 24 * 7),
        )
        for prefix, seconds in multipliers:
            if unit.startswith(prefix):
                return num * seconds
    raise invalid
def when_will_task_next_run(task, task_run_state):
    '''Return the epoch time at which TASK is next due: the recorded
    last_run plus the task's period in seconds.'''
    last_run = float(task_run_state.last_run)
    period = interpret_period(task.period)
    return last_run + period
def time_to_run_task(task, task_run_state):
    '''Return a true value if TASK should run now: either timestamps are
    being ignored or the task's next-run time has been reached.'''
    if ignore_timestamps:
        return ignore_timestamps
    return time.time() >= when_will_task_next_run(task, task_run_state)
def update_task_timestamp(task_run_state):
    '''Record the current time as TASK_RUN_STATE's last_run.'''
    now = time.time()
    task_run_state.last_run = now
def run_task(task):
    '''Run TASK if it is due: fork, then exec its command in the child with
    stdout/stderr redirected to the task's output file.

    The last-run timestamp is updated in the parent before forking. The
    child takes the task's lock file first and exits with status 1 if the
    task is still running from an earlier invocation.'''
    task_run_state = lookup_task_run_state(task)
    if not time_to_run_task(task, task_run_state):
        if verbose:
            sys.stdout.write(" Not running task because not time yet: %s\n" % task.cmd)
        return
    if verbose:
        sys.stdout.write(" Running task: %s\n" % task.cmd)
    update_task_timestamp(task_run_state)
    # Flush before forking: otherwise output still buffered here would be
    # written a second time by a child that leaves through sys.exit().
    sys.stdout.flush()
    # we don't need the full double-fork because this should finish quickly
    if fork() > 0:
        return
    if lock_file(get_task_lock_name(task)):
        sys.stderr.write(" Task currently running! (%s)\n" % task.cmd)
        sys.exit(1)
    redirect(get_task_output_name(task))
    exec_command_string(task.cmd)
def run_daemon(task):
    '''Start daemon TASK in a detached process: double-fork, take its lock,
    redirect stdout/err to its log, write its pid file, then exec it.

    The calling process returns straight after the fork. The detached
    process exits with status 0 if the daemon's lock is already held.'''
    if double_fork() > 0:
        return
    already_running = lock_file(get_task_lock_name(task))
    if already_running:
        if verbose:
            sys.stderr.write(" Daemon already running: %s\n" % task.cmd)
        sys.exit(0)
    if verbose or verbose_daemon_run:
        sys.stdout.write(" Starting daemon: %s\n" % task.cmd)
        sys.stdout.flush()
    redirect(get_daemon_output_name(task))
    write_pid_file(get_daemon_pid_name(task))
    # From here on stdout is the daemon's log file.
    sys.stdout.write("[%s] Executing command: %s\n" % (timestamp(), task.cmd))
    sys.stdout.flush()
    exec_command_string(task.cmd)
def run_daemons():
    '''Remove the stop trigger and start every configured daemon.'''
    if verbose:
        sys.stdout.write("Starting daemons\n")
    remove_stop_trigger()
    for daemon in config.daemons:
        run_daemon(daemon)
def run_tasks():
    '''Offer every configured task to run_task().'''
    if verbose:
        sys.stdout.write("Running tasks\n")
    for task in config.tasks:
        run_task(task)
def stop_daemon(pid):
    '''Send SIGINT to process PID. Returns 1 if something stopped, else 0.

    A non-positive PID is ignored and counts as nothing stopped: kill() gives
    pid 0 and negative pids process-group meanings, so passing on the 0 that
    safe_read_int() returns for an unreadable pid file would signal this
    script's whole process group. "No such process" is not reported; other
    kill errors produce a warning on stderr.'''
    import errno
    if pid <= 0:
        return 0
    try:
        os.kill(pid, signal.SIGINT)
    except OSError as e:
        if e.errno != errno.ESRCH:
            sys.stderr.write("Warning: couldn't kill pid %d: %s\n" % (pid, e))
        return 0
    if verbose:
        sys.stdout.write(" Killed process %s\n" % pid)
    return 1
def stop_daemons():
    '''Write the stop trigger, then signal the process named in every
    *.pid file in pid_dir and remove each pid file.'''
    if verbose:
        sys.stdout.write("Stopping all daemons\n")
    write_stop_trigger()
    stopped = 0
    for pid_file in glob.glob(os.path.join(pid_dir, '*.pid')):
        stopped += stop_daemon(safe_read_int(pid_file))
        safe_unlink(pid_file)
    if verbose and not stopped:
        sys.stdout.write(" (No processes stopped)\n")
######################################################################
## command (action) functions:
def command_enable_start():
    '''--enable: mark BOINC as ENABLED and start the daemons.'''
    if verbose:
        if not run_state.enabled:
            message = "Entering ENABLED mode"
        else:
            message = "Staying in ENABLED mode"
        sys.stdout.write(message + "\n")
    run_state.enabled = True
    run_daemons()
def command_cron_start():
    '''--cron: when BOINC is ENABLED, start daemons and run due tasks;
    otherwise do nothing.'''
    global verbose_daemon_run
    if verbose:
        if run_state.enabled:
            status = 'ENABLED'
        else:
            status = 'DISABLED'
        sys.stdout.write("Verbose cron-start: status == %s\n" % status)
    if not run_state.enabled:
        return
    # Announce daemon starts even when not otherwise verbose.
    verbose_daemon_run = 1
    run_daemons()
    run_tasks()
def command_disable_stop():
    '''--disable: mark BOINC as DISABLED and stop all daemons.'''
    if verbose:
        if run_state.enabled:
            sys.stdout.write("Entering DISABLED mode\n")
        else:
            sys.stdout.write("Staying in DISABLED mode\n")
    # The previous code first stored True here and then immediately
    # overwrote it with False; only the final value matters.
    run_state.enabled = False
    stop_daemons()
def command_status():
    '''--status: print whether BOINC is ENABLED or DISABLED and, when
    verbose, one table of daemons and one table of tasks.'''
    if run_state.enabled:
        print "BOINC is ENABLED"
    else:
        print "BOINC is DISABLED"
    # NOTE(review): the nesting of both tables under `if verbose` was
    # reconstructed from a copy of this file that had lost its indentation --
    # confirm against the upstream source.
    if verbose:
        print
        print "DAEMON pid status lock file commandline"
        n = 0
        for task in config.daemons:
            n += 1
            # safe_read_int gives 0 when the pid file is missing or unreadable
            pid = safe_read_int(get_daemon_pid_name(task)) or 0
            if not pid:
                rs = " "
            elif is_pid_running(pid):
                rs = " running "
            else:
                rs = "NOT FOUND"
            # The probe tries to take the lock itself; a lock file found
            # unlocked is deleted by is_lock_file_locked.
            if is_lock_file_locked(get_task_lock_name(task)):
                lu = " locked "
            else:
                lu = "UNLOCKED"
            print " %2d"%n, " %5d"%pid, rs, lu, " ", task.cmd
        print
        print "TASK last run period next run lock file commandline"
        n = 0
        for task in config.tasks:
            n += 1
            task_run_state = lookup_task_run_state(task)
            when_last_run = float(task_run_state.last_run)
            # a last_run of 0 (never run) is shown as '?'
            last_run = when_last_run and timestamp(when_last_run) or '?'
            when_next_run = when_will_task_next_run(task, lookup_task_run_state(task))
            # a task whose next-run time has already passed is shown as 'NOW'
            next_run = (when_next_run <=time.time()) and 'NOW' or timestamp(when_next_run)
            if is_lock_file_locked(get_task_lock_name(task)):
                lu = " LOCKED "
            else:
                lu = "unlocked"
            print " %2d"%n, last_run.center(20), task.period.ljust(10), \
                next_run.center(20), lu, " ", task.cmd
        pass
def command_show_config():
    '''--show-config: not implemented yet; exits with the message "TODO".'''
    # TODO: - all config items (e.g. where's logdir?)
    sys.exit('TODO')
program_name = os.path.basename(sys.argv[0])
# The name the script was invoked under selects the default action; any
# other name leaves no default (None), so a command option is then required.
command = {
    'start': command_enable_start,
    'stop': command_disable_stop,
    'status': command_status,
}.get(program_name)
def help():
    '''Write the usage message to stderr and exit with status 1.'''
    emit = sys.stderr.write
    emit("Syntax: %s [options] [command]" % sys.argv[0] + "\n")
    emit(""" Starts or stops BOINC daemons and tasks.
Commands:
--enable (-e) Set BOINC to ENABLED mode and start daemons
--cron (-c) If ENABLED, start daemons and run tasks
Intended to be run from real cron every 5 min.
--disable (-d) Set BOINC to DISABLED mode and stop daemons
--status (-s) Show status.
--show-config Show configuration
Options:
--quiet (-q) Operate quietly, even if STDOUT is a tty.
--verbose (-v) Operate verbosely, even if STDOUT is not a tty.
--config-file= Use specified file instead of program-path/../config.xml
--run-state-file= Use specified file instead of program-path/../run_state.xml
--fork-delay= Seconds to sleep between daemon forks instead of 0.1
--ignore-timestamps Ignore timestamps; for cron mode, runs all tasks now
""" + "\n")
    # Mention the default action implied by the invocation name, if any.
    if program_name == 'start':
        emit("Based on the invocation name as `start', the default action is --enable." + "\n")
    elif program_name == 'stop':
        emit("Based on the invocation name as `stop', the default action is --disable." + "\n")
    sys.exit(1)
# Main script: parse the command line, load config and run state, take the
# global "start" lock, run the selected command and save the run state.
program_path = os.path.realpath(os.path.dirname(sys.argv[0]))
config_filename = os.path.realpath(os.path.join(program_path, '../config.xml'))
run_state_filename = os.path.realpath(os.path.join(program_path, '../run_state.xml'))
try:
    opts, args = getopt.getopt(sys.argv[1:], 'cedskqvh?',
                               ('enable', 'cron', 'disable',
                                'start', 'stop', 'kill', 'status',
                                'show-config',
                                'ignore-timestamps',
                                'fork-delay=',
                                'config-file=', 'run-state-file=',
                                'quiet', 'verbose', 'help'))
except getopt.GetoptError as e:
    sys.stderr.write("%s\n" % e)
    sys.stderr.write("Use '%s --help' for help\n" % sys.argv[0])
    sys.exit(1)
for opt, v in opts:
    if opt in ('-q', '--quiet'):
        verbose = 0
    elif opt in ('-v', '--verbose'):
        verbose = 1
    elif opt in ('-h', '--help', '-?'):
        help()
    elif opt in ('-e', '--enable', '--start'):
        command = command_enable_start
    elif opt in ('-c', '--cron'):
        command = command_cron_start
    elif opt in ('-d', '--disable', '--stop', '-k', '--kill'):
        command = command_disable_stop
    elif opt in ('-s', '--status'):
        command = command_status
    elif opt == '--show-config':
        command = command_show_config
    elif opt == '--ignore-timestamps':
        ignore_timestamps = True
    elif opt == '--config-file':
        config_filename = v
    elif opt == '--run-state-file':
        run_state_filename = v
    elif opt == '--fork-delay':
        # getopt hands the value over as a string, but it is later passed to
        # time.sleep(), which needs a number.
        try:
            fork_delay = float(v)
        except ValueError:
            raise SystemExit('Invalid --fork-delay value "%s"' % v)
    else:
        # Every option accepted by getopt above must be handled here.
        raise AssertionError('unhandled option %s' % opt)
if not command:
    raise SystemExit('No command specified and script name is not "start", "stop", or "status"')
config = BoincConfig(config_filename).read()
run_state = BoincRunState(run_state_filename).read(failopen_ok = True)
project_dir = os.path.realpath(config.config.__dict__.get('project_dir') or
                               os.path.join(program_path, '../'))
os.chdir(project_dir)
bin_dir = get_dir('bin')
cgi_bin_dir = get_dir('cgi_bin')
log_dir = ensure_get_dir('log')
pid_dir = ensure_get_dir('pid')
# Only one instance of this script may run at a time.
start_lockfile = os.path.join(pid_dir, 'start.lock')
if lock_file(start_lockfile):
    sys.stderr.write("start is currently running!\n")
    sys.exit(1)
command()
run_state.write()
os.unlink(start_lockfile)