// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2008 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see . // validator - check and validate results, and grant credit // --app appname // [-d N] [--debug_level N] // log verbosity (1=least, 4=most) // [--one_pass_N_WU N] // Validate only N WU in one pass, then exit // [--one_pass] // make one pass through WU table, then exit // [--mod n i] // process only WUs with (id mod n) == i // [--max_granted_credit X] // limit maximum granted credit to X // [--max_claimed_credit Y] // invalid if claims more than Y // [--grant_claimed_credit] // just grant whatever is claimed // [--update_credited_job] // add userid/wuid pair to credited_job table // [--credit_from_wu] // get credit from WU XML #include "config.h" #include #include #include #include #include #include #include #include "boinc_db.h" #include "util.h" #include "str_util.h" #include "error_numbers.h" #include "svn_version.h" #include "credit.h" #include "sched_config.h" #include "sched_util.h" #include "sched_msgs.h" #include "validator.h" #include "validate_util.h" #include "validate_util2.h" #ifdef GCL_SIMULATOR #include "gcl_simulator.h" #endif #define LOCKFILE "validate.out" #define PIDFILE "validate.pid" #define SELECT_LIMIT 1000 #define SLEEP_PERIOD 5 int sleep_interval = SLEEP_PERIOD; typedef enum { NEVER, DELAYED, IMMEDIATE, NO_CHANGE } TRANSITION_TIME; 
// Name of the application being validated; main_loop() uses it to build
// the "where name='...'" clause for the app lookup.
//
char app_name[256];

// DB record of the application; re-read by main_loop() on every iteration.
//
DB_APP app;

// Passed to DB_VALIDATOR_ITEM_SET::enumerate() by do_validate_scan().
// Per the option summary at the top of the file (--mod n i):
// process only WUs with (id mod n) == i.
//
int wu_id_modulus=0;
int wu_id_remainder=0;

// do_validate_scan() stops after this many handle_wu() calls.
// With the default of 0 the counter never matches, so there is no limit.
//
int one_pass_N_WU=0;

// If set, main_loop() returns after a scan that found nothing to do.
//
bool one_pass = false;

// Upper bound applied to the computed credit in handle_wu(); 0 disables it.
//
double max_granted_credit = 0;

// NOTE(review): the next two are not referenced in the part of the file
// visible here. Per the option summary at the top of the file:
// --max_claimed_credit Y    invalid if claims more than Y
// --grant_claimed_credit    just grant whatever is claimed
//
double max_claimed_credit = 0;
bool grant_claimed_credit = false;

// If set, is_valid() inserts a credited_job record for each valid result.
//
bool update_credited_job = false;

// If set, handle_wu() obtains the credit via get_credit_from_wu()
// instead of assign_credit_set().
//
bool credit_from_wu = false;

// Set by handle_wu() to the workunit it is currently processing.
// NOTE(review): no reader of this pointer is visible here; presumably it is
// for project-supplied validation code - confirm.
//
WORKUNIT* g_wup;

// NOTE(review): the element type of this vector is missing from this copy
// of the file (template arguments appear to have been stripped).
//
vector app_versions;
    // cache of app_versions; used by v2 credit system

// Return true if the workunit asks for exactly one result
// while the application's target_nresults is greater than one.
//
bool is_unreplicated(WORKUNIT& wu) {
    return (wu.target_nresults == 1 && app.target_nresults > 1);
}

// Here when a result has been validated.
// - update turnaround stats
// - increment the host_app_version's daily quota
// - update consecutive_valid
// - insert credited_job record if needed
//
// Always returns 0: a failed credited_job insert is logged, not reported
// to the caller.
//
int is_valid(DB_HOST& host, RESULT& result, WORKUNIT& wu, DB_HOST_APP_VERSION& hav) {
    DB_CREDITED_JOB credited_job;
    int retval;

    double turnaround = result.received_time - result.sent_time;
    compute_avg_turnaround(host, turnaround);

    // increment daily quota
    //
    hav.max_jobs_per_day++;

    // increment consecutive_valid, but only if the WU is replicated,
    // i.e. is_unreplicated() is false
    //
    if (!is_unreplicated(wu)) {
        hav.consecutive_valid++;
        log_messages.printf(MSG_DEBUG,
            "[HAV#%d] consecutive valid now %d\n",
            hav.app_version_id, hav.consecutive_valid
        );
    }

    if (update_credited_job) {
        credited_job.userid = host.userid;
        // wu.opaque is printed with %f below, so it is a floating-point
        // field; it is converted to an integer for the DB record
        credited_job.workunitid = long(wu.opaque);
        retval = credited_job.insert();
        if (retval) {
            log_messages.printf(MSG_CRITICAL,
                "[RESULT#%d] Warning: credited_job insert failed (userid: %d workunit: %f err: %s)\n",
                result.id, host.userid, wu.opaque, boincerror(retval)
            );
        } else {
            log_messages.printf(MSG_DEBUG,
                "[RESULT#%d %s] added credited_job record [WU#%d OPAQUE#%f USER#%d]\n",
                result.id, result.name, wu.id, wu.opaque, host.userid
            );
        }
    }

    return 0;
}

// Here when a result has been found invalid.
// - reset consecutive_valid to zero
// - decrement the host_app_version's daily quota,
//   but only while it is above config.daily_result_quota
//
static inline void is_invalid(DB_HOST_APP_VERSION& hav) {
    hav.consecutive_valid = 0;
    if (hav.max_jobs_per_day > config.daily_result_quota) {
        hav.max_jobs_per_day--;
    }
}

// handle a workunit which has new results
//
// validator: used to write the modified results and workunit back
// items:     entries for one workunit; items[0].wu is the workunit itself
//
// Two cases:
// - the WU already has a canonical result: each result is handled
//   according to its validate_state (valid / invalid)
// - it has none yet: check_set() is called to try to find one,
//   and credit is computed
// In both cases the workunit's transition_time is then adjusted,
// need_validate is cleared and the workunit is written back.
//
// Returns 0 on success, or the error code from check_set()
// or from validator.update_workunit().
//
// NOTE(review): this copy of the function is missing code in three places
// (marked below); each gap runs from a "for (i=0; i" loop header to a later
// point in the function.
//
int handle_wu(
    DB_VALIDATOR_ITEM_SET& validator, std::vector& items
) {
    int canonical_result_index = -1;
    bool update_result, retry;
    TRANSITION_TIME transition_time = NO_CHANGE;
    int retval = 0, canonicalid = 0, x;
    double credit = 0;
    unsigned int i;

    WORKUNIT& wu = items[0].wu;
    g_wup = &wu;

    if (wu.canonical_resultid) {
        log_messages.printf(MSG_NORMAL,
            "[WU#%d %s] Already has canonical result %d\n",
            wu.id, wu.name, wu.canonical_resultid
        );
        ++log_messages;

        // Here if WU already has a canonical result.
        // Get unchecked results and see if they match the canonical result
        //
        // NOTE(review): code is missing from the next line. The declarations
        // of result, canonical_result, host, host_initial, hav, hav_orig and
        // update_hav, which are used below, are not visible here.
        //
        for (i=0; i havv;
            havv.push_back(hav);
            vector rv;
            switch (result.validate_state) {
            case VALIDATE_STATE_VALID:
                update_result = true;
                update_hav = true;
                log_messages.printf(MSG_NORMAL,
                    "[RESULT#%d %s] pair_check() matched: setting result to valid\n",
                    result.id, result.name
                );
                // is_valid() works on the copy in havv[0], which is what
                // gets written back by update_validator() below
                retval = is_valid(host, result, wu, havv[0]);
                if (retval) {
                    log_messages.printf(MSG_NORMAL,
                        "[RESULT#%d %s] is_valid() error: %s\n",
                        result.id, result.name, boincerror(retval)
                    );
                }
                // do credit computation, but grant credit of canonical result
                // (the return value of assign_credit_set() is not checked here)
                //
                rv.push_back(result);
                assign_credit_set(
                    wu, rv, app, app_versions, havv,
                    max_granted_credit, credit
                );
                result.granted_credit = canonical_result.granted_credit;
                grant_credit(host, result.sent_time, result.granted_credit);
                break;
            case VALIDATE_STATE_INVALID:
                update_result = true;
                update_hav = true;
                log_messages.printf(MSG_NORMAL,
                    "[RESULT#%d %s] pair_check() didn't match: setting result to invalid\n",
                    result.id, result.name
                );
                is_invalid(havv[0]);
            }
            // write back the host_app_version only if it has a host_id
            // and was modified above
            if (hav.host_id && update_hav) {
                havv[0].update_validator(hav_orig);
            }
            host.update_diff_validator(host_initial);
            if (update_result) {
                log_messages.printf(MSG_NORMAL,
                    "[RESULT#%d %s] granted_credit %f\n",
                    result.id, result.name, result.granted_credit
                );
                retval = validator.update_result(result);
                if (retval) {
                    log_messages.printf(MSG_CRITICAL,
                        "[RESULT#%d %s] Can't update result: %s\n",
                        result.id, result.name, boincerror(retval)
                    );
                }
            }
        }
    } else {
        // NOTE(review): the element types of these vectors are missing
        // from this copy of the file.
        vector results;
        vector host_app_versions, host_app_versions_orig;
        int nsuccess_results;

        // Here if WU doesn't have a canonical result yet.
        // Try to get one

        log_messages.printf(MSG_NORMAL,
            "[WU#%d %s] handle_wu(): No canonical result yet\n",
            wu.id, wu.name
        );
        ++log_messages;

        // make a vector of the successful results,
        // and a parallel vector of host_app_versions
        //
        // NOTE(review): code is missing from the next line; the loop that
        // fills the vectors and the left-hand side of the comparison with
        // wu.min_quorum are not visible here.
        //
        for (i=0; i= (unsigned int)wu.min_quorum) {
            log_messages.printf(MSG_DEBUG,
                "[WU#%d %s] Enough for quorum, checking set.\n",
                wu.id, wu.name
            );

            // the 4th argument of check_set() is not used afterwards
            double dummy;
            retval = check_set(results, wu, canonicalid, dummy, retry);
            if (retval) {
                // note: returns without writing the workunit back
                log_messages.printf(MSG_CRITICAL,
                    "[WU#%d %s] check_set() error: %s, exiting\n",
                    wu.id, wu.name, boincerror(retval)
                );
                return retval;
            }
            if (retry) transition_time = DELAYED;

            if (credit_from_wu) {
                retval = get_credit_from_wu(wu, results, credit);
                if (retval) {
                    // not fatal: carry on with zero credit
                    log_messages.printf(MSG_CRITICAL,
                        "[WU#%d %s] get_credit_from_wu(): credit not specified in WU\n",
                        wu.id, wu.name
                    );
                    credit = 0;
                }
            } else {
                if (canonicalid) {
                    retval = assign_credit_set(
                        wu, results, app, app_versions, host_app_versions,
                        max_granted_credit, credit
                    );
                    if (retval) {
                        // skip the result updates; the workunit is still
                        // written back at "leave" with a delayed transition
                        log_messages.printf(MSG_CRITICAL,
                            "[WU#%d %s] assign_credit_set(): %s\n",
                            wu.id, wu.name, boincerror(retval)
                        );
                        transition_time = DELAYED;
                        goto leave;
                    }
                }
            }

            // apply the cap; a max_granted_credit of 0 means no cap
            if (max_granted_credit && credit>max_granted_credit) {
                credit = max_granted_credit;
            }

            // scan results.
            // update as needed, and count the # of results
            // that are still outcome=SUCCESS
            // (some may have changed to VALIDATE_ERROR)
            //
            nsuccess_results = 0;
            // NOTE(review): code is missing from the next line; the scan
            // itself and the left-hand side of the comparison with
            // wu.max_success_results are not visible here.
            //
            for (i=0; i wu.max_success_results) {
                    wu.error_mask |= WU_ERROR_TOO_MANY_SUCCESS_RESULTS;
                    transition_time = IMMEDIATE;
                }

                // if #success results >= target_nresults,
                // we need more results, so bump target_nresults
                // NOTE: nsuccess_results should never be > target_nresults,
                // but accommodate that if it should happen
                //
                if (nsuccess_results >= wu.target_nresults) {
                    wu.target_nresults = nsuccess_results+1;
                    transition_time = IMMEDIATE;
                }
            }
        }
    }

leave:
    // undo the ++log_messages done in whichever branch ran above
    --log_messages;

    switch (transition_time) {
    case IMMEDIATE:
        wu.transition_time = time(0);
        break;
    case DELAYED:
        // six hours from now, but never later than the current value
        x = time(0) + 6*3600;
        if (x < wu.transition_time) wu.transition_time = x;
        break;
    case NEVER:
        wu.transition_time = INT_MAX;
        break;
    case NO_CHANGE:
        break;
    }

    wu.need_validate = 0;

    retval = validator.update_workunit(wu);
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "[WU#%d %s] update_workunit() failed: %s; exiting\n",
            wu.id, wu.name, boincerror(retval)
        );
        return retval;
    }
    return 0;
}

// make one pass through the workunits with need_validate set.
// return true if there were any // bool do_validate_scan() { DB_VALIDATOR_ITEM_SET validator; std::vector items; bool found=false; int retval, i=0; // loop over entries that need to be checked // while (1) { retval = validator.enumerate( app.id, SELECT_LIMIT, wu_id_modulus, wu_id_remainder, items ); if (retval) { if (retval != ERR_DB_NOT_FOUND) { log_messages.printf(MSG_DEBUG, "DB connection lost, exiting\n" ); exit(0); } break; } retval = handle_wu(validator, items); if (!retval) found = true; if (++i == one_pass_N_WU) break; } return found; } int main_loop() { int retval; bool did_something; char buf[256]; retval = boinc_db.open( config.db_name, config.db_host, config.db_user, config.db_passwd ); if (retval) { log_messages.printf(MSG_CRITICAL, "boinc_db.open failed: %s\n", boincerror(retval) ); exit(1); } sprintf(buf, "where name='%s'", app_name); while (1) { check_stop_daemons(); // look up app within the loop, // in case its min_avg_pfc has been changed by the feeder // retval = app.lookup(buf); if (retval) { log_messages.printf(MSG_CRITICAL, "can't find app %s\n", app_name); exit(1); } did_something = do_validate_scan(); if (!did_something) { write_modified_app_versions(app_versions); if (one_pass) break; #ifdef GCL_SIMULATOR char nameforsim[64]; sprintf(nameforsim, "validator%i", app.id); continue_simulation(nameforsim); signal(SIGUSR2, simulator_signal_handler); pause(); #else sleep(sleep_interval); #endif } } return 0; } // For use by project-supplied routines check_set() and check_match() // int debug_level=0; int main(int argc, char** argv) { int i, retval; const char *usage = "\nUsage: %s -app [OPTIONS]\n" "Start validator for application \n\n" "Optional arguments:\n" " --one_pass_N_WU N Validate at most N WUs, then exit\n" " --one_pass Make one pass through WU table, then exit\n" " --mod n i Process only WUs with (id mod n) == i\n" " --max_claimed_credit X If a result claims more credit than this, mark it as invalid\n" " --max_granted_credit X Grant no 
more than this amount of credit to a result\n" " --grant_claimed_credit Grant the claimed credit, regardless of what other results for this workunit claimed\n" " --update_credited_job Add record to credited_job table after granting credit\n" " --credit_from_wu Credit is specified in WU XML\n" " --sleep_interval n Set sleep-interval to n\n" " -d n, --debug_level n Set log verbosity level, 1-4\n" " -h | --help Show this\n" " -v | --version Show version information\n"; if ((argc > 1) && (!strcmp(argv[1], "-h") || !strcmp(argv[1], "--help"))) { printf (usage, argv[0] ); exit(0); } check_stop_daemons(); for (i=1; i