- updated assimilator.py

svn path=/trunk/boinc/; revision=18246
This commit is contained in:
David Anderson 2009-05-29 22:47:31 +00:00
parent 7580a01415
commit 270ef7594e
1 changed files with 31 additions and 16 deletions

View File

@ -4,8 +4,8 @@ Python implementation of an assimilator
Contributed by Stephen Pellicer
'''
import os, re, boinc_path_config, signal, sys, time
from Boinc import database, boinc_db, configxml, sched_messages
import os, re, signal, sys, time, hashlib
import database, boinc_db, configxml, sched_messages
STOP_TRIGGER_FILENAME = os.path.join('..', 'stop_servers')
caught_sig_int = False
@ -28,25 +28,29 @@ def sigint_handler(sig, stack):
log_messages.printf(sched_messages.DEBUG, "Handled SIGINT\n")
caught_sig_int = True
def filename_hash(name, hash_fanout):
h = hex(int(hashlib.md5(name).hexdigest()[:8], 16) % hash_fanout)[2:]
if h.endswith('L'):
h = h[:-1]
return h
def get_file_path(result):
return os.path.join(config.upload_dir, re.search('<name>(.*)</name>', result.xml_doc_out).group(1))
name = re.search('<file_name>(.*)</file_name>',result.xml_doc_in).group(1)
fanout = int(config.uldl_dir_fanout)
hashed = filename_hash(name, fanout)
updir = config.upload_dir
result = os.path.join(updir,hashed,name)
return result
def assimilate_handler(wu, results, canonical_result):
# check for valid wu.canonical_resultid
if wu.canonical_result:
# do application specific processing
log_messages.printf(sched_messages.NORMAL, "[%s] Found canonical result\n", wu.name)
question = open(os.path.join('..', 'question'), 'r').read()[:32]
log_messages.printf(sched_messages.DEBUG, "Comparing to %s\n", question)
if len(question) != 32:
log_messages.printf(sched_messages.CRITICAL, "Question %s is wrong length\n", question)
else:
result = get_file_path(canonical_result)
for line in open(result, 'r').readlines():
line = line.strip()
log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
if line[-32:] == question:
log_messages.printf(sched_messages.CRITICAL, "[RESULT#%d %s] Found Answer %s\n", canonical_result.id, canonical_result.name, line[:-33])
result = get_file_path(canonical_result)
for line in open(result, 'r').readlines():
line = line.strip()
log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
else:
log_messages.printf(sched_messages.NORMAL, "[%s] No canonical result\n", wu.name)
@ -60,13 +64,22 @@ def assimilate_handler(wu, results, canonical_result):
log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many success results\n", wu.name)
# check for error conditions
pass_count = 0
def do_pass(app):
global pass_count
did_something=False
# check for stop trigger
check_stop_trigger()
pass_count += 1
# look for workunits with correct appid and assimilate_state==ASSIMILATE_READY
for wu in database.Workunits.find(app=app, assimilate_state=boinc_db.ASSIMILATE_READY):
units = database.Workunits.find(app=app,assimilate_state=boinc_db.ASSIMILATE_READY)
log_messages.printf(sched_messages.DEBUG, "pass %d, units %d\n", pass_count, len(units))
# look for workunits with correct appid and
# assimilate_state==ASSIMILATE_READY
for wu in units:
did_something=True
canonical_result=None
results=None
@ -131,5 +144,7 @@ if one_pass:
else:
# main loop
while(1):
database.close()
database.connect()
if not do_pass(app):
time.sleep(10)