mirror of https://github.com/BOINC/boinc.git
- Updated Python assimilator (from Jeremy Cowles)
svn path=/trunk/boinc/; revision=18260
This commit is contained in:
parent
cc62cce8f7
commit
4e535a6804
|
@ -4917,3 +4917,10 @@ David 1 June 2009
|
|||
cs_scheduler.cpp
|
||||
sched/
|
||||
sched_send.cpp
|
||||
|
||||
David 1 June 2009
|
||||
- Updated Python assimilator (from Jeremy Cowles)
|
||||
|
||||
sched/
|
||||
assimilator.py
|
||||
testasm.py
|
||||
|
|
|
@ -1,150 +1,282 @@
|
|||
#!/usr/bin/python
|
||||
'''
|
||||
Python implementation of an assimilator
|
||||
Contributed by Stephen Pellicer
|
||||
Generic Assimilator framework
|
||||
'''
|
||||
|
||||
import os, re, signal, sys, time, hashlib
|
||||
import database, boinc_db, configxml, sched_messages
|
||||
|
||||
STOP_TRIGGER_FILENAME = os.path.join('..', 'stop_servers')
|
||||
caught_sig_int = False
|
||||
log_messages=sched_messages.SchedMessages()
|
||||
# Peter Norvig's Abstract base class hack
|
||||
def abstract():
|
||||
"""
|
||||
This function is not necessary, but provides
|
||||
a nice error message when Abstract methods are not
|
||||
overridden by child classes.
|
||||
See: http://norvig.com/python-iaq.html for details.
|
||||
"""
|
||||
import inspect
|
||||
# get the name of the calling function off the stack
|
||||
caller = inspect.getouterframes(inspect.currentframe())[1][3]
|
||||
raise NotImplementedError(caller + ' must be implemented in subclass')
|
||||
|
||||
def check_stop_trigger():
|
||||
global caught_sig_int, log_messages
|
||||
try:
|
||||
junk = open(STOP_TRIGGER_FILENAME, 'r')
|
||||
except IOError:
|
||||
if caught_sig_int:
|
||||
log_messages.printf(sched_messages.CRITICAL, "Caught SIGINT\n")
|
||||
sys.exit(1)
|
||||
else:
|
||||
log_messages.printf(sched_messages.CRITICAL, "Found stop trigger\n")
|
||||
sys.exit(1)
|
||||
class Assimilator():
|
||||
'''
|
||||
Use this class to create new pure-Python Assimilators.
|
||||
To create a new assimilator:
|
||||
1) call __init__ from the new child class' __init__ method
|
||||
2) override the assimilate_handler method
|
||||
3) add the standard if __name__ == "__main__" bootstrap (see end of this file)
|
||||
'''
|
||||
|
||||
def sigint_handler(sig, stack):
|
||||
global caught_sig_int
|
||||
log_messages.printf(sched_messages.DEBUG, "Handled SIGINT\n")
|
||||
caught_sig_int = True
|
||||
|
||||
def filename_hash(name, hash_fanout):
|
||||
h = hex(int(hashlib.md5(name).hexdigest()[:8], 16) % hash_fanout)[2:]
|
||||
if h.endswith('L'):
|
||||
h = h[:-1]
|
||||
return h
|
||||
|
||||
def get_file_path(result):
|
||||
name = re.search('<file_name>(.*)</file_name>',result.xml_doc_in).group(1)
|
||||
fanout = int(config.uldl_dir_fanout)
|
||||
hashed = filename_hash(name, fanout)
|
||||
updir = config.upload_dir
|
||||
result = os.path.join(updir,hashed,name)
|
||||
return result
|
||||
|
||||
def assimilate_handler(wu, results, canonical_result):
|
||||
# check for valid wu.canonical_resultid
|
||||
if wu.canonical_result:
|
||||
# do application specific processing
|
||||
log_messages.printf(sched_messages.NORMAL, "[%s] Found canonical result\n", wu.name)
|
||||
result = get_file_path(canonical_result)
|
||||
for line in open(result, 'r').readlines():
|
||||
line = line.strip()
|
||||
log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
|
||||
else:
|
||||
log_messages.printf(sched_messages.NORMAL, "[%s] No canonical result\n", wu.name)
|
||||
|
||||
if wu.error_mask&boinc_db.WU_ERROR_COULDNT_SEND_RESULT:
|
||||
log_messages.printf(sched_messages.CRITICAL, "[%s] Error: couldn't send a result\n", wu.name)
|
||||
if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_ERROR_RESULTS:
|
||||
log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many error results\n", wu.name)
|
||||
if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_TOTAL_RESULTS:
|
||||
log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many total results\n", wu.name)
|
||||
if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_SUCCESS_RESULTS:
|
||||
log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many success results\n", wu.name)
|
||||
# check for error conditions
|
||||
|
||||
pass_count = 0
|
||||
|
||||
def do_pass(app):
|
||||
global pass_count
|
||||
did_something=False
|
||||
# check for stop trigger
|
||||
check_stop_trigger()
|
||||
pass_count += 1
|
||||
|
||||
units = database.Workunits.find(app=app,assimilate_state=boinc_db.ASSIMILATE_READY)
|
||||
|
||||
log_messages.printf(sched_messages.DEBUG, "pass %d, units %d\n", pass_count, len(units))
|
||||
|
||||
# look for workunits with correct appid and
|
||||
# assimilate_state==ASSIMILATE_READY
|
||||
for wu in units:
|
||||
did_something=True
|
||||
canonical_result=None
|
||||
results=None
|
||||
log_messages.printf(sched_messages.DEBUG, "[%s] assimilating: state=%d\n", wu.name, wu.assimilate_state)
|
||||
results = database.Results.find(workunit=wu)
|
||||
# look for canonical result for workunit in results
|
||||
for result in results:
|
||||
if result == wu.canonical_result:
|
||||
canonical_result=result
|
||||
|
||||
# assimilate handler
|
||||
assimilate_handler(wu, results, canonical_result)
|
||||
|
||||
# tag wu as ASSIMILATE_DONE
|
||||
wu.assimilate_state = boinc_db.ASSIMILATE_DONE
|
||||
wu.transition_time = int(time.time())
|
||||
wu.commit()
|
||||
|
||||
# set wu transition_time
|
||||
|
||||
# return did something result
|
||||
return did_something
|
||||
|
||||
# main function
|
||||
asynch = False
|
||||
one_pass = False
|
||||
appname = ''
|
||||
|
||||
# check for asynch one_pass, debug, app
|
||||
args = sys.argv[1:]
|
||||
args.reverse()
|
||||
while(len(args)):
|
||||
arg = args.pop()
|
||||
if arg == '-asynch':
|
||||
asynch = True
|
||||
elif arg == '-one_pass':
|
||||
one_pass = True
|
||||
elif arg == '-d':
|
||||
arg = args.pop()
|
||||
log_messages.set_debug_level(arg)
|
||||
elif arg == '-app':
|
||||
arg = args.pop()
|
||||
appname = arg
|
||||
else:
|
||||
log_messages.printf(sched_messages.CRITICAL, "Unrecognized arg: %s\n", arg)
|
||||
# TODO: add a usage() method
|
||||
# TODO: improve error handling
|
||||
|
||||
def __init__(self):
|
||||
# Be sure to call Assimilator.__init__(self) from child classes
|
||||
|
||||
config = configxml.default_config().config
|
||||
database.connect()
|
||||
# HACK: this belongs in boinc_db.py!
|
||||
boinc_db.WU_ERROR_NO_CANONICAL_RESULT = 32
|
||||
|
||||
# initialize member vars
|
||||
self.config = None
|
||||
self.STOP_TRIGGER_FILENAME = os.path.join('..', 'stop_servers')
|
||||
self.caught_sig_int = False
|
||||
self.log=sched_messages.SchedMessages()
|
||||
self.pass_count = 0
|
||||
self.update_db = True
|
||||
self.noinsert = False
|
||||
self.wu_id_mod = 0
|
||||
self.wu_id_remainder = 0
|
||||
self.one_pass = False
|
||||
self.one_pass_N_WU = 0
|
||||
self.appname = ''
|
||||
self.sleep_interval = 10
|
||||
|
||||
def check_stop_trigger(self):
|
||||
"""
|
||||
Stops the daemon when not running in one_pass mode
|
||||
There are two cases when the daemon will stop:
|
||||
1) if the SIGINT signal is received
|
||||
2) if the stop trigger file is present
|
||||
"""
|
||||
|
||||
try:
|
||||
junk = open(self.STOP_TRIGGER_FILENAME, 'r')
|
||||
except IOError:
|
||||
if self.caught_sig_int:
|
||||
self.log.printf(sched_messages.CRITICAL, "Caught SIGINT\n")
|
||||
sys.exit(1)
|
||||
else:
|
||||
self.log.printf(sched_messages.CRITICAL, "Found stop trigger\n")
|
||||
sys.exit(1)
|
||||
|
||||
def sigint_handler(self, sig, stack):
|
||||
"""
|
||||
This method handles the SIGINT signal. It sets a flag
|
||||
but waits to exit until check_stop_trigger is called
|
||||
"""
|
||||
self.log.printf(sched_messages.DEBUG, "Handled SIGINT\n")
|
||||
self.caught_sig_int = True
|
||||
|
||||
def filename_hash(self, name, hash_fanout):
|
||||
"""
|
||||
Accepts a filename (without path) and the hash fanout.
|
||||
Returns the directory bucket where the file will reside.
|
||||
The hash fanout is typically provided by the project config file.
|
||||
"""
|
||||
h = hex(int(hashlib.md5(name).hexdigest()[:8], 16) % hash_fanout)[2:]
|
||||
|
||||
# check for the long L suffix. It seems like it should
|
||||
# never be present but that isn't the case
|
||||
if h.endswith('L'):
|
||||
h = h[:-1]
|
||||
return h
|
||||
|
||||
def get_file_path(self, result):
|
||||
"""
|
||||
Accepts a result object and returns the relative path to the file.
|
||||
This method accounts for file hashing and includes the directory
|
||||
bucket in the path returned.
|
||||
"""
|
||||
name = re.search('<file_name>(.*)</file_name>',result.xml_doc_in).group(1)
|
||||
fanout = int(self.config.uldl_dir_fanout)
|
||||
hashed = self.filename_hash(name, fanout)
|
||||
updir = self.config.upload_dir
|
||||
result = os.path.join(updir,hashed,name)
|
||||
return result
|
||||
|
||||
def assimilate_handler(self, wu, results, canonical_result):
|
||||
"""
|
||||
This method is called for each workunit (wu) that needs to be
|
||||
processed. A canonical result is not guarenteed and several error
|
||||
conditions may be present on the wu. Call report_errors(wu) when
|
||||
overriding this method.
|
||||
|
||||
Note that the -noinsert flag (self.noinsert) must be accounted for when
|
||||
overriding this method.
|
||||
"""
|
||||
abstract()
|
||||
|
||||
def report_errors(self, wu):
|
||||
"""
|
||||
Writes error logs based on the workunit (wu) error_mask field.
|
||||
Returns True if errors were present, False otherwise.
|
||||
"""
|
||||
if wu.error_mask&boinc_db.WU_ERROR_COULDNT_SEND_RESULT:
|
||||
self.log.printf(sched_messages.CRITICAL, "[%s] Error: couldn't send a result\n", wu.name)
|
||||
return True
|
||||
if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_ERROR_RESULTS:
|
||||
self.log.printf(sched_messages.CRITICAL, "[%s] Error: too many error results\n", wu.name)
|
||||
return True
|
||||
if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_TOTAL_RESULTS:
|
||||
self.log.printf(sched_messages.CRITICAL, "[%s] Error: too many total results\n", wu.name)
|
||||
return True
|
||||
if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_SUCCESS_RESULTS:
|
||||
self.log.printf(sched_messages.CRITICAL, "[%s] Error: too many success results\n", wu.name)
|
||||
return True
|
||||
return False
|
||||
|
||||
def do_pass(self, app):
|
||||
"""
|
||||
This method scans the database for workunits that need to be
|
||||
assimilated. It handles all processing rules passed in on the command
|
||||
line, except for -noinsert, which must be handled in assimilate_handler.
|
||||
Calls check_stop_trigger before doing any work.
|
||||
"""
|
||||
|
||||
# fork if asynch
|
||||
if(asynch):
|
||||
# add fork code
|
||||
pass
|
||||
did_something=False
|
||||
# check for stop trigger
|
||||
self.check_stop_trigger()
|
||||
self.pass_count += 1
|
||||
n = 0
|
||||
|
||||
units = database.Workunits.find(app=app,assimilate_state=boinc_db.ASSIMILATE_READY)
|
||||
|
||||
self.log.printf(sched_messages.DEBUG, "pass %d, units %d\n", self.pass_count, len(units))
|
||||
|
||||
# look for workunits with correct appid and
|
||||
# assimilate_state==ASSIMILATE_READY
|
||||
for wu in units:
|
||||
# if the user has turned on the WU mod flag, adhere to it
|
||||
if self.wu_id_mod > 0 and self.wu_id_remainder > 0:
|
||||
if wu.id % self.wu_id_mod != self.wu_id_remainder:
|
||||
continue
|
||||
|
||||
# track how many jobs have been processed
|
||||
# stop if the limit is reached
|
||||
n += 1
|
||||
if self.one_pass_N_WU > 0 and n > self.one_pass_N_WU:
|
||||
return did_something
|
||||
|
||||
# only mark as dirty if the database is modified
|
||||
if self.update_db:
|
||||
did_something=True
|
||||
|
||||
canonical_result = None
|
||||
results = None
|
||||
self.log.printf(sched_messages.DEBUG, "[%s] assimilating: state=%d\n", wu.name, wu.assimilate_state)
|
||||
results = database.Results.find(workunit=wu)
|
||||
|
||||
# retrieve app where name = app.name
|
||||
app=database.Apps.find1(name=appname)
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
# look for canonical result for workunit in results
|
||||
# HACK: shouldn't have to check types for comparion
|
||||
# the Result object __eq__ method should handle
|
||||
# the case of int vs Result
|
||||
if type(wu.canonical_result) != type(0):
|
||||
for result in results:
|
||||
if result == wu.canonical_result:
|
||||
canonical_result=result
|
||||
|
||||
if canonical_result == None:
|
||||
# something is wrong, flag an error
|
||||
wu.error_mask = boinc_db.WU_ERROR_NO_CANONICAL_RESULT
|
||||
|
||||
# assimilate handler
|
||||
self.assimilate_handler(wu, results, canonical_result)
|
||||
|
||||
# TODO: check for DEFER_ASSIMILATION as a return value from assimilate_handler
|
||||
|
||||
if self.update_db:
|
||||
# tag wu as ASSIMILATE_DONE
|
||||
wu.assimilate_state = boinc_db.ASSIMILATE_DONE
|
||||
wu.transition_time = int(time.time())
|
||||
wu.commit()
|
||||
|
||||
# return did something result
|
||||
return did_something
|
||||
|
||||
def parse_args(self):
|
||||
"""
|
||||
Parses arguments provided on the command line and sets
|
||||
those argument values as member variables. Arguments
|
||||
are parsed as their true types, so integers will be ints,
|
||||
not strings.
|
||||
"""
|
||||
|
||||
# TODO: this is inflexible, it should allow child classes
|
||||
# to add or remove individual arguments easily
|
||||
# without copying the entire loop
|
||||
|
||||
args = sys.argv[1:]
|
||||
args.reverse()
|
||||
while(len(args)):
|
||||
arg = args.pop()
|
||||
if arg == '-sleep_interval':
|
||||
arg = args.pop()
|
||||
self.sleep_interval = float(arg)
|
||||
elif arg == '-one_pass':
|
||||
self.one_pass = True
|
||||
elif arg == '-one_pass_N_WU':
|
||||
arg = args.pop()
|
||||
self.one_pass_N_WU = int(arg)
|
||||
elif arg == '-noinsert':
|
||||
self.noinsert = True
|
||||
elif arg == '-dont_update_db':
|
||||
self.update_db = False
|
||||
elif arg == '-mod':
|
||||
self.wu_id_mod = int(args.pop())
|
||||
self.wu_id_remainder = int(args.pop())
|
||||
elif arg == '-d':
|
||||
arg = args.pop()
|
||||
self.log.set_debug_level(arg)
|
||||
elif arg == '-app':
|
||||
arg = args.pop()
|
||||
self.appname = arg
|
||||
else:
|
||||
self.log.printf(sched_messages.CRITICAL, "Unrecognized arg: %s\n", arg)
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
This function runs the class in a loop unless the
|
||||
one_pass or one_pass_WU_N flags are set. Before execution
|
||||
parse_args() is called, the xml config file is loaded and
|
||||
the SIGINT signal is hooked to the sigint_handler method.
|
||||
"""
|
||||
self.parse_args()
|
||||
self.config = configxml.default_config().config
|
||||
|
||||
# retrieve app where name = app.name
|
||||
database.connect()
|
||||
app=database.Apps.find1(name=self.appname)
|
||||
database.close()
|
||||
|
||||
signal.signal(signal.SIGINT, self.sigint_handler)
|
||||
|
||||
# do one pass or execute main loop
|
||||
if self.one_pass:
|
||||
self.do_pass(app)
|
||||
else:
|
||||
# main loop
|
||||
while(1):
|
||||
database.connect()
|
||||
workdone = self.do_pass(app)
|
||||
database.close()
|
||||
if not workdone:
|
||||
time.sleep(self.sleep_interval)
|
||||
|
||||
# --------------------------------------------
|
||||
# Add the following to your assimilator file:
|
||||
|
||||
#if __name__ == '__main__':
|
||||
# asm = YourAssimilator()
|
||||
# asm.run()
|
||||
|
||||
# do one pass or execute main loop
|
||||
if one_pass:
|
||||
do_pass(app)
|
||||
else:
|
||||
# main loop
|
||||
while(1):
|
||||
database.close()
|
||||
database.connect()
|
||||
if not do_pass(app):
|
||||
time.sleep(10)
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
from assimilator import *
|
||||
|
||||
class TestAssimilator(Assimilator):
|
||||
"""
|
||||
A minimal Assimilator example that dumps the result out to the log
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
Assimilator.__init__(self)
|
||||
|
||||
def assimilate_handler(self, wu, results, canonical_result):
|
||||
"""
|
||||
Assimilates a canonical result, in this case assimilation
|
||||
means dumping the contents of the result to the log.
|
||||
Also calls report_errors to log any problems present in the workunit (wu)
|
||||
"""
|
||||
|
||||
# check for valid wu.canonical_result
|
||||
if wu.canonical_result:
|
||||
# do application specific processing
|
||||
self.log.printf(sched_messages.NORMAL, "[%s] Found canonical result\n", wu.name)
|
||||
result = self.get_file_path(canonical_result)
|
||||
for line in open(result, 'r').readlines():
|
||||
line = line.strip()
|
||||
self.log.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
|
||||
else:
|
||||
self.log.printf(sched_messages.NORMAL, "[%s] No canonical result\n", wu.name)
|
||||
|
||||
if self.report_errors(wu):
|
||||
# report_errors returns true if error state was present
|
||||
# perhaps add some special logic here
|
||||
# even if no logic is required, report_errors should be called
|
||||
pass
|
||||
|
||||
# allow the module to be executed as an application
|
||||
if __name__ == '__main__':
|
||||
asm = TestAssimilator()
|
||||
asm.run()
|
Loading…
Reference in New Issue