diff --git a/checkin_notes b/checkin_notes index 53a6fa3433..020c5bd9a5 100755 --- a/checkin_notes +++ b/checkin_notes @@ -9591,3 +9591,12 @@ David 26 July 2005 client_types.C cs_scheduler.C cs_statefile.C + +Jeff 26 July 2005 + - Allow multiple assimilators and file_deleters to run via + modulus on WU id for the assimilator and WU and result + id for the file_deleter. + + sched/ + assimilator.C + file_deleter.C diff --git a/sched/assimilator.C b/sched/assimilator.C index 740bd029f3..01b523257d 100644 --- a/sched/assimilator.C +++ b/sched/assimilator.C @@ -46,6 +46,7 @@ SCHED_CONFIG config; bool update_db = true; bool noinsert = false; +int wu_id_modulus=0, wu_id_remainder=0; #define SLEEP_INTERVAL 10 @@ -59,14 +60,23 @@ bool do_pass(APP& app) { DB_RESULT canonical_result, result; bool did_something = false; char buf[256]; + char mod_clause[256]; int retval; int num_assimilated=0; check_stop_daemons(); + if (wu_id_modulus) { + sprintf(mod_clause, " and workunit.id %% %d = %d ", + wu_id_modulus, wu_id_remainder + ); + } else { + strcpy(mod_clause, ""); + } + sprintf(buf, - "where appid=%d and assimilate_state=%d limit %d", - app.id, ASSIMILATE_READY, + "where appid=%d and assimilate_state=%d %s limit %d", + app.id, ASSIMILATE_READY, mod_clause, one_pass_N_WU ? one_pass_N_WU : 1000 ); while (!wu.enumerate(buf)) { @@ -79,7 +89,7 @@ bool do_pass(APP& app) { } log_messages.printf(SCHED_MSG_LOG::DEBUG, - "[%s] assimilating; state=%d\n", wu.name, wu.assimilate_state + "[%s] assimilating boinc WU %d; state=%d\n", wu.name, wu.id, wu.assimilate_state ); sprintf(buf, "where workunitid=%d", wu.id); @@ -161,11 +171,19 @@ int main(int argc, char** argv) { // prevent the inserting of results into the *backend* // (as opposed to the boinc) DB. noinsert = true; + } else if (!strcmp(argv[i], "-mod")) { + wu_id_modulus = atoi(argv[++i]); + wu_id_remainder = atoi(argv[++i]); } else { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Unrecognized arg: %s\n", argv[i]); } } + if (wu_id_modulus) { + log_messages.printf(SCHED_MSG_LOG::DEBUG, "Using mod'ed WU enumeration. modulus = %d remainder = %d\n", + wu_id_modulus, wu_id_remainder); + } + retval = config.parse_file(".."); if (retval) { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Can't parse config file\n"); diff --git a/sched/file_deleter.C b/sched/file_deleter.C index 9f4c2eed04..265f9518d2 100644 --- a/sched/file_deleter.C +++ b/sched/file_deleter.C @@ -43,6 +43,8 @@ SCHED_CONFIG config; +int id_modulus=0, id_remainder=0; + // Given a filename, find its full path in the upload directory hierarchy // Return an error if file isn't there. // @@ -183,14 +185,23 @@ bool do_pass(bool retry_error) { DB_RESULT result; bool did_something = false; char buf[256]; + char mod_clause[256]; int retval; check_stop_daemons(); - if (retry_error) { - sprintf(buf, "where file_delete_state=%d or file_delete_state=%d limit 1000", FILE_DELETE_READY, FILE_DELETE_ERROR); + if (id_modulus) { + sprintf(mod_clause, " and id %% %d = %d ", + id_modulus, id_remainder + ); } else { - sprintf(buf, "where file_delete_state=%d limit 1000", FILE_DELETE_READY); + strcpy(mod_clause, ""); + } + + if (retry_error) { + sprintf(buf, "where file_delete_state=%d or file_delete_state=%d %s limit 1000", FILE_DELETE_READY, FILE_DELETE_ERROR, mod_clause); + } else { + sprintf(buf, "where file_delete_state=%d %s limit 1000", FILE_DELETE_READY, mod_clause); } while (!wu.enumerate(buf)) { did_something = true; @@ -210,9 +221,9 @@ bool do_pass(bool retry_error) { } if ( retry_error ) { - sprintf(buf, "where file_delete_state=%d or file_delete_state=%d limit 1000", FILE_DELETE_READY, FILE_DELETE_ERROR); + sprintf(buf, "where file_delete_state=%d or file_delete_state=%d %s limit 1000", FILE_DELETE_READY, FILE_DELETE_ERROR, mod_clause); } else { - sprintf(buf, "where file_delete_state=%d limit 1000", FILE_DELETE_READY); + sprintf(buf, "where file_delete_state=%d limit %s 1000", FILE_DELETE_READY, mod_clause); } while (!result.enumerate(buf)) { did_something = true; @@ -265,11 +276,19 @@ int main(int argc, char** argv) { preserve_result_files = true; } else if (!strcmp(argv[i], "-d")) { log_messages.set_debug_level(atoi(argv[++i])); + } else if (!strcmp(argv[i], "-mod")) { + id_modulus = atoi(argv[++i]); + id_remainder = atoi(argv[++i]); } else { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Unrecognized arg: %s\n", argv[i]); } } + if (id_modulus) { + log_messages.printf(SCHED_MSG_LOG::DEBUG, "Using mod'ed WU/result enumeration. modulus = %d remainder = %d\n", + id_modulus, id_remainder); + } + retval = config.parse_file(".."); if (retval) { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Can't parse config file\n");