diff --git a/checkin_notes b/checkin_notes index 9a3d6b5a59..193fba8ff0 100644 --- a/checkin_notes +++ b/checkin_notes @@ -4917,3 +4917,10 @@ David 1 June 2009 cs_scheduler.cpp sched/ sched_send.cpp + +David 1 June 2009 + - Updated Python assimilator (from Jeremy Cowles) + + sched/ + assimilator.py + testasm.py diff --git a/sched/assimilator.py b/sched/assimilator.py index 9da6606103..744dc9b639 100644 --- a/sched/assimilator.py +++ b/sched/assimilator.py @@ -1,150 +1,282 @@ #!/usr/bin/python ''' -Python implementation of an assimilator -Contributed by Stephen Pellicer +Generic Assimilator framework ''' import os, re, signal, sys, time, hashlib import database, boinc_db, configxml, sched_messages -STOP_TRIGGER_FILENAME = os.path.join('..', 'stop_servers') -caught_sig_int = False -log_messages=sched_messages.SchedMessages() +# Peter Norvig's Abstract base class hack +def abstract(): + """ + This function is not necessary, but provides + a nice error message when Abstract methods are not + overridden by child classes. + See: http://norvig.com/python-iaq.html for details. + """ + import inspect + # get the name of the calling function off the stack + caller = inspect.getouterframes(inspect.currentframe())[1][3] + raise NotImplementedError(caller + ' must be implemented in subclass') -def check_stop_trigger(): - global caught_sig_int, log_messages - try: - junk = open(STOP_TRIGGER_FILENAME, 'r') - except IOError: - if caught_sig_int: - log_messages.printf(sched_messages.CRITICAL, "Caught SIGINT\n") - sys.exit(1) - else: - log_messages.printf(sched_messages.CRITICAL, "Found stop trigger\n") - sys.exit(1) +class Assimilator(): + ''' + Use this class to create new pure-Python Assimilators. + To create a new assimilator: + 1) call __init__ from the new child class' __init__ method + 2) override the assimilate_handler method + 3) add the standard if __name__ == "__main__" bootstrap (see end of this file) + ''' -def sigint_handler(sig, stack): - global caught_sig_int - log_messages.printf(sched_messages.DEBUG, "Handled SIGINT\n") - caught_sig_int = True - -def filename_hash(name, hash_fanout): - h = hex(int(hashlib.md5(name).hexdigest()[:8], 16) % hash_fanout)[2:] - if h.endswith('L'): - h = h[:-1] - return h - -def get_file_path(result): - name = re.search('(.*)',result.xml_doc_in).group(1) - fanout = int(config.uldl_dir_fanout) - hashed = filename_hash(name, fanout) - updir = config.upload_dir - result = os.path.join(updir,hashed,name) - return result - -def assimilate_handler(wu, results, canonical_result): - # check for valid wu.canonical_resultid - if wu.canonical_result: - # do application specific processing - log_messages.printf(sched_messages.NORMAL, "[%s] Found canonical result\n", wu.name) - result = get_file_path(canonical_result) - for line in open(result, 'r').readlines(): - line = line.strip() - log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33]) - else: - log_messages.printf(sched_messages.NORMAL, "[%s] No canonical result\n", wu.name) - - if wu.error_mask&boinc_db.WU_ERROR_COULDNT_SEND_RESULT: - log_messages.printf(sched_messages.CRITICAL, "[%s] Error: couldn't send a result\n", wu.name) - if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_ERROR_RESULTS: - log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many error results\n", wu.name) - if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_TOTAL_RESULTS: - log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many total results\n", wu.name) - if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_SUCCESS_RESULTS: - log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many success results\n", wu.name) - # check for error conditions - -pass_count = 0 - -def do_pass(app): - global pass_count - did_something=False - # check for stop trigger - check_stop_trigger() - pass_count += 1 - - units = database.Workunits.find(app=app,assimilate_state=boinc_db.ASSIMILATE_READY) - - log_messages.printf(sched_messages.DEBUG, "pass %d, units %d\n", pass_count, len(units)) - - # look for workunits with correct appid and - # assimilate_state==ASSIMILATE_READY - for wu in units: - did_something=True - canonical_result=None - results=None - log_messages.printf(sched_messages.DEBUG, "[%s] assimilating: state=%d\n", wu.name, wu.assimilate_state) - results = database.Results.find(workunit=wu) - # look for canonical result for workunit in results - for result in results: - if result == wu.canonical_result: - canonical_result=result - - # assimilate handler - assimilate_handler(wu, results, canonical_result) - - # tag wu as ASSIMILATE_DONE - wu.assimilate_state = boinc_db.ASSIMILATE_DONE - wu.transition_time = int(time.time()) - wu.commit() - - # set wu transition_time - - # return did something result - return did_something - -# main function -asynch = False -one_pass = False -appname = '' - -# check for asynch one_pass, debug, app -args = sys.argv[1:] -args.reverse() -while(len(args)): - arg = args.pop() - if arg == '-asynch': - asynch = True - elif arg == '-one_pass': - one_pass = True - elif arg == '-d': - arg = args.pop() - log_messages.set_debug_level(arg) - elif arg == '-app': - arg = args.pop() - appname = arg - else: - log_messages.printf(sched_messages.CRITICAL, "Unrecognized arg: %s\n", arg) + # TODO: add a usage() method + # TODO: improve error handling + + def __init__(self): + # Be sure to call Assimilator.__init__(self) from child classes -config = configxml.default_config().config -database.connect() + # HACK: this belongs in boinc_db.py! + boinc_db.WU_ERROR_NO_CANONICAL_RESULT = 32 + + # initialize member vars + self.config = None + self.STOP_TRIGGER_FILENAME = os.path.join('..', 'stop_servers') + self.caught_sig_int = False + self.log=sched_messages.SchedMessages() + self.pass_count = 0 + self.update_db = True + self.noinsert = False + self.wu_id_mod = 0 + self.wu_id_remainder = 0 + self.one_pass = False + self.one_pass_N_WU = 0 + self.appname = '' + self.sleep_interval = 10 + + def check_stop_trigger(self): + """ + Stops the daemon when not running in one_pass mode + There are two cases when the daemon will stop: + 1) if the SIGINT signal is received + 2) if the stop trigger file is present + """ + + try: + junk = open(self.STOP_TRIGGER_FILENAME, 'r') + except IOError: + if self.caught_sig_int: + self.log.printf(sched_messages.CRITICAL, "Caught SIGINT\n") + sys.exit(1) + else: + self.log.printf(sched_messages.CRITICAL, "Found stop trigger\n") + sys.exit(1) + + def sigint_handler(self, sig, stack): + """ + This method handles the SIGINT signal. It sets a flag + but waits to exit until check_stop_trigger is called + """ + self.log.printf(sched_messages.DEBUG, "Handled SIGINT\n") + self.caught_sig_int = True + + def filename_hash(self, name, hash_fanout): + """ + Accepts a filename (without path) and the hash fanout. + Returns the directory bucket where the file will reside. + The hash fanout is typically provided by the project config file. + """ + h = hex(int(hashlib.md5(name).hexdigest()[:8], 16) % hash_fanout)[2:] + + # check for the long L suffix. It seems like it should + # never be present but that isn't the case + if h.endswith('L'): + h = h[:-1] + return h + + def get_file_path(self, result): + """ + Accepts a result object and returns the relative path to the file. + This method accounts for file hashing and includes the directory + bucket in the path returned. + """ + name = re.search('(.*)',result.xml_doc_in).group(1) + fanout = int(self.config.uldl_dir_fanout) + hashed = self.filename_hash(name, fanout) + updir = self.config.upload_dir + result = os.path.join(updir,hashed,name) + return result + + def assimilate_handler(self, wu, results, canonical_result): + """ + This method is called for each workunit (wu) that needs to be + processed. A canonical result is not guarenteed and several error + conditions may be present on the wu. Call report_errors(wu) when + overriding this method. + + Note that the -noinsert flag (self.noinsert) must be accounted for when + overriding this method. + """ + abstract() + + def report_errors(self, wu): + """ + Writes error logs based on the workunit (wu) error_mask field. + Returns True if errors were present, False otherwise. + """ + if wu.error_mask&boinc_db.WU_ERROR_COULDNT_SEND_RESULT: + self.log.printf(sched_messages.CRITICAL, "[%s] Error: couldn't send a result\n", wu.name) + return True + if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_ERROR_RESULTS: + self.log.printf(sched_messages.CRITICAL, "[%s] Error: too many error results\n", wu.name) + return True + if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_TOTAL_RESULTS: + self.log.printf(sched_messages.CRITICAL, "[%s] Error: too many total results\n", wu.name) + return True + if wu.error_mask&boinc_db.WU_ERROR_TOO_MANY_SUCCESS_RESULTS: + self.log.printf(sched_messages.CRITICAL, "[%s] Error: too many success results\n", wu.name) + return True + return False + + def do_pass(self, app): + """ + This method scans the database for workunits that need to be + assimilated. It handles all processing rules passed in on the command + line, except for -noinsert, which must be handled in assimilate_handler. + Calls check_stop_trigger before doing any work. + """ -# fork if asynch -if(asynch): - # add fork code - pass + did_something=False + # check for stop trigger + self.check_stop_trigger() + self.pass_count += 1 + n = 0 + + units = database.Workunits.find(app=app,assimilate_state=boinc_db.ASSIMILATE_READY) + + self.log.printf(sched_messages.DEBUG, "pass %d, units %d\n", self.pass_count, len(units)) + + # look for workunits with correct appid and + # assimilate_state==ASSIMILATE_READY + for wu in units: + # if the user has turned on the WU mod flag, adhere to it + if self.wu_id_mod > 0 and self.wu_id_remainder > 0: + if wu.id % self.wu_id_mod != self.wu_id_remainder: + continue + + # track how many jobs have been processed + # stop if the limit is reached + n += 1 + if self.one_pass_N_WU > 0 and n > self.one_pass_N_WU: + return did_something + + # only mark as dirty if the database is modified + if self.update_db: + did_something=True + + canonical_result = None + results = None + self.log.printf(sched_messages.DEBUG, "[%s] assimilating: state=%d\n", wu.name, wu.assimilate_state) + results = database.Results.find(workunit=wu) -# retrieve app where name = app.name -app=database.Apps.find1(name=appname) -signal.signal(signal.SIGINT, sigint_handler) + # look for canonical result for workunit in results + # HACK: shouldn't have to check types for comparion + # the Result object __eq__ method should handle + # the case of int vs Result + if type(wu.canonical_result) != type(0): + for result in results: + if result == wu.canonical_result: + canonical_result=result + + if canonical_result == None: + # something is wrong, flag an error + wu.error_mask = boinc_db.WU_ERROR_NO_CANONICAL_RESULT + + # assimilate handler + self.assimilate_handler(wu, results, canonical_result) + + # TODO: check for DEFER_ASSIMILATION as a return value from assimilate_handler + + if self.update_db: + # tag wu as ASSIMILATE_DONE + wu.assimilate_state = boinc_db.ASSIMILATE_DONE + wu.transition_time = int(time.time()) + wu.commit() + + # return did something result + return did_something + + def parse_args(self): + """ + Parses arguments provided on the command line and sets + those argument values as member variables. Arguments + are parsed as their true types, so integers will be ints, + not strings. + """ + + # TODO: this is inflexible, it should allow child classes + # to add or remove individual arguments easily + # without copying the entire loop + + args = sys.argv[1:] + args.reverse() + while(len(args)): + arg = args.pop() + if arg == '-sleep_interval': + arg = args.pop() + self.sleep_interval = float(arg) + elif arg == '-one_pass': + self.one_pass = True + elif arg == '-one_pass_N_WU': + arg = args.pop() + self.one_pass_N_WU = int(arg) + elif arg == '-noinsert': + self.noinsert = True + elif arg == '-dont_update_db': + self.update_db = False + elif arg == '-mod': + self.wu_id_mod = int(args.pop()) + self.wu_id_remainder = int(args.pop()) + elif arg == '-d': + arg = args.pop() + self.log.set_debug_level(arg) + elif arg == '-app': + arg = args.pop() + self.appname = arg + else: + self.log.printf(sched_messages.CRITICAL, "Unrecognized arg: %s\n", arg) + + def run(self): + """ + This function runs the class in a loop unless the + one_pass or one_pass_WU_N flags are set. Before execution + parse_args() is called, the xml config file is loaded and + the SIGINT signal is hooked to the sigint_handler method. + """ + self.parse_args() + self.config = configxml.default_config().config + + # retrieve app where name = app.name + database.connect() + app=database.Apps.find1(name=self.appname) + database.close() + + signal.signal(signal.SIGINT, self.sigint_handler) + + # do one pass or execute main loop + if self.one_pass: + self.do_pass(app) + else: + # main loop + while(1): + database.connect() + workdone = self.do_pass(app) + database.close() + if not workdone: + time.sleep(self.sleep_interval) + +# -------------------------------------------- +# Add the following to your assimilator file: + +#if __name__ == '__main__': +# asm = YourAssimilator() +# asm.run() -# do one pass or execute main loop -if one_pass: - do_pass(app) -else: - # main loop - while(1): - database.close() - database.connect() - if not do_pass(app): - time.sleep(10) diff --git a/sched/testasm.py b/sched/testasm.py new file mode 100644 index 0000000000..b918a40866 --- /dev/null +++ b/sched/testasm.py @@ -0,0 +1,40 @@ +#!/usr/bin/python + +from assimilator import * + +class TestAssimilator(Assimilator): + """ + A minimal Assimilator example that dumps the result out to the log + """ + + def __init__(self): + Assimilator.__init__(self) + + def assimilate_handler(self, wu, results, canonical_result): + """ + Assimilates a canonical result, in this case assimilation + means dumping the contents of the result to the log. + Also calls report_errors to log any problems present in the workunit (wu) + """ + + # check for valid wu.canonical_result + if wu.canonical_result: + # do application specific processing + self.log.printf(sched_messages.NORMAL, "[%s] Found canonical result\n", wu.name) + result = self.get_file_path(canonical_result) + for line in open(result, 'r').readlines(): + line = line.strip() + self.log.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33]) + else: + self.log.printf(sched_messages.NORMAL, "[%s] No canonical result\n", wu.name) + + if self.report_errors(wu): + # report_errors returns true if error state was present + # perhaps add some special logic here + # even if no logic is required, report_errors should be called + pass + +# allow the module to be executed as an application +if __name__ == '__main__': + asm = TestAssimilator() + asm.run()