diff --git a/checkin_notes b/checkin_notes index 0dea5196f5..2742290dec 100644 --- a/checkin_notes +++ b/checkin_notes @@ -7776,7 +7776,7 @@ Rom 25 Oct 2011 configure.ac version.h -David 26 Oct 2011 +David 25 Oct 2011 - web: typo in forum RSS from Daniel L G; fixes #1147 - client: message tweak @@ -7784,3 +7784,42 @@ David 26 Oct 2011 forum_rss.inc client/ cpu_sched.cpp + +David 25 Oct 2011 + - scheduler: fix bug in the "homogeneous app version" (HAV) feature + (reported by Kevin Reed). + The problem: cache inconsistency. + If there are 2 results for the same WU in shared mem, + and 2 scheduler instances get them around the same time, + they can send them with different app versions. + We already fixed this problem for HR by + 1) rereading the relevant WU fields while deciding + whether to send the result + 2) doing a "careful update" of the WU field using a where clause + to make sure it wasn't modified in the (short) interval + since rereading it. + I fixed the HAV problem in the same way, + and merged the two mechanisms to combine the DB queries. + + Also: + - The rereads are done in slow_check() (see below). + - The careful updates are done in update_wu_on_send(), + and this is called *before* doing careful updates on result fields. + That way, if the WU updates fail, we don't have orphaned results. + - already_sent_to_different_platform_careful() (sic) + no longer does DB stuff, so it's merged with + already_send_to_different_hr_class() (better name) + + NOTE: slow_check() is used in array scheduling only. + Score-based scheduling uses other code, + in which this bug is not yet fixed. + Locality scheduling doesn't support HR or HAV at all. + This should be unified. + + db/ + db_base.cpp,h + sched/ + sched_hr.cpp,h + sched_array.cpp + sched_send.cpp + sched_score.cpp diff --git a/db/db_base.cpp b/db/db_base.cpp index 0ca99f9ab9..4d0d6ff986 100644 --- a/db/db_base.cpp +++ b/db/db_base.cpp @@ -256,19 +256,25 @@ int DB_BASE::delete_from_db() { return db->do_query(query); } -int DB_BASE::get_field_int(const char* field, int& val) { +int DB_BASE::get_field_ints(const char* fields, int nfields, int* vals) { char query[MAX_QUERY_LEN]; int retval; MYSQL_ROW row; MYSQL_RES* rp; - sprintf(query, "select %s from %s where id=%d", field, table_name, get_id()); + sprintf(query, + "select %s from %s where id=%d", fields, table_name, get_id() + ); retval = db->do_query(query); if (retval) return retval; rp = mysql_store_result(db->mysql); if (!rp) return -1; row = mysql_fetch_row(rp); - if (row) val = atoi(row[0]); + if (row) { + for (int i=0; ido_query(query); if (retval) return retval; rp = mysql_store_result(db->mysql); diff --git a/db/db_base.h b/db/db_base.h index f10e72745b..dd05b57cc7 100644 --- a/db/db_base.h +++ b/db/db_base.h @@ -98,7 +98,7 @@ public: int update_field(const char*, const char* where_clause=NULL); int update_fields_noid(char* set_clause, char* where_clause); int delete_from_db(); - int get_field_int(const char*, int&); + int get_field_ints(const char*, int, int*); int get_field_str(const char*, char*, int); int lookup_id(int id); int lookup(const char*); diff --git a/sched/sched_array.cpp b/sched/sched_array.cpp index b52457be66..f30ef237e1 100644 --- a/sched/sched_array.cpp +++ b/sched/sched_array.cpp @@ -153,8 +153,11 @@ static bool quick_check( } // do slow checks (ones that require DB access) +// return true if OK to send // -static bool slow_check(WU_RESULT& wu_result, WORKUNIT& wu, APP* app) { +static bool slow_check( + WU_RESULT& wu_result, WORKUNIT& wu, APP* app, BEST_APP_VERSION* bavp +) { int n, retval; DB_RESULT result; char buf[256]; @@ -212,23 +215,51 @@ static bool slow_check(WU_RESULT& wu_result, WORKUNIT& wu, APP* app) { } } - if (app_hr_type(*app)) { - if (already_sent_to_different_platform_careful( - wu_result.workunit, *app - )) { - if (config.debug_send) { - log_messages.printf(MSG_NORMAL, - "[send] [HOST#%d] [WU#%d %s] is assigned to different platform\n", - g_reply->host.id, wu.id, wu.name - ); - } - // Mark the workunit as infeasible. - // This ensures that jobs already assigned to a platform - // are processed first. - // - wu_result.infeasible_count++; + // Checks that require looking up the WU. + // Lump these together so we only do 1 lookup + // + if (app_hr_type(*app) || app->homogeneous_app_version) { + DB_WORKUNIT db_wu; + db_wu.id = wu_result.workunit.id; + int vals[2]; + retval = db_wu.get_field_ints("hr_class, app_version_id", 2, vals); + if (retval) { + log_messages.printf(MSG_CRITICAL, + "can't get fields for [WU#%d]: %s\n", db_wu.id, boincerror(retval) + ); return false; } + if (app_hr_type(*app)) { + wu_result.workunit.hr_class = vals[0]; + if (already_sent_to_different_hr_class( wu_result.workunit, *app)) { + if (config.debug_send) { + log_messages.printf(MSG_NORMAL, + "[send] [HOST#%d] [WU#%d %s] is assigned to different HR class\n", + g_reply->host.id, wu.id, wu.name + ); + } + // Mark the workunit as infeasible. + // This ensures that jobs already assigned to an HR class + // are processed first. + // + wu_result.infeasible_count++; + return false; + } + } + if (app->homogeneous_app_version) { + int wu_avid = vals[1]; + wu_result.workunit.app_version_id = wu_avid; + if (wu_avid && wu_avid != bavp->avp->id) { + if (config.debug_send) { + log_messages.printf(MSG_NORMAL, + "[send] [HOST#%d] [WU#%d %s] is assigned to different app version\n", + g_reply->host.id, wu.id, wu.name + ); + } + wu_result.infeasible_count++; + return false; + } + } } return true; } @@ -308,7 +339,7 @@ static bool scan_work_array() { wu_result.state = g_pid; unlock_sema(); - if (!slow_check(wu_result, wu, app)) { + if (!slow_check(wu_result, wu, app, bavp)) { // if we couldn't send the result to this host, // set its state back to PRESENT // diff --git a/sched/sched_hr.cpp b/sched/sched_hr.cpp index b2486bb136..f2767ac885 100644 --- a/sched/sched_hr.cpp +++ b/sched/sched_hr.cpp @@ -48,57 +48,16 @@ bool hr_unknown_platform(HOST& host) { return true; } -// quick check for platform compatibility +// check for HR compatibility // -bool already_sent_to_different_platform_quick(WORKUNIT& wu, APP& app) { - if (wu.hr_class && (hr_class(g_request->host, app_hr_type(app)) != wu.hr_class)) { +bool already_sent_to_different_hr_class(WORKUNIT& wu, APP& app) { + g_wreq->hr_reject_temp = false; + int host_hr_class = hr_class(g_request->host, app_hr_type(app)); + if (wu.hr_class && (host_hr_class != wu.hr_class)) { + g_wreq->hr_reject_temp = true; return true; } return false; } -// If we've already sent a result of this WU to a different platform -// return true -// else if we haven't sent a result to ANY platform -// update WU with platform code -// return false -// -// (where "platform" is os_name + p_vendor; may want to sharpen this for Unix) -// -// This is "careful" in that it rereads the WU from DB -// -bool already_sent_to_different_platform_careful(WORKUNIT& workunit, APP& app) { - DB_WORKUNIT db_wu; - int retval, wu_hr_class; - char buf[256], buf2[256]; - - // reread hr_class field from DB in case it's changed - // - db_wu.id = workunit.id; - retval = db_wu.get_field_int("hr_class", wu_hr_class); - if (retval) { - log_messages.printf(MSG_CRITICAL, - "can't get hr_class for [WU#%d]: %s\n", db_wu.id, boincerror(retval) - ); - return true; - } - g_wreq->hr_reject_temp = false; - int host_hr_class = hr_class(g_request->host, app_hr_type(app)); - if (wu_hr_class) { - if (host_hr_class != wu_hr_class) { - g_wreq->hr_reject_temp = true; - } - } else { - // do a "careful update" to make sure the WU's hr_class hasn't - // changed since we read it earlier - // - sprintf(buf, "hr_class=%d", host_hr_class); - sprintf(buf2, "hr_class=%d", wu_hr_class); - retval = db_wu.update_field(buf, buf2); - if (retval) return true; - if (boinc_db.affected_rows() != 1) return true; - } - return g_wreq->hr_reject_temp; -} - const char *BOINC_RCSID_4196d9a5b4="$Id$"; diff --git a/sched/sched_hr.h b/sched/sched_hr.h index 9def5de815..06bfc47453 100644 --- a/sched/sched_hr.h +++ b/sched/sched_hr.h @@ -18,11 +18,7 @@ #ifndef __SCHED_HR__ #define __SCHED_HR__ -extern bool already_sent_to_different_platform_quick(WORKUNIT&, APP&); - -extern bool already_sent_to_different_platform_careful( - WORKUNIT& workunit, APP& -); +extern bool already_sent_to_different_hr_class(WORKUNIT& workunit, APP&); extern bool hr_unknown_platform(HOST&); diff --git a/sched/sched_score.cpp b/sched/sched_score.cpp index 9f3ab9d6b0..81f747d78c 100644 --- a/sched/sched_score.cpp +++ b/sched/sched_score.cpp @@ -55,6 +55,8 @@ int read_sendable_result(SCHED_DB_RESULT& result) { return 0; } +// TODO: use slow_check() +// bool wu_is_infeasible_slow( WU_RESULT& wu_result, SCHEDULER_REQUEST&, SCHEDULER_REPLY& ) { @@ -119,7 +121,7 @@ bool wu_is_infeasible_slow( APP* app = ssp->lookup_app(wu_result.workunit.appid); WORKUNIT wu = wu_result.workunit; if (app_hr_type(*app)) { - if (already_sent_to_different_platform_careful(wu, *app)) { + if (already_sent_to_different_hr_class(wu, *app)) { if (config.debug_send) { log_messages.printf(MSG_NORMAL, "[send] [HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n", diff --git a/sched/sched_send.cpp b/sched/sched_send.cpp index b49d493a56..857535164f 100644 --- a/sched/sched_send.cpp +++ b/sched/sched_send.cpp @@ -737,7 +737,7 @@ int wu_is_infeasible_fast( } return INFEASIBLE_HR; } - if (already_sent_to_different_platform_quick(wu, app)) { + if (already_sent_to_different_hr_class(wu, app)) { if (config.debug_send) { log_messages.printf(MSG_NORMAL, "[send] [HOST#%d] [WU#%d %s] failed quick HR check: WU is class %d, host is class %d\n", @@ -920,13 +920,21 @@ static int insert_deadline_tag(RESULT& result) { return 0; } -// update workunit when send an instance of it: +// update workunit fields when send an instance of it: // - transition time // - app_version_id, if app uses homogeneous app version +// and WU.app_version_id was originally zero +// - hr_class, if we're using HR +// and WU.hr_class was originally zero. +// +// In the latter two cases, the update is conditional on those +// fields still being zero (some other scheduler instance might +// have updated them since we read the WU) // int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) { DB_WORKUNIT dbwu; - char buf[256]; + char buf[256], buf2[256], where_clause[256]; + int retval; dbwu.id = wu.id; @@ -936,12 +944,27 @@ int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) { "transition_time=if(transition_time<%d, transition_time, %d)", (int)x, (int)x ); - if (app.homogeneous_app_version && (bav.avp->id != wu.app_version_id)) { - char buf2[256]; + strcpy(where_clause, ""); + if (app.homogeneous_app_version && wu.app_version_id==0) { sprintf(buf2, ", app_version_id=%d", bav.avp->id); strcat(buf, buf2); + strcpy(where_clause, "app_version_id==0"); } - return dbwu.update_field(buf); + if (app_hr_type(app) && wu.hr_class==0) { + int host_hr_class = hr_class(g_request->host, app_hr_type(app)); + sprintf(buf2, ", hr_class=%d", host_hr_class); + strcat(buf, buf2); + if (strlen(where_clause)) { + strcat(where_clause, " and "); + } + strcat(where_clause, "hr_class=0"); + } + retval = dbwu.update_field(buf, where_clause); + if (retval) return retval; + if (boinc_db.affected_rows() != 1) { + return ERR_DB_NOT_FOUND; + } + return 0; } // return true iff a result for same WU is already being sent @@ -1117,19 +1140,32 @@ int add_result_to_reply( bool resent_result = false; APP* app = ssp->lookup_app(wu.appid); - retval = add_wu_to_reply(wu, *g_reply, app, bavp); - if (retval) return retval; - - // Adjust available disk space. - // In the scheduling locality case, - // reduce the available space by less than the workunit rsc_disk_bound, - // if the host already has the file or the file was not already sent. + // update WU DB record. + // This can fail in normal operation + // (other scheduler already updated hr_class or app_version_id) + // so do it before updating the result. // - if (!locality_scheduling || decrement_disk_space_locality(wu)) { - g_wreq->disk_available -= wu.rsc_disk_bound; + retval = update_wu_on_send( + wu, result.report_deadline + config.report_grace_period, *app, *bavp + ); + if (retval == ERR_DB_NOT_FOUND) { + log_messages.printf(MSG_NORMAL, + "add_result_to_reply: WU already sent to other HR class or app version\n" + ); + return retval; + } else if (retval) { + log_messages.printf(MSG_CRITICAL, + "add_result_to_reply: WU update failed: %d\n", + retval + ); + return retval; } - // update the result in DB + // update result DB record. + // This can also fail in normal operation. + // In this case, in principle we should undo + // the changes we just made to the WU (or use a transaction) + // but I don't think it actually matters. // result.hostid = g_reply->host.id; result.userid = g_reply->user.id; @@ -1169,6 +1205,21 @@ int add_result_to_reply( } if (retval) return retval; + // done with DB updates. + // + + retval = add_wu_to_reply(wu, *g_reply, app, bavp); + if (retval) return retval; + + // Adjust available disk space. + // In the locality scheduling locality case, + // reduce the available space by less than the workunit rsc_disk_bound, + // if the host already has the file or the file was not already sent. + // + if (!locality_scheduling || decrement_disk_space_locality(wu)) { + g_wreq->disk_available -= wu.rsc_disk_bound; + } + double est_dur = estimate_duration(wu, *bavp); if (config.debug_send) { log_messages.printf(MSG_NORMAL, @@ -1177,17 +1228,6 @@ int add_result_to_reply( ); } - retval = update_wu_on_send( - wu, result.report_deadline + config.report_grace_period, *app, *bavp - ); - if (retval) { - log_messages.printf(MSG_CRITICAL, - "add_result_to_reply: can't update WU transition time: %d\n", - retval - ); - return retval; - } - // The following overwrites the result's xml_doc field. // But that's OK cuz we're done with DB updates //