diff --git a/sched/assimilator.py b/sched/assimilator.py
index 685dcf4f08..9da6606103 100644
--- a/sched/assimilator.py
+++ b/sched/assimilator.py
@@ -4,8 +4,8 @@ Python implementation of an assimilator
Contributed by Stephen Pellicer
'''
-import os, re, boinc_path_config, signal, sys, time
-from Boinc import database, boinc_db, configxml, sched_messages
+import os, re, signal, sys, time, hashlib
+import database, boinc_db, configxml, sched_messages
STOP_TRIGGER_FILENAME = os.path.join('..', 'stop_servers')
caught_sig_int = False
@@ -28,25 +28,29 @@ def sigint_handler(sig, stack):
log_messages.printf(sched_messages.DEBUG, "Handled SIGINT\n")
caught_sig_int = True
+def filename_hash(name, hash_fanout):
+ h = hex(int(hashlib.md5(name).hexdigest()[:8], 16) % hash_fanout)[2:]
+ if h.endswith('L'):
+ h = h[:-1]
+ return h
+
def get_file_path(result):
- return os.path.join(config.upload_dir, re.search('(.*)', result.xml_doc_out).group(1))
+ name = re.search('(.*)',result.xml_doc_in).group(1)
+ fanout = int(config.uldl_dir_fanout)
+ hashed = filename_hash(name, fanout)
+ updir = config.upload_dir
+ result = os.path.join(updir,hashed,name)
+ return result
def assimilate_handler(wu, results, canonical_result):
# check for valid wu.canonical_resultid
if wu.canonical_result:
# do application specific processing
log_messages.printf(sched_messages.NORMAL, "[%s] Found canonical result\n", wu.name)
- question = open(os.path.join('..', 'question'), 'r').read()[:32]
- log_messages.printf(sched_messages.DEBUG, "Comparing to %s\n", question)
- if len(question) != 32:
- log_messages.printf(sched_messages.CRITICAL, "Question %s is wrong length\n", question)
- else:
- result = get_file_path(canonical_result)
- for line in open(result, 'r').readlines():
- line = line.strip()
- log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
- if line[-32:] == question:
- log_messages.printf(sched_messages.CRITICAL, "[RESULT#%d %s] Found Answer %s\n", canonical_result.id, canonical_result.name, line[:-33])
+ result = get_file_path(canonical_result)
+ for line in open(result, 'r').readlines():
+ line = line.strip()
+ log_messages.printf(sched_messages.DEBUG, " [%s] Answer found %s %s\n", canonical_result.name, line[-32:], line[:-33])
else:
log_messages.printf(sched_messages.NORMAL, "[%s] No canonical result\n", wu.name)
@@ -60,13 +64,22 @@ def assimilate_handler(wu, results, canonical_result):
log_messages.printf(sched_messages.CRITICAL, "[%s] Error: too many success results\n", wu.name)
# check for error conditions
+pass_count = 0
+
def do_pass(app):
+ global pass_count
did_something=False
# check for stop trigger
check_stop_trigger()
+ pass_count += 1
- # look for workunits with correct appid and assimilate_state==ASSIMILATE_READY
- for wu in database.Workunits.find(app=app, assimilate_state=boinc_db.ASSIMILATE_READY):
+ units = database.Workunits.find(app=app,assimilate_state=boinc_db.ASSIMILATE_READY)
+
+ log_messages.printf(sched_messages.DEBUG, "pass %d, units %d\n", pass_count, len(units))
+
+ # look for workunits with correct appid and
+ # assimilate_state==ASSIMILATE_READY
+ for wu in units:
did_something=True
canonical_result=None
results=None
@@ -131,5 +144,7 @@ if one_pass:
else:
# main loop
while(1):
+ database.close()
+ database.connect()
if not do_pass(app):
time.sleep(10)