From 4b826b52a0317f548fc45a8781e16f50673b7d49 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 26 Oct 2011 07:15:22 +0000 Subject: [PATCH] - scheduler: fix bug in the "homogeneous app version" (HAV) feature (reported by Kevin Reed). The problem: cache inconsistency. If there are 2 results for the same WU in shared mem, and 2 scheduler instances get them around the same time, they can send them with different app versions. We already fixed this problem for HR by 1) rereading the relevant WU fields while deciding whether to send the result 2) doing a "careful update" of the WU field using a where clause to make sure it wasn't modified in the (short) interval since rereading it. I fixed the HAV problem in the same way, and merged the two mechanisms to combine the DB queries. Also: - The rereads are done in slow_check() (see below). - The careful updates are done in update_wu_on_send(), and this is called *before* doing careful updates on result fields. That way, if the WU updates fail, we don't have orphaned results. - already_sent_to_different_platform_careful() (sic) no longer does DB stuff, so it's merged with already_send_to_different_hr_class() (better name) NOTE: slow_check() is used in array scheduling only. Score-based scheduling uses other code, in which this bug is not yet fixed. Locality scheduling doesn't support HR or HAV at all. This should be unified. svn path=/trunk/boinc/; revision=24484 --- checkin_notes | 41 ++++++++++++++++++- db/db_base.cpp | 16 ++++++-- db/db_base.h | 2 +- sched/sched_array.cpp | 65 ++++++++++++++++++++++-------- sched/sched_hr.cpp | 53 +++--------------------- sched/sched_hr.h | 6 +-- sched/sched_score.cpp | 4 +- sched/sched_send.cpp | 94 ++++++++++++++++++++++++++++++------------- 8 files changed, 178 insertions(+), 103 deletions(-) diff --git a/checkin_notes b/checkin_notes index 0dea5196f5..2742290dec 100644 --- a/checkin_notes +++ b/checkin_notes @@ -7776,7 +7776,7 @@ Rom 25 Oct 2011 configure.ac version.h -David 26 Oct 2011 +David 25 Oct 2011 - web: typo in forum RSS from Daniel L G; fixes #1147 - client: message tweak @@ -7784,3 +7784,42 @@ David 26 Oct 2011 forum_rss.inc client/ cpu_sched.cpp + +David 25 Oct 2011 + - scheduler: fix bug in the "homogeneous app version" (HAV) feature + (reported by Kevin Reed). + The problem: cache inconsistency. + If there are 2 results for the same WU in shared mem, + and 2 scheduler instances get them around the same time, + they can send them with different app versions. + We already fixed this problem for HR by + 1) rereading the relevant WU fields while deciding + whether to send the result + 2) doing a "careful update" of the WU field using a where clause + to make sure it wasn't modified in the (short) interval + since rereading it. + I fixed the HAV problem in the same way, + and merged the two mechanisms to combine the DB queries. + + Also: + - The rereads are done in slow_check() (see below). + - The careful updates are done in update_wu_on_send(), + and this is called *before* doing careful updates on result fields. + That way, if the WU updates fail, we don't have orphaned results. + - already_sent_to_different_platform_careful() (sic) + no longer does DB stuff, so it's merged with + already_send_to_different_hr_class() (better name) + + NOTE: slow_check() is used in array scheduling only. + Score-based scheduling uses other code, + in which this bug is not yet fixed. + Locality scheduling doesn't support HR or HAV at all. + This should be unified. + + db/ + db_base.cpp,h + sched/ + sched_hr.cpp,h + sched_array.cpp + sched_send.cpp + sched_score.cpp diff --git a/db/db_base.cpp b/db/db_base.cpp index 0ca99f9ab9..4d0d6ff986 100644 --- a/db/db_base.cpp +++ b/db/db_base.cpp @@ -256,19 +256,25 @@ int DB_BASE::delete_from_db() { return db->do_query(query); } -int DB_BASE::get_field_int(const char* field, int& val) { +int DB_BASE::get_field_ints(const char* fields, int nfields, int* vals) { char query[MAX_QUERY_LEN]; int retval; MYSQL_ROW row; MYSQL_RES* rp; - sprintf(query, "select %s from %s where id=%d", field, table_name, get_id()); + sprintf(query, + "select %s from %s where id=%d", fields, table_name, get_id() + ); retval = db->do_query(query); if (retval) return retval; rp = mysql_store_result(db->mysql); if (!rp) return -1; row = mysql_fetch_row(rp); - if (row) val = atoi(row[0]); + if (row) { + for (int i=0; ido_query(query); if (retval) return retval; rp = mysql_store_result(db->mysql); diff --git a/db/db_base.h b/db/db_base.h index f10e72745b..dd05b57cc7 100644 --- a/db/db_base.h +++ b/db/db_base.h @@ -98,7 +98,7 @@ public: int update_field(const char*, const char* where_clause=NULL); int update_fields_noid(char* set_clause, char* where_clause); int delete_from_db(); - int get_field_int(const char*, int&); + int get_field_ints(const char*, int, int*); int get_field_str(const char*, char*, int); int lookup_id(int id); int lookup(const char*); diff --git a/sched/sched_array.cpp b/sched/sched_array.cpp index b52457be66..f30ef237e1 100644 --- a/sched/sched_array.cpp +++ b/sched/sched_array.cpp @@ -153,8 +153,11 @@ static bool quick_check( } // do slow checks (ones that require DB access) +// return true if OK to send // -static bool slow_check(WU_RESULT& wu_result, WORKUNIT& wu, APP* app) { +static bool slow_check( + WU_RESULT& wu_result, WORKUNIT& wu, APP* app, BEST_APP_VERSION* bavp +) { int n, retval; DB_RESULT result; char buf[256]; @@ -212,23 +215,51 @@ static bool slow_check(WU_RESULT& wu_result, WORKUNIT& wu, APP* app) { } } - if (app_hr_type(*app)) { - if (already_sent_to_different_platform_careful( - wu_result.workunit, *app - )) { - if (config.debug_send) { - log_messages.printf(MSG_NORMAL, - "[send] [HOST#%d] [WU#%d %s] is assigned to different platform\n", - g_reply->host.id, wu.id, wu.name - ); - } - // Mark the workunit as infeasible. - // This ensures that jobs already assigned to a platform - // are processed first. - // - wu_result.infeasible_count++; + // Checks that require looking up the WU. + // Lump these together so we only do 1 lookup + // + if (app_hr_type(*app) || app->homogeneous_app_version) { + DB_WORKUNIT db_wu; + db_wu.id = wu_result.workunit.id; + int vals[2]; + retval = db_wu.get_field_ints("hr_class, app_version_id", 2, vals); + if (retval) { + log_messages.printf(MSG_CRITICAL, + "can't get fields for [WU#%d]: %s\n", db_wu.id, boincerror(retval) + ); return false; } + if (app_hr_type(*app)) { + wu_result.workunit.hr_class = vals[0]; + if (already_sent_to_different_hr_class( wu_result.workunit, *app)) { + if (config.debug_send) { + log_messages.printf(MSG_NORMAL, + "[send] [HOST#%d] [WU#%d %s] is assigned to different HR class\n", + g_reply->host.id, wu.id, wu.name + ); + } + // Mark the workunit as infeasible. + // This ensures that jobs already assigned to an HR class + // are processed first. + // + wu_result.infeasible_count++; + return false; + } + } + if (app->homogeneous_app_version) { + int wu_avid = vals[1]; + wu_result.workunit.app_version_id = wu_avid; + if (wu_avid && wu_avid != bavp->avp->id) { + if (config.debug_send) { + log_messages.printf(MSG_NORMAL, + "[send] [HOST#%d] [WU#%d %s] is assigned to different app version\n", + g_reply->host.id, wu.id, wu.name + ); + } + wu_result.infeasible_count++; + return false; + } + } } return true; } @@ -308,7 +339,7 @@ static bool scan_work_array() { wu_result.state = g_pid; unlock_sema(); - if (!slow_check(wu_result, wu, app)) { + if (!slow_check(wu_result, wu, app, bavp)) { // if we couldn't send the result to this host, // set its state back to PRESENT // diff --git a/sched/sched_hr.cpp b/sched/sched_hr.cpp index b2486bb136..f2767ac885 100644 --- a/sched/sched_hr.cpp +++ b/sched/sched_hr.cpp @@ -48,57 +48,16 @@ bool hr_unknown_platform(HOST& host) { return true; } -// quick check for platform compatibility +// check for HR compatibility // -bool already_sent_to_different_platform_quick(WORKUNIT& wu, APP& app) { - if (wu.hr_class && (hr_class(g_request->host, app_hr_type(app)) != wu.hr_class)) { +bool already_sent_to_different_hr_class(WORKUNIT& wu, APP& app) { + g_wreq->hr_reject_temp = false; + int host_hr_class = hr_class(g_request->host, app_hr_type(app)); + if (wu.hr_class && (host_hr_class != wu.hr_class)) { + g_wreq->hr_reject_temp = true; return true; } return false; } -// If we've already sent a result of this WU to a different platform -// return true -// else if we haven't sent a result to ANY platform -// update WU with platform code -// return false -// -// (where "platform" is os_name + p_vendor; may want to sharpen this for Unix) -// -// This is "careful" in that it rereads the WU from DB -// -bool already_sent_to_different_platform_careful(WORKUNIT& workunit, APP& app) { - DB_WORKUNIT db_wu; - int retval, wu_hr_class; - char buf[256], buf2[256]; - - // reread hr_class field from DB in case it's changed - // - db_wu.id = workunit.id; - retval = db_wu.get_field_int("hr_class", wu_hr_class); - if (retval) { - log_messages.printf(MSG_CRITICAL, - "can't get hr_class for [WU#%d]: %s\n", db_wu.id, boincerror(retval) - ); - return true; - } - g_wreq->hr_reject_temp = false; - int host_hr_class = hr_class(g_request->host, app_hr_type(app)); - if (wu_hr_class) { - if (host_hr_class != wu_hr_class) { - g_wreq->hr_reject_temp = true; - } - } else { - // do a "careful update" to make sure the WU's hr_class hasn't - // changed since we read it earlier - // - sprintf(buf, "hr_class=%d", host_hr_class); - sprintf(buf2, "hr_class=%d", wu_hr_class); - retval = db_wu.update_field(buf, buf2); - if (retval) return true; - if (boinc_db.affected_rows() != 1) return true; - } - return g_wreq->hr_reject_temp; -} - const char *BOINC_RCSID_4196d9a5b4="$Id$"; diff --git a/sched/sched_hr.h b/sched/sched_hr.h index 9def5de815..06bfc47453 100644 --- a/sched/sched_hr.h +++ b/sched/sched_hr.h @@ -18,11 +18,7 @@ #ifndef __SCHED_HR__ #define __SCHED_HR__ -extern bool already_sent_to_different_platform_quick(WORKUNIT&, APP&); - -extern bool already_sent_to_different_platform_careful( - WORKUNIT& workunit, APP& -); +extern bool already_sent_to_different_hr_class(WORKUNIT& workunit, APP&); extern bool hr_unknown_platform(HOST&); diff --git a/sched/sched_score.cpp b/sched/sched_score.cpp index 9f3ab9d6b0..81f747d78c 100644 --- a/sched/sched_score.cpp +++ b/sched/sched_score.cpp @@ -55,6 +55,8 @@ int read_sendable_result(SCHED_DB_RESULT& result) { return 0; } +// TODO: use slow_check() +// bool wu_is_infeasible_slow( WU_RESULT& wu_result, SCHEDULER_REQUEST&, SCHEDULER_REPLY& ) { @@ -119,7 +121,7 @@ bool wu_is_infeasible_slow( APP* app = ssp->lookup_app(wu_result.workunit.appid); WORKUNIT wu = wu_result.workunit; if (app_hr_type(*app)) { - if (already_sent_to_different_platform_careful(wu, *app)) { + if (already_sent_to_different_hr_class(wu, *app)) { if (config.debug_send) { log_messages.printf(MSG_NORMAL, "[send] [HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n", diff --git a/sched/sched_send.cpp b/sched/sched_send.cpp index b49d493a56..857535164f 100644 --- a/sched/sched_send.cpp +++ b/sched/sched_send.cpp @@ -737,7 +737,7 @@ int wu_is_infeasible_fast( } return INFEASIBLE_HR; } - if (already_sent_to_different_platform_quick(wu, app)) { + if (already_sent_to_different_hr_class(wu, app)) { if (config.debug_send) { log_messages.printf(MSG_NORMAL, "[send] [HOST#%d] [WU#%d %s] failed quick HR check: WU is class %d, host is class %d\n", @@ -920,13 +920,21 @@ static int insert_deadline_tag(RESULT& result) { return 0; } -// update workunit when send an instance of it: +// update workunit fields when send an instance of it: // - transition time // - app_version_id, if app uses homogeneous app version +// and WU.app_version_id was originally zero +// - hr_class, if we're using HR +// and WU.hr_class was originally zero. +// +// In the latter two cases, the update is conditional on those +// fields still being zero (some other scheduler instance might +// have updated them since we read the WU) // int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) { DB_WORKUNIT dbwu; - char buf[256]; + char buf[256], buf2[256], where_clause[256]; + int retval; dbwu.id = wu.id; @@ -936,12 +944,27 @@ int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) { "transition_time=if(transition_time<%d, transition_time, %d)", (int)x, (int)x ); - if (app.homogeneous_app_version && (bav.avp->id != wu.app_version_id)) { - char buf2[256]; + strcpy(where_clause, ""); + if (app.homogeneous_app_version && wu.app_version_id==0) { sprintf(buf2, ", app_version_id=%d", bav.avp->id); strcat(buf, buf2); + strcpy(where_clause, "app_version_id==0"); } - return dbwu.update_field(buf); + if (app_hr_type(app) && wu.hr_class==0) { + int host_hr_class = hr_class(g_request->host, app_hr_type(app)); + sprintf(buf2, ", hr_class=%d", host_hr_class); + strcat(buf, buf2); + if (strlen(where_clause)) { + strcat(where_clause, " and "); + } + strcat(where_clause, "hr_class=0"); + } + retval = dbwu.update_field(buf, where_clause); + if (retval) return retval; + if (boinc_db.affected_rows() != 1) { + return ERR_DB_NOT_FOUND; + } + return 0; } // return true iff a result for same WU is already being sent @@ -1117,19 +1140,32 @@ int add_result_to_reply( bool resent_result = false; APP* app = ssp->lookup_app(wu.appid); - retval = add_wu_to_reply(wu, *g_reply, app, bavp); - if (retval) return retval; - - // Adjust available disk space. - // In the scheduling locality case, - // reduce the available space by less than the workunit rsc_disk_bound, - // if the host already has the file or the file was not already sent. + // update WU DB record. + // This can fail in normal operation + // (other scheduler already updated hr_class or app_version_id) + // so do it before updating the result. // - if (!locality_scheduling || decrement_disk_space_locality(wu)) { - g_wreq->disk_available -= wu.rsc_disk_bound; + retval = update_wu_on_send( + wu, result.report_deadline + config.report_grace_period, *app, *bavp + ); + if (retval == ERR_DB_NOT_FOUND) { + log_messages.printf(MSG_NORMAL, + "add_result_to_reply: WU already sent to other HR class or app version\n" + ); + return retval; + } else if (retval) { + log_messages.printf(MSG_CRITICAL, + "add_result_to_reply: WU update failed: %d\n", + retval + ); + return retval; } - // update the result in DB + // update result DB record. + // This can also fail in normal operation. + // In this case, in principle we should undo + // the changes we just made to the WU (or use a transaction) + // but I don't think it actually matters. // result.hostid = g_reply->host.id; result.userid = g_reply->user.id; @@ -1169,6 +1205,21 @@ int add_result_to_reply( } if (retval) return retval; + // done with DB updates. + // + + retval = add_wu_to_reply(wu, *g_reply, app, bavp); + if (retval) return retval; + + // Adjust available disk space. + // In the locality scheduling locality case, + // reduce the available space by less than the workunit rsc_disk_bound, + // if the host already has the file or the file was not already sent. + // + if (!locality_scheduling || decrement_disk_space_locality(wu)) { + g_wreq->disk_available -= wu.rsc_disk_bound; + } + double est_dur = estimate_duration(wu, *bavp); if (config.debug_send) { log_messages.printf(MSG_NORMAL, @@ -1177,17 +1228,6 @@ int add_result_to_reply( ); } - retval = update_wu_on_send( - wu, result.report_deadline + config.report_grace_period, *app, *bavp - ); - if (retval) { - log_messages.printf(MSG_CRITICAL, - "add_result_to_reply: can't update WU transition time: %d\n", - retval - ); - return retval; - } - // The following overwrites the result's xml_doc field. // But that's OK cuz we're done with DB updates //