mirror of https://github.com/BOINC/boinc.git
- scheduler: fix bug in the "homogeneous app version" (HAV) feature
(reported by Kevin Reed). The problem: cache inconsistency. If there are 2 results for the same WU in shared mem, and 2 scheduler instances get them around the same time, they can send them with different app versions. We already fixed this problem for HR by 1) rereading the relevant WU fields while deciding whether to send the result 2) doing a "careful update" of the WU field using a where clause to make sure it wasn't modified in the (short) interval since rereading it. I fixed the HAV problem in the same way, and merged the two mechanisms to combine the DB queries. Also: - The rereads are done in slow_check() (see below). - The careful updates are done in update_wu_on_send(), and this is called *before* doing careful updates on result fields. That way, if the WU updates fail, we don't have orphaned results. - already_sent_to_different_platform_careful() (sic) no longer does DB stuff, so it's merged with already_send_to_different_hr_class() (better name) NOTE: slow_check() is used in array scheduling only. Score-based scheduling uses other code, in which this bug is not yet fixed. Locality scheduling doesn't support HR or HAV at all. This should be unified. svn path=/trunk/boinc/; revision=24484
This commit is contained in:
parent
6297bdbc77
commit
4b826b52a0
|
@ -7776,7 +7776,7 @@ Rom 25 Oct 2011
|
|||
configure.ac
|
||||
version.h
|
||||
|
||||
David 26 Oct 2011
|
||||
David 25 Oct 2011
|
||||
- web: typo in forum RSS from Daniel L G; fixes #1147
|
||||
- client: message tweak
|
||||
|
||||
|
@ -7784,3 +7784,42 @@ David 26 Oct 2011
|
|||
forum_rss.inc
|
||||
client/
|
||||
cpu_sched.cpp
|
||||
|
||||
David 25 Oct 2011
|
||||
- scheduler: fix bug in the "homogeneous app version" (HAV) feature
|
||||
(reported by Kevin Reed).
|
||||
The problem: cache inconsistency.
|
||||
If there are 2 results for the same WU in shared mem,
|
||||
and 2 scheduler instances get them around the same time,
|
||||
they can send them with different app versions.
|
||||
We already fixed this problem for HR by
|
||||
1) rereading the relevant WU fields while deciding
|
||||
whether to send the result
|
||||
2) doing a "careful update" of the WU field using a where clause
|
||||
to make sure it wasn't modified in the (short) interval
|
||||
since rereading it.
|
||||
I fixed the HAV problem in the same way,
|
||||
and merged the two mechanisms to combine the DB queries.
|
||||
|
||||
Also:
|
||||
- The rereads are done in slow_check() (see below).
|
||||
- The careful updates are done in update_wu_on_send(),
|
||||
and this is called *before* doing careful updates on result fields.
|
||||
That way, if the WU updates fail, we don't have orphaned results.
|
||||
- already_sent_to_different_platform_careful() (sic)
|
||||
no longer does DB stuff, so it's merged with
|
||||
already_send_to_different_hr_class() (better name)
|
||||
|
||||
NOTE: slow_check() is used in array scheduling only.
|
||||
Score-based scheduling uses other code,
|
||||
in which this bug is not yet fixed.
|
||||
Locality scheduling doesn't support HR or HAV at all.
|
||||
This should be unified.
|
||||
|
||||
db/
|
||||
db_base.cpp,h
|
||||
sched/
|
||||
sched_hr.cpp,h
|
||||
sched_array.cpp
|
||||
sched_send.cpp
|
||||
sched_score.cpp
|
||||
|
|
|
@ -256,19 +256,25 @@ int DB_BASE::delete_from_db() {
|
|||
return db->do_query(query);
|
||||
}
|
||||
|
||||
int DB_BASE::get_field_int(const char* field, int& val) {
|
||||
int DB_BASE::get_field_ints(const char* fields, int nfields, int* vals) {
|
||||
char query[MAX_QUERY_LEN];
|
||||
int retval;
|
||||
MYSQL_ROW row;
|
||||
MYSQL_RES* rp;
|
||||
|
||||
sprintf(query, "select %s from %s where id=%d", field, table_name, get_id());
|
||||
sprintf(query,
|
||||
"select %s from %s where id=%d", fields, table_name, get_id()
|
||||
);
|
||||
retval = db->do_query(query);
|
||||
if (retval) return retval;
|
||||
rp = mysql_store_result(db->mysql);
|
||||
if (!rp) return -1;
|
||||
row = mysql_fetch_row(rp);
|
||||
if (row) val = atoi(row[0]);
|
||||
if (row) {
|
||||
for (int i=0; i<nfields; i++) {
|
||||
vals[i] = atoi(row[i]);
|
||||
}
|
||||
}
|
||||
mysql_free_result(rp);
|
||||
if (row == 0) return ERR_DB_NOT_FOUND;
|
||||
return 0;
|
||||
|
@ -280,7 +286,9 @@ int DB_BASE::get_field_str(const char* field, char* buf, int buflen) {
|
|||
MYSQL_ROW row;
|
||||
MYSQL_RES* rp;
|
||||
|
||||
sprintf(query, "select %s from %s where id=%d", field, table_name, get_id());
|
||||
sprintf(query,
|
||||
"select %s from %s where id=%d", field, table_name, get_id()
|
||||
);
|
||||
retval = db->do_query(query);
|
||||
if (retval) return retval;
|
||||
rp = mysql_store_result(db->mysql);
|
||||
|
|
|
@ -98,7 +98,7 @@ public:
|
|||
int update_field(const char*, const char* where_clause=NULL);
|
||||
int update_fields_noid(char* set_clause, char* where_clause);
|
||||
int delete_from_db();
|
||||
int get_field_int(const char*, int&);
|
||||
int get_field_ints(const char*, int, int*);
|
||||
int get_field_str(const char*, char*, int);
|
||||
int lookup_id(int id);
|
||||
int lookup(const char*);
|
||||
|
|
|
@ -153,8 +153,11 @@ static bool quick_check(
|
|||
}
|
||||
|
||||
// do slow checks (ones that require DB access)
|
||||
// return true if OK to send
|
||||
//
|
||||
static bool slow_check(WU_RESULT& wu_result, WORKUNIT& wu, APP* app) {
|
||||
static bool slow_check(
|
||||
WU_RESULT& wu_result, WORKUNIT& wu, APP* app, BEST_APP_VERSION* bavp
|
||||
) {
|
||||
int n, retval;
|
||||
DB_RESULT result;
|
||||
char buf[256];
|
||||
|
@ -212,23 +215,51 @@ static bool slow_check(WU_RESULT& wu_result, WORKUNIT& wu, APP* app) {
|
|||
}
|
||||
}
|
||||
|
||||
if (app_hr_type(*app)) {
|
||||
if (already_sent_to_different_platform_careful(
|
||||
wu_result.workunit, *app
|
||||
)) {
|
||||
if (config.debug_send) {
|
||||
log_messages.printf(MSG_NORMAL,
|
||||
"[send] [HOST#%d] [WU#%d %s] is assigned to different platform\n",
|
||||
g_reply->host.id, wu.id, wu.name
|
||||
);
|
||||
}
|
||||
// Mark the workunit as infeasible.
|
||||
// This ensures that jobs already assigned to a platform
|
||||
// are processed first.
|
||||
//
|
||||
wu_result.infeasible_count++;
|
||||
// Checks that require looking up the WU.
|
||||
// Lump these together so we only do 1 lookup
|
||||
//
|
||||
if (app_hr_type(*app) || app->homogeneous_app_version) {
|
||||
DB_WORKUNIT db_wu;
|
||||
db_wu.id = wu_result.workunit.id;
|
||||
int vals[2];
|
||||
retval = db_wu.get_field_ints("hr_class, app_version_id", 2, vals);
|
||||
if (retval) {
|
||||
log_messages.printf(MSG_CRITICAL,
|
||||
"can't get fields for [WU#%d]: %s\n", db_wu.id, boincerror(retval)
|
||||
);
|
||||
return false;
|
||||
}
|
||||
if (app_hr_type(*app)) {
|
||||
wu_result.workunit.hr_class = vals[0];
|
||||
if (already_sent_to_different_hr_class( wu_result.workunit, *app)) {
|
||||
if (config.debug_send) {
|
||||
log_messages.printf(MSG_NORMAL,
|
||||
"[send] [HOST#%d] [WU#%d %s] is assigned to different HR class\n",
|
||||
g_reply->host.id, wu.id, wu.name
|
||||
);
|
||||
}
|
||||
// Mark the workunit as infeasible.
|
||||
// This ensures that jobs already assigned to an HR class
|
||||
// are processed first.
|
||||
//
|
||||
wu_result.infeasible_count++;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (app->homogeneous_app_version) {
|
||||
int wu_avid = vals[1];
|
||||
wu_result.workunit.app_version_id = wu_avid;
|
||||
if (wu_avid && wu_avid != bavp->avp->id) {
|
||||
if (config.debug_send) {
|
||||
log_messages.printf(MSG_NORMAL,
|
||||
"[send] [HOST#%d] [WU#%d %s] is assigned to different app version\n",
|
||||
g_reply->host.id, wu.id, wu.name
|
||||
);
|
||||
}
|
||||
wu_result.infeasible_count++;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -308,7 +339,7 @@ static bool scan_work_array() {
|
|||
wu_result.state = g_pid;
|
||||
unlock_sema();
|
||||
|
||||
if (!slow_check(wu_result, wu, app)) {
|
||||
if (!slow_check(wu_result, wu, app, bavp)) {
|
||||
// if we couldn't send the result to this host,
|
||||
// set its state back to PRESENT
|
||||
//
|
||||
|
|
|
@ -48,57 +48,16 @@ bool hr_unknown_platform(HOST& host) {
|
|||
return true;
|
||||
}
|
||||
|
||||
// quick check for platform compatibility
|
||||
// check for HR compatibility
|
||||
//
|
||||
bool already_sent_to_different_platform_quick(WORKUNIT& wu, APP& app) {
|
||||
if (wu.hr_class && (hr_class(g_request->host, app_hr_type(app)) != wu.hr_class)) {
|
||||
bool already_sent_to_different_hr_class(WORKUNIT& wu, APP& app) {
|
||||
g_wreq->hr_reject_temp = false;
|
||||
int host_hr_class = hr_class(g_request->host, app_hr_type(app));
|
||||
if (wu.hr_class && (host_hr_class != wu.hr_class)) {
|
||||
g_wreq->hr_reject_temp = true;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we've already sent a result of this WU to a different platform
|
||||
// return true
|
||||
// else if we haven't sent a result to ANY platform
|
||||
// update WU with platform code
|
||||
// return false
|
||||
//
|
||||
// (where "platform" is os_name + p_vendor; may want to sharpen this for Unix)
|
||||
//
|
||||
// This is "careful" in that it rereads the WU from DB
|
||||
//
|
||||
bool already_sent_to_different_platform_careful(WORKUNIT& workunit, APP& app) {
|
||||
DB_WORKUNIT db_wu;
|
||||
int retval, wu_hr_class;
|
||||
char buf[256], buf2[256];
|
||||
|
||||
// reread hr_class field from DB in case it's changed
|
||||
//
|
||||
db_wu.id = workunit.id;
|
||||
retval = db_wu.get_field_int("hr_class", wu_hr_class);
|
||||
if (retval) {
|
||||
log_messages.printf(MSG_CRITICAL,
|
||||
"can't get hr_class for [WU#%d]: %s\n", db_wu.id, boincerror(retval)
|
||||
);
|
||||
return true;
|
||||
}
|
||||
g_wreq->hr_reject_temp = false;
|
||||
int host_hr_class = hr_class(g_request->host, app_hr_type(app));
|
||||
if (wu_hr_class) {
|
||||
if (host_hr_class != wu_hr_class) {
|
||||
g_wreq->hr_reject_temp = true;
|
||||
}
|
||||
} else {
|
||||
// do a "careful update" to make sure the WU's hr_class hasn't
|
||||
// changed since we read it earlier
|
||||
//
|
||||
sprintf(buf, "hr_class=%d", host_hr_class);
|
||||
sprintf(buf2, "hr_class=%d", wu_hr_class);
|
||||
retval = db_wu.update_field(buf, buf2);
|
||||
if (retval) return true;
|
||||
if (boinc_db.affected_rows() != 1) return true;
|
||||
}
|
||||
return g_wreq->hr_reject_temp;
|
||||
}
|
||||
|
||||
const char *BOINC_RCSID_4196d9a5b4="$Id$";
|
||||
|
|
|
@ -18,11 +18,7 @@
|
|||
#ifndef __SCHED_HR__
|
||||
#define __SCHED_HR__
|
||||
|
||||
extern bool already_sent_to_different_platform_quick(WORKUNIT&, APP&);
|
||||
|
||||
extern bool already_sent_to_different_platform_careful(
|
||||
WORKUNIT& workunit, APP&
|
||||
);
|
||||
extern bool already_sent_to_different_hr_class(WORKUNIT& workunit, APP&);
|
||||
|
||||
extern bool hr_unknown_platform(HOST&);
|
||||
|
||||
|
|
|
@ -55,6 +55,8 @@ int read_sendable_result(SCHED_DB_RESULT& result) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// TODO: use slow_check()
|
||||
//
|
||||
bool wu_is_infeasible_slow(
|
||||
WU_RESULT& wu_result, SCHEDULER_REQUEST&, SCHEDULER_REPLY&
|
||||
) {
|
||||
|
@ -119,7 +121,7 @@ bool wu_is_infeasible_slow(
|
|||
APP* app = ssp->lookup_app(wu_result.workunit.appid);
|
||||
WORKUNIT wu = wu_result.workunit;
|
||||
if (app_hr_type(*app)) {
|
||||
if (already_sent_to_different_platform_careful(wu, *app)) {
|
||||
if (already_sent_to_different_hr_class(wu, *app)) {
|
||||
if (config.debug_send) {
|
||||
log_messages.printf(MSG_NORMAL,
|
||||
"[send] [HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n",
|
||||
|
|
|
@ -737,7 +737,7 @@ int wu_is_infeasible_fast(
|
|||
}
|
||||
return INFEASIBLE_HR;
|
||||
}
|
||||
if (already_sent_to_different_platform_quick(wu, app)) {
|
||||
if (already_sent_to_different_hr_class(wu, app)) {
|
||||
if (config.debug_send) {
|
||||
log_messages.printf(MSG_NORMAL,
|
||||
"[send] [HOST#%d] [WU#%d %s] failed quick HR check: WU is class %d, host is class %d\n",
|
||||
|
@ -920,13 +920,21 @@ static int insert_deadline_tag(RESULT& result) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// update workunit when send an instance of it:
|
||||
// update workunit fields when send an instance of it:
|
||||
// - transition time
|
||||
// - app_version_id, if app uses homogeneous app version
|
||||
// and WU.app_version_id was originally zero
|
||||
// - hr_class, if we're using HR
|
||||
// and WU.hr_class was originally zero.
|
||||
//
|
||||
// In the latter two cases, the update is conditional on those
|
||||
// fields still being zero (some other scheduler instance might
|
||||
// have updated them since we read the WU)
|
||||
//
|
||||
int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) {
|
||||
DB_WORKUNIT dbwu;
|
||||
char buf[256];
|
||||
char buf[256], buf2[256], where_clause[256];
|
||||
int retval;
|
||||
|
||||
dbwu.id = wu.id;
|
||||
|
||||
|
@ -936,12 +944,27 @@ int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) {
|
|||
"transition_time=if(transition_time<%d, transition_time, %d)",
|
||||
(int)x, (int)x
|
||||
);
|
||||
if (app.homogeneous_app_version && (bav.avp->id != wu.app_version_id)) {
|
||||
char buf2[256];
|
||||
strcpy(where_clause, "");
|
||||
if (app.homogeneous_app_version && wu.app_version_id==0) {
|
||||
sprintf(buf2, ", app_version_id=%d", bav.avp->id);
|
||||
strcat(buf, buf2);
|
||||
strcpy(where_clause, "app_version_id==0");
|
||||
}
|
||||
return dbwu.update_field(buf);
|
||||
if (app_hr_type(app) && wu.hr_class==0) {
|
||||
int host_hr_class = hr_class(g_request->host, app_hr_type(app));
|
||||
sprintf(buf2, ", hr_class=%d", host_hr_class);
|
||||
strcat(buf, buf2);
|
||||
if (strlen(where_clause)) {
|
||||
strcat(where_clause, " and ");
|
||||
}
|
||||
strcat(where_clause, "hr_class=0");
|
||||
}
|
||||
retval = dbwu.update_field(buf, where_clause);
|
||||
if (retval) return retval;
|
||||
if (boinc_db.affected_rows() != 1) {
|
||||
return ERR_DB_NOT_FOUND;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// return true iff a result for same WU is already being sent
|
||||
|
@ -1117,19 +1140,32 @@ int add_result_to_reply(
|
|||
bool resent_result = false;
|
||||
APP* app = ssp->lookup_app(wu.appid);
|
||||
|
||||
retval = add_wu_to_reply(wu, *g_reply, app, bavp);
|
||||
if (retval) return retval;
|
||||
|
||||
// Adjust available disk space.
|
||||
// In the scheduling locality case,
|
||||
// reduce the available space by less than the workunit rsc_disk_bound,
|
||||
// if the host already has the file or the file was not already sent.
|
||||
// update WU DB record.
|
||||
// This can fail in normal operation
|
||||
// (other scheduler already updated hr_class or app_version_id)
|
||||
// so do it before updating the result.
|
||||
//
|
||||
if (!locality_scheduling || decrement_disk_space_locality(wu)) {
|
||||
g_wreq->disk_available -= wu.rsc_disk_bound;
|
||||
retval = update_wu_on_send(
|
||||
wu, result.report_deadline + config.report_grace_period, *app, *bavp
|
||||
);
|
||||
if (retval == ERR_DB_NOT_FOUND) {
|
||||
log_messages.printf(MSG_NORMAL,
|
||||
"add_result_to_reply: WU already sent to other HR class or app version\n"
|
||||
);
|
||||
return retval;
|
||||
} else if (retval) {
|
||||
log_messages.printf(MSG_CRITICAL,
|
||||
"add_result_to_reply: WU update failed: %d\n",
|
||||
retval
|
||||
);
|
||||
return retval;
|
||||
}
|
||||
|
||||
// update the result in DB
|
||||
// update result DB record.
|
||||
// This can also fail in normal operation.
|
||||
// In this case, in principle we should undo
|
||||
// the changes we just made to the WU (or use a transaction)
|
||||
// but I don't think it actually matters.
|
||||
//
|
||||
result.hostid = g_reply->host.id;
|
||||
result.userid = g_reply->user.id;
|
||||
|
@ -1169,6 +1205,21 @@ int add_result_to_reply(
|
|||
}
|
||||
if (retval) return retval;
|
||||
|
||||
// done with DB updates.
|
||||
//
|
||||
|
||||
retval = add_wu_to_reply(wu, *g_reply, app, bavp);
|
||||
if (retval) return retval;
|
||||
|
||||
// Adjust available disk space.
|
||||
// In the locality scheduling locality case,
|
||||
// reduce the available space by less than the workunit rsc_disk_bound,
|
||||
// if the host already has the file or the file was not already sent.
|
||||
//
|
||||
if (!locality_scheduling || decrement_disk_space_locality(wu)) {
|
||||
g_wreq->disk_available -= wu.rsc_disk_bound;
|
||||
}
|
||||
|
||||
double est_dur = estimate_duration(wu, *bavp);
|
||||
if (config.debug_send) {
|
||||
log_messages.printf(MSG_NORMAL,
|
||||
|
@ -1177,17 +1228,6 @@ int add_result_to_reply(
|
|||
);
|
||||
}
|
||||
|
||||
retval = update_wu_on_send(
|
||||
wu, result.report_deadline + config.report_grace_period, *app, *bavp
|
||||
);
|
||||
if (retval) {
|
||||
log_messages.printf(MSG_CRITICAL,
|
||||
"add_result_to_reply: can't update WU transition time: %d\n",
|
||||
retval
|
||||
);
|
||||
return retval;
|
||||
}
|
||||
|
||||
// The following overwrites the result's xml_doc field.
|
||||
// But that's OK cuz we're done with DB updates
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue