// The contents of this file are subject to the BOINC Public License // Version 1.0 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // http://boinc.berkeley.edu/license_1.0.txt // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is the Berkeley Open Infrastructure for Network Computing. // // The Initial Developer of the Original Code is the SETI@home project. // Portions created by the SETI@home project are Copyright (C) 2002 // University of California at Berkeley. All Rights Reserved. // // Contributor(s): // // transitioner - handle transitions in the state of a WU // - a result has become DONE (via timeout or client reply) // - the WU error mask is set (e.g. by validater) // - assimilation is finished // // cmdline: // [ -asynch ] be asynchronous // [ -one_pass ] do one pass, then exit // [ -d x ] debug level x // [ -mod n i ] process only WUs with (id mod n) == i using namespace std; #include #include #include #include #include "boinc_db.h" #include "util.h" #include "backend_lib.h" #include "sched_config.h" #include "sched_util.h" #include "sched_msgs.h" #define LOCKFILE "transitioner.out" #define PIDFILE "transitioner.pid" #define SELECT_LIMIT 1000 #define BATCH_INSERT 1 int startup_time; SCHED_CONFIG config; R_RSA_PRIVATE_KEY key; int mod_n, mod_i; bool do_mod = false; int result_suffix(char* name) { char* p = strrchr(name, '_'); if (p) return atoi(p+1); return 0; } int handle_wu( DB_TRANSITIONER_ITEM_SET& transitioner, std::vector& items ) { int ntotal, nerrors, retval, ninprogress, nsuccess; int nunsent, ncouldnt_send, nover; int canonical_result_index; char suffix[256]; time_t now = time(0), x; bool all_over_and_validated, have_result_to_validate, do_delete; // count up the number of results in various states, // and check for timed-out results // ntotal = 0; nunsent = 0; ninprogress = 0; nover = 0; nerrors = 0; nsuccess = 0; ncouldnt_send = 0; have_result_to_validate = false; int rs, max_result_suffix = -1; for (unsigned int i=0; i max_result_suffix) max_result_suffix = rs; switch (items[i].res_server_state) { case RESULT_SERVER_STATE_UNSENT: nunsent++; break; case RESULT_SERVER_STATE_IN_PROGRESS: if (items[i].res_report_deadline < now) { log_messages.printf( SCHED_MSG_LOG::NORMAL, "[WU#%d %s] [RESULT#%d %s] result timed out (%d < %d) server_state:IN_PROGRESS=>OVER; outcome:NO_REPLY\n", items[0].id, items[0].name, items[i].res_id, items[i].res_name, items[i].res_report_deadline, (int)now ); items[i].res_server_state = RESULT_SERVER_STATE_OVER; items[i].res_outcome = RESULT_OUTCOME_NO_REPLY; retval = transitioner.update_result(items[i]); if (retval) { log_messages.printf( SCHED_MSG_LOG::CRITICAL, "[WU#%d %s] [RESULT#%d %s] result.update() == %d\n", items[0].id, items[0].name, items[i].res_id, items[i].res_name, retval ); } nover++; } else { ninprogress++; } break; case RESULT_SERVER_STATE_OVER: nover++; switch (items[i].res_outcome) { case RESULT_OUTCOME_COULDNT_SEND: log_messages.printf( SCHED_MSG_LOG::NORMAL, "[WU#%d %s] [RESULT#%d %s] result couldn't be sent\n", items[0].id, items[0].name, items[i].res_id, items[i].res_name ); ncouldnt_send++; break; case RESULT_OUTCOME_SUCCESS: if (items[i].res_validate_state == VALIDATE_STATE_INIT) { have_result_to_validate = true; } nsuccess++; break; case RESULT_OUTCOME_CLIENT_ERROR: nerrors++; break; } break; } } log_messages.printf( SCHED_MSG_LOG::DEBUG, "[WU#%d %s] %d results: unsent %d, in_progress %d, over %d (success %d, error %d, couldnt_send %d)\n", items[0].id, items[0].name, ntotal, nunsent, ninprogress, nover, nsuccess, nerrors, ncouldnt_send ); // trigger validation if we have a quorum // and some result hasn't been validated // if (nsuccess >= items[0].min_quorum && have_result_to_validate) { items[0].need_validate = true; log_messages.printf( SCHED_MSG_LOG::NORMAL, "[WU#%d %s] need_validate:=>true [nsuccess=%d >= min_quorum=%d]\n", items[0].id, items[0].name, nsuccess, items[0].min_quorum ); } // check for WU error conditions // NOTE: check on max # of success results is done in validater // if (ncouldnt_send > 0) { items[0].error_mask |= WU_ERROR_COULDNT_SEND_RESULT; } if (nerrors > items[0].max_error_results) { log_messages.printf( SCHED_MSG_LOG::NORMAL, "[WU#%d %s] WU has too many errors (%d errors for %d results)\n", items[0].id, items[0].name, nerrors, (int)items.size() ); items[0].error_mask |= WU_ERROR_TOO_MANY_ERROR_RESULTS; } if ((int)items.size() > items[0].max_total_results) { log_messages.printf( SCHED_MSG_LOG::NORMAL, "[WU#%d %s] WU has too many total results (%d)\n", items[0].id, items[0].name, (int)items.size() ); items[0].error_mask |= WU_ERROR_TOO_MANY_TOTAL_RESULTS; } // if this WU had an error, don't send any unsent results, // and trigger assimilation if needed // if (items[0].error_mask) { for (unsigned int i=0; iOVER; outcome:=>DIDNT_NEED\n", items[0].id, items[0].name, items[i].res_id, items[i].res_name ); items[i].res_server_state = RESULT_SERVER_STATE_OVER; items[i].res_outcome = RESULT_OUTCOME_DIDNT_NEED; update_result = true; } if (items[i].res_validate_state == VALIDATE_STATE_INIT) { items[i].res_validate_state = VALIDATE_STATE_NO_CHECK; update_result = true; } if (update_result) { retval = transitioner.update_result(items[i]); if (retval) { log_messages.printf( SCHED_MSG_LOG::CRITICAL, "[WU#%d %s] [RESULT#%d %s] result.update() == %d\n", items[0].id, items[0].name, items[i].res_id, items[i].res_name, retval ); } } } } if (items[0].assimilate_state == ASSIMILATE_INIT) { items[0].assimilate_state = ASSIMILATE_READY; log_messages.printf( SCHED_MSG_LOG::NORMAL, "[WU#%d %s] error_mask:%d assimilate_state:INIT=>READY\n", items[0].id, items[0].name, items[0].error_mask ); } } else if (items[0].assimilate_state == ASSIMILATE_INIT) { // If no error, generate new results if needed. // NOTE: n must be signed // int n = items[0].target_nresults - nunsent - ninprogress - nsuccess; string values; char value_buf[MAX_QUERY_LEN]; if (n > 0) { log_messages.printf( SCHED_MSG_LOG::NORMAL, "[WU#%d %s] Generating %d more results (%d target - %d unsent - %d in progress - %d success)\n", items[0].id, items[0].name, n, items[0].target_nresults, nunsent, ninprogress, nsuccess ); for (int i=0; iREADY\n", items[0].id, items[0].name ); } // output of error results can be deleted immediately; // output of success results can be deleted if validated // for (unsigned int i=0; iREADY\n", items[0].id, items[0].name, items[i].res_id, items[i].res_name ); items[i].res_file_delete_state = FILE_DELETE_READY; retval = transitioner.update_result(items[i]); if (retval) { log_messages.printf( SCHED_MSG_LOG::CRITICAL, "[WU#%d %s] [RESULT#%d %s] result.update() == %d\n", items[0].id, items[0].name, items[i].res_id, items[i].res_name, retval ); } } } } } items[0].transition_time = INT_MAX; for (unsigned int i=0; i items; bool did_something = false; check_stop_daemons(); // loop over entries that are due to be checked // while (!transitioner.enumerate((int)time(0), SELECT_LIMIT, items)) { did_something = true; // if we are assigned a transitioner number, then limit which records we should // look at. It'll be less expensive to do the check here that in the DB. if ((mod_n == 0) || ((mod_n != 0) && (mod_i == (items[0].id % mod_n)))) { if (config.use_transactions) { retval = boinc_db.start_transaction(); if (retval) { log_messages.printf( SCHED_MSG_LOG::CRITICAL, "[WU#%d %s] transitioner.start_transaction() == %d\n", items[0].id, items[0].name, retval ); } } retval = handle_wu(transitioner, items); if (retval) { log_messages.printf( SCHED_MSG_LOG::CRITICAL, "[WU#%d %s] handle_wu: %d; quitting\n", items[0].id, items[0].name, retval ); exit(1); } if (config.use_transactions) { retval = boinc_db.commit_transaction(); if (retval) { log_messages.printf( SCHED_MSG_LOG::CRITICAL, "[WU#%d %s] transitioner.commit_transaction() == %d\n", items[0].id, items[0].name, retval ); } } check_stop_daemons(); } } return did_something; } void main_loop(bool one_pass) { int retval; retval = boinc_db.open(config.db_name, config.db_host, config.db_user, config.db_passwd); if (retval) { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "boinc_db.open: %d\n", retval); exit(1); } if (one_pass) { do_pass(); } else { while (1) { if (!do_pass()) sleep(1); } } } int main(int argc, char** argv) { int i, retval; bool asynch = false, one_pass=false; char path[256]; check_stop_daemons(); startup_time = time(0); for (i=1; i