From 270ef7594e34bb093e2f62f1ddac33eb65730564 Mon Sep 17 00:00:00 2001
From: David Anderson
Date: Fri, 29 May 2009 22:47:31 +0000
Subject: [PATCH] - updated assimilator.py

svn path=/trunk/boinc/; revision=18246
---
 sched/assimilator.py | 47 +++++++++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 16 deletions(-)

diff --git a/sched/assimilator.py b/sched/assimilator.py
index 685dcf4f08..9da6606103 100644
--- a/sched/assimilator.py
+++ b/sched/assimilator.py
@@ -4,8 +4,8 @@ Python implementation of an assimilator
 Contributed by Stephen Pellicer
 '''
 
-import os, re, boinc_path_config, signal, sys, time
-from Boinc import database, boinc_db, configxml, sched_messages
+import os, re, signal, sys, time, hashlib
+import database, boinc_db, configxml, sched_messages
 
 STOP_TRIGGER_FILENAME = os.path.join('..', 'stop_servers')
 caught_sig_int = False
@@ -28,25 +28,29 @@ def sigint_handler(sig, stack):
     log_messages.printf(sched_messages.DEBUG, "Handled SIGINT\n")
     caught_sig_int = True
 
+def filename_hash(name, hash_fanout):
+    h = hex(int(hashlib.md5(name).hexdigest()[:8], 16) % hash_fanout)[2:]
+    if h.endswith('L'):
+        h = h[:-1]
+    return h
+
 def get_file_path(result):
-    return os.path.join(config.upload_dir, re.search('<file_name>(.*)</file_name>', result.xml_doc_out).group(1))
+    name = re.search('<file_name>(.*)</file_name>',result.xml_doc_in).group(1)
+    fanout = int(config.uldl_dir_fanout)
+    hashed = filename_hash(name, fanout)
+    updir = config.upload_dir
+    result = os.path.join(updir,hashed,name)
+    return result
 
 def assimilate_handler(wu, results, canonical_result):
     # check for valid wu.canonical_resultid
     if wu.canonical_result:
         # do application specific processing
         log_messages.printf(sched_messages.NORMAL, "[%s] Found canonical result\n", wu.name)
-        question = open(os.path.join('..', 'question'), 'r').read()[:32]
-        log_messages.printf(sched_messages.DEBUG, "Comparing to %s\n", question)
-        if len(question) != 32:
-            log_messages.printf(sched_messages.CRITICAL, "Question %s is wrong length\n", question)
-        else:
-            result = get_file_path(canonical_result)
-            for line in open(result, 'r').readlines():
-                line = line.strip()
-                log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
-                if line[-32:] == question:
-                    log_messages.printf(sched_messages.CRITICAL, "[RESULT#%d %s] Found Answer %s\n", canonical_result.id, canonical_result.name, line[:-33])
+        result = get_file_path(canonical_result)
+        for line in open(result, 'r').readlines():
+            line = line.strip()
+            log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
     else:
         log_messages.printf(sched_messages.NORMAL, "[%s] No canonical result\n", wu.name)
 
@@ -60,13 +64,22 @@ def assimilate_handler(wu, results, canonical_result):
         log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many success results\n", wu.name)
         # check for error conditions
 
+pass_count = 0
+
 def do_pass(app):
+    global pass_count
     did_something=False
     # check for stop trigger
     check_stop_trigger()
+    pass_count += 1
 
-    # look for workunits with correct appid and assimilate_state==ASSIMILATE_READY
-    for wu in database.Workunits.find(app=app, assimilate_state=boinc_db.ASSIMILATE_READY):
+    units = database.Workunits.find(app=app,assimilate_state=boinc_db.ASSIMILATE_READY)
+
+    log_messages.printf(sched_messages.DEBUG, "pass %d, units %d\n", pass_count, len(units))
+
+    # look for workunits with correct appid and
+    # assimilate_state==ASSIMILATE_READY
+    for wu in units:
         did_something=True
         canonical_result=None
         results=None
@@ -131,5 +144,7 @@ if one_pass:
 else:
     # main loop
     while(1):
+        database.close()
+        database.connect()
         if not do_pass(app):
             time.sleep(10)
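
Note (not part of the patch): a minimal sketch of how the new fanout hashing resolves an upload file path. The fanout, upload directory, and file name below are made-up example values; in the assimilator they come from config.uldl_dir_fanout, config.upload_dir, and the result's <file_name> element. The sketch targets Python 3 (hence the .encode() and no trailing 'L' to strip), while the patch itself is Python 2 code.

    import hashlib, os

    def filename_hash(name, hash_fanout):
        # Same scheme as the patch: take the first 8 hex digits of the MD5 of
        # the file name, reduce modulo the fanout, and use the result (formatted
        # as hex without the '0x' prefix) as the subdirectory name.
        return hex(int(hashlib.md5(name.encode()).hexdigest()[:8], 16) % hash_fanout)[2:]

    fanout = 1024                      # assumed <uldl_dir_fanout> from config.xml
    upload_dir = '/home/boinc/upload'  # assumed upload directory
    name = 'example_wu_0_r1234_0'      # hypothetical result output file name

    # prints <upload_dir>/<hash subdirectory>/<file name>
    print(os.path.join(upload_dir, filename_hash(name, fanout), name))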