*** empty log message ***

svn path=/trunk/boinc/; revision=4307
This commit is contained in:
Lana Alber 2004-10-08 22:41:33 +00:00
parent 28fc427cbf
commit 764c08e5ff
6 changed files with 385 additions and 100 deletions

View File

@ -18195,3 +18195,18 @@ David 8 Oct 2004
wingui_mainwindow.cpp
lib/
prefs.C
Lana 8 Oct 2004
- changed API check_set to take input WORKUNIT instead of
DB_WORKUNIT
- added VALIDATOR_ITEM and DB_VALIDATOR_ITEM_SET
- optimized validator to work with the DB_VALIDATOR_ITEM_SET
instead of going to the database
db/
boinc_db.C, .h
sched/
validator.C
sample_bitwise_validator.C
sample_trivial_validator.C

View File

@ -58,6 +58,7 @@ void WORKUNIT::clear() {memset(this, 0, sizeof(*this));}
// Each clear() below resets its record by overwriting the whole object
// with zero bytes.

void MSG_FROM_HOST::clear() {
    memset(this, 0, sizeof(*this));
}

void MSG_TO_HOST::clear() {
    memset(this, 0, sizeof(*this));
}

void TRANSITIONER_ITEM::clear() {
    memset(this, 0, sizeof(*this));
}

void VALIDATOR_ITEM::clear() {
    memset(this, 0, sizeof(*this));
}

void SCHED_RESULT_ITEM::clear() {
    memset(this, 0, sizeof(*this));
}
DB_PLATFORM::DB_PLATFORM(DB_CONN* dc) :
@ -84,6 +85,8 @@ DB_MSG_TO_HOST::DB_MSG_TO_HOST(DB_CONN* dc) :
DB_BASE("msg_to_host", dc?dc:&boinc_db){}
// Each constructor hands DB_BASE_SPECIAL the supplied connection,
// falling back to &boinc_db when dc is null.

DB_TRANSITIONER_ITEM_SET::DB_TRANSITIONER_ITEM_SET(DB_CONN* dc)
    : DB_BASE_SPECIAL(dc ? dc : &boinc_db)
{
}

DB_VALIDATOR_ITEM_SET::DB_VALIDATOR_ITEM_SET(DB_CONN* dc)
    : DB_BASE_SPECIAL(dc ? dc : &boinc_db)
{
}

DB_WORK_ITEM::DB_WORK_ITEM(DB_CONN* dc)
    : DB_BASE_SPECIAL(dc ? dc : &boinc_db)
{
}
DB_SCHED_RESULT_ITEM_SET::DB_SCHED_RESULT_ITEM_SET(DB_CONN* dc) :
@ -757,7 +760,8 @@ int DB_TRANSITIONER_ITEM_SET::update_result(TRANSITIONER_ITEM& ti) {
char query[MAX_QUERY_LEN];
sprintf(query,
"update result set server_state=%d, outcome=%d, validate_state=%d, file_delete_state=%d where id=%d",
"update result set server_state=%d, outcome=%d, "
"validate_state=%d, file_delete_state=%d where id=%d",
ti.res_server_state,
ti.res_outcome,
ti.res_validate_state,
@ -771,7 +775,9 @@ int DB_TRANSITIONER_ITEM_SET::update_workunit(TRANSITIONER_ITEM& ti) {
char query[MAX_QUERY_LEN];
sprintf(query,
"update workunit set need_validate=%d, error_mask=%d, assimilate_state=%d, file_delete_state=%d, transition_time=%d where id=%d",
"update workunit set need_validate=%d, error_mask=%d, "
"assimilate_state=%d, file_delete_state=%d, "
"transition_time=%d where id=%d",
ti.need_validate,
ti.error_mask,
ti.assimilate_state,
@ -782,6 +788,215 @@ int DB_TRANSITIONER_ITEM_SET::update_workunit(TRANSITIONER_ITEM& ti) {
return db->do_query(query);
}
// Map a possibly-NULL column pointer to a string that is safe to pass
// to atoi(), atof() and strcpy2().
// MySQL reports SQL NULL values in a row as NULL pointers.
//
static const char* validator_item_col(const char* s) {
    return s ? s : "";
}

// Fill this item from one row of the workunit/result join issued by
// DB_VALIDATOR_ITEM_SET::enumerate().
// Columns are consumed strictly in the order of that SELECT list,
// so the two must be kept in sync.
//
// The join is a LEFT JOIN, so a workunit without any result rows
// produces a row whose res.* columns are all NULL.
// NULL columns are read as "" and therefore leave the corresponding
// field at the zero value set by clear().
//
void VALIDATOR_ITEM::parse(MYSQL_ROW& r) {
    int i=0;
    clear();

    // workunit columns
    id = atoi(validator_item_col(r[i++]));
    appid = atoi(validator_item_col(r[i++]));
    strcpy2(name, validator_item_col(r[i++]));
    need_validate = atoi(validator_item_col(r[i++])) != 0;
    canonical_resultid = atoi(validator_item_col(r[i++]));
    canonical_credit = atof(validator_item_col(r[i++]));
    min_quorum = atoi(validator_item_col(r[i++]));
    assimilate_state = atoi(validator_item_col(r[i++]));
    transition_time = atoi(validator_item_col(r[i++]));
    opaque = atof(validator_item_col(r[i++]));
    batch = atoi(validator_item_col(r[i++]));
    max_success_results = atoi(validator_item_col(r[i++]));
    error_mask = atoi(validator_item_col(r[i++]));

    // result columns (may all be NULL, see above)
    res_id = atoi(validator_item_col(r[i++]));
    strcpy2(res_name, validator_item_col(r[i++]));
    res_validate_state = atoi(validator_item_col(r[i++]));
    res_server_state = atoi(validator_item_col(r[i++]));
    res_outcome = atoi(validator_item_col(r[i++]));
    res_claimed_credit = atof(validator_item_col(r[i++]));
    res_granted_credit = atof(validator_item_col(r[i++]));
    strcpy2(res_xml_doc_out, validator_item_col(r[i++]));
    res_cpu_time = atof(validator_item_col(r[i++]));
    res_batch = atoi(validator_item_col(r[i++]));
    res_opaque = atof(validator_item_col(r[i++]));
    res_exit_status = atoi(validator_item_col(r[i++]));
    res_hostid = atoi(validator_item_col(r[i++]));
    res_sent_time = atoi(validator_item_col(r[i++]));
}
// Return in `items` the next run of consecutive rows that share one
// workunit id, taken from a join of workunit and result.
//
// When no cursor is active, a new query is issued for workunits with
// the given appid and need_validate > 0, limited to nresult_limit rows,
// and the first row is parsed into last_item.
// Between calls last_item holds the first row of the next run.
//
// Return value:
//   0   items holds a run of rows with the same wu.id
//   -1  the query produced no rows, or the rows ran out after
//       nresult_limit or more rows were read (the final run may have
//       been cut short by LIMIT)
//   otherwise the value of mysql_errno() after a failed query/store
//
int DB_VALIDATOR_ITEM_SET::enumerate(
int appid, int nresult_limit,
std::vector<VALIDATOR_ITEM>& items
) {
int x;
char query[MAX_QUERY_LEN];
char priority[256];
MYSQL_ROW row;
VALIDATOR_ITEM new_item;
if (!cursor.active) {
// no query in progress: build and run a new one
strcpy(priority, "");
if (db->mysql) strcpy(priority, "HIGH_PRIORITY");
// NOTE: the column order here must match VALIDATOR_ITEM::parse()
sprintf(query,
"SELECT %s "
" wu.id, "
" wu.appid, "
" wu.name, "
" wu.need_validate, "
" wu.canonical_resultid, "
" wu.canonical_credit, "
" wu.min_quorum, "
" wu.assimilate_state, "
" wu.transition_time, "
" wu.opaque, "
" wu.batch, "
" wu.max_success_results,"
" wu.error_mask,"
" res.id, "
" res.name, "
" res.validate_state, "
" res.server_state, "
" res.outcome, "
" res.claimed_credit, "
" res.granted_credit, "
" res.xml_doc_out, "
" res.cpu_time, "
" res.batch, "
" res.opaque, "
" res.exit_status, "
" res.hostid, "
" res.sent_time "
"FROM "
" workunit AS wu "
// LEFT JOIN: a workunit with no result rows still yields one row,
// whose res.* columns are NULL
" LEFT JOIN result AS res ON wu.id = res.workunitid "
"WHERE "
" wu.appid = %d and wu.need_validate > 0 "
"LIMIT "
" %d ",
priority, appid, nresult_limit);
x = db->do_query(query);
if (x) return mysql_errno(db->mysql);
// the following stores the entire result set in memory
cursor.rp = mysql_store_result(db->mysql);
if (!cursor.rp) return mysql_errno(db->mysql);
cursor.active = true;
row = mysql_fetch_row(cursor.rp);
if (!row) {
// empty result set: nothing to enumerate
mysql_free_result(cursor.rp);
cursor.active = false;
return -1;
}
// prime last_item with the first row of the first run
last_item.parse(row);
nitems_this_query = 1;
}
// items is rebuilt from scratch on every call
items.clear();
while (true) {
items.push_back(last_item);
row = mysql_fetch_row(cursor.rp);
if (!row) {
// result set exhausted: release it so the next call re-queries
mysql_free_result(cursor.rp);
cursor.active = false;
// if got fewer rows than requested, last group is complete
//
if (nitems_this_query < nresult_limit) {
return 0;
} else {
return -1;
}
}
new_item.parse(row);
nitems_this_query++;
if (new_item.id != last_item.id) {
// a different workunit id ends the current run;
// keep this row in last_item for the next call
last_item = new_item;
return 0;
}
last_item = new_item;
}
// not reached: the loop above only exits via return
return 0;
}
// Write the validate_state, granted_credit, server_state and outcome
// of res to the result row whose id is res.id.
// Returns whatever db->do_query() returns.
//
int DB_VALIDATOR_ITEM_SET::update_result(RESULT& res) {
    char sql[MAX_QUERY_LEN];

    sprintf(sql,
        "update result set "
        "validate_state=%d, "
        "granted_credit=%.15e, "
        "server_state=%d, "
        "outcome=%d "
        "where id=%d",
        res.validate_state, res.granted_credit,
        res.server_state, res.outcome,
        res.id
    );

    return db->do_query(sql);
}
// Write need_validate, error_mask, assimilate_state, transition_time,
// canonical_resultid and canonical_credit of wu to the workunit row
// whose id is wu.id.
// Returns whatever db->do_query() returns.
//
int DB_VALIDATOR_ITEM_SET::update_workunit(WORKUNIT& wu) {
    char sql[MAX_QUERY_LEN];

    sprintf(sql,
        "update workunit set "
        "need_validate=%d, "
        "error_mask=%d, "
        "assimilate_state=%d, "
        "transition_time=%d, "
        "canonical_resultid=%d, "
        "canonical_credit=%.15e "
        "where id=%d",
        wu.need_validate, wu.error_mask,
        wu.assimilate_state, wu.transition_time,
        wu.canonical_resultid, wu.canonical_credit,
        wu.id
    );

    return db->do_query(sql);
}
// Build a standalone RESULT from the res_* fields of a VALIDATOR_ITEM.
// Only the fields carried by VALIDATOR_ITEM are filled in;
// all other RESULT fields are left zeroed.
//
RESULT DB_VALIDATOR_ITEM_SET::create_result(VALIDATOR_ITEM& vi) {
    RESULT result;

    // start from an all-zero record so that fields VALIDATOR_ITEM
    // does not carry are not returned uninitialized
    result.clear();

    result.workunitid = vi.id;
    result.id = vi.res_id;
    // name is a character buffer: copy the text (as done for
    // xml_doc_out below) instead of assigning the array
    strcpy2(result.name, vi.res_name);
    result.validate_state = vi.res_validate_state;
    result.server_state = vi.res_server_state;
    result.outcome = vi.res_outcome;
    result.claimed_credit = vi.res_claimed_credit;
    result.granted_credit = vi.res_granted_credit;
    strcpy2(result.xml_doc_out, vi.res_xml_doc_out);
    result.cpu_time = vi.res_cpu_time;
    result.batch = vi.res_batch;
    result.opaque = vi.res_opaque;
    result.exit_status = vi.res_exit_status;
    result.hostid = vi.res_hostid;
    result.sent_time = vi.res_sent_time;
    return result;
}
// Build a standalone WORKUNIT from the workunit fields of a
// VALIDATOR_ITEM.
// Only the fields carried by VALIDATOR_ITEM are filled in;
// all other WORKUNIT fields are left zeroed.
//
WORKUNIT DB_VALIDATOR_ITEM_SET::create_workunit(VALIDATOR_ITEM& vi) {
    WORKUNIT wu;

    // start from an all-zero record so that fields VALIDATOR_ITEM
    // does not carry are not returned uninitialized
    wu.clear();

    wu.id = vi.id;
    wu.appid = vi.appid;
    strcpy2(wu.name, vi.name);
    wu.need_validate = vi.need_validate;
    wu.canonical_resultid = vi.canonical_resultid;
    wu.canonical_credit = vi.canonical_credit;
    wu.min_quorum = vi.min_quorum;
    wu.assimilate_state = vi.assimilate_state;
    wu.transition_time = vi.transition_time;
    wu.opaque = vi.opaque;
    wu.batch = vi.batch;
    wu.max_success_results = vi.max_success_results;
    wu.error_mask = vi.error_mask;
    return wu;
}
void WORK_ITEM::parse(MYSQL_ROW& r) {
int i=0;
memset(this, 0, sizeof(WORK_ITEM));

View File

@ -464,6 +464,40 @@ struct TRANSITIONER_ITEM {
void parse(MYSQL_ROW&);
};
// One row of the workunit/result join read by
// DB_VALIDATOR_ITEM_SET::enumerate().
// The field order matches the SELECT list of that query and the
// order in which parse() consumes the columns.
//
struct VALIDATOR_ITEM {
// --- columns from the workunit table (wu.*) ---
int id;
int appid;
char name[256];
bool need_validate;
int canonical_resultid;
double canonical_credit;
int min_quorum;
int assimilate_state;
int transition_time;
double opaque;
int batch;
int max_success_results;
int error_mask;
// --- columns from the result table (res.*) ---
int res_id;
char res_name[256];
int res_validate_state;
int res_server_state;
int res_outcome;
double res_claimed_credit;
double res_granted_credit;
char res_xml_doc_out[LARGE_BLOB_SIZE];
double res_cpu_time;
int res_batch;
double res_opaque;
int res_exit_status;
int res_hostid;
int res_sent_time;
// zero every field
void clear();
// fill the fields from one row of the enumerate() query
void parse(MYSQL_ROW&);
};
class DB_PLATFORM : public DB_BASE, public PLATFORM {
public:
DB_PLATFORM(DB_CONN* p=0);
@ -576,6 +610,26 @@ public:
int update_workunit(TRANSITIONER_ITEM&);
};
// The validator uses this to get (WU, result) pairs efficiently.
// Each call to enumerate() returns a list of the pairs for a single WU,
// read from one joined workunit/result query rather than from
// separate per-WU lookups.
//
class DB_VALIDATOR_ITEM_SET : public DB_BASE_SPECIAL {
public:
DB_VALIDATOR_ITEM_SET(DB_CONN* p=0);
// most recently parsed row; between enumerate() calls this is
// the first row of the next WU
VALIDATOR_ITEM last_item;
// number of rows fetched so far from the current query
int nitems_this_query;
// fetch the rows for the next WU of the given app into items;
// returns 0 on success, nonzero when there is nothing (more) to return
// or the query failed
int enumerate(
int appid,
int nresult_limit,
std::vector<VALIDATOR_ITEM>& items
);
// write validate_state, granted_credit, server_state and outcome
// of the given result back to its DB row
int update_result(RESULT&);
// write need_validate, error_mask, assimilate_state, transition_time,
// canonical_resultid and canonical_credit of the given WU
// back to its DB row
int update_workunit(WORKUNIT&);
// build a RESULT from the res_* fields of an item
RESULT create_result(VALIDATOR_ITEM&);
// build a WORKUNIT from the workunit fields of an item
WORKUNIT create_workunit(VALIDATOR_ITEM&);
};
// used by the feeder and scheduler for outgoing work
//
struct WORK_ITEM {

View File

@ -104,7 +104,7 @@ int cleanup_result_string(RESULT const& /*result*/, void* data) {
// See if there's a strict majority under equality.
//
int check_set(
vector<RESULT>& results, DB_WORKUNIT&, int& canonicalid, double& credit,
vector<RESULT>& results, WORKUNIT&, int& canonicalid, double& credit,
bool& retry
) {
retry = false;

View File

@ -44,7 +44,7 @@ int cleanup_result_trivial(RESULT const&, void*) {
}
int check_set(
vector<RESULT>& results, DB_WORKUNIT&, int& canonicalid, double& credit,
vector<RESULT>& results, WORKUNIT&, int& canonicalid, double& credit,
bool& retry
) {
retry = false;

View File

@ -44,8 +44,11 @@ using namespace std;
#define LOCKFILE "validate.out"
#define PIDFILE "validate.pid"
#define SELECT_LIMIT 1000
extern int check_set(
vector<RESULT>&, DB_WORKUNIT& wu, int& canonical, double& credit,
vector<RESULT>&, WORKUNIT& wu, int& canonical, double& credit,
bool& retry
);
extern int check_pair(
@ -58,7 +61,7 @@ char app_name[256];
// here when a result has been validated;
// grant credit to host, user and team
//
int grant_credit(DB_RESULT& result, double credit) {
int grant_credit(RESULT& result, double credit) {
DB_USER user;
DB_HOST host;
DB_TEAM team;
@ -146,53 +149,65 @@ int grant_credit(DB_RESULT& result, double credit) {
return 0;
}
void handle_wu(DB_WORKUNIT& wu) {
DB_RESULT result, canonical_result;
void handle_wu(DB_VALIDATOR_ITEM_SET& validator,
std::vector<VALIDATOR_ITEM>& items) {
int canonical_result_index = -1;
bool update_result, retry;
bool canonical_result_missing = false;
bool need_immediate_transition = false, need_delayed_transition = false;
int retval, canonicalid = 0;
double credit;
int retval = 0, canonicalid = 0;
double credit = 0;
unsigned int i;
char buf[256];
RESULT result, canonical_result;
WORKUNIT wu;
if (wu.canonical_resultid) {
VALIDATOR_ITEM& wu_vi = items[0];
if (wu_vi.canonical_resultid) {
log_messages.printf(
SCHED_MSG_LOG::NORMAL,
"[WU#%d %s] handle_wu(): Already has canonical result\n",
wu.id, wu.name
"[WU#%d %s] handle_wu(): Already has canonical result %d\n",
wu_vi.id, wu_vi.name, wu_vi.canonical_resultid
);
++log_messages;
// Here if WU already has a canonical result.
// Get unchecked results and see if they match the canonical result
//
retval = canonical_result.lookup_id(wu.canonical_resultid);
if (retval == ERR_DB_NOT_FOUND) {
for (i=0; i<items.size(); i++) {
VALIDATOR_ITEM& vi= items[i];
log_messages.printf(
SCHED_MSG_LOG::NORMAL,
"[WU#%d %s] handle_wu(): Analyzing result %d\n",
vi.id, vi.name, vi.res_id
);
if (vi.res_id == wu_vi.canonical_resultid)
canonical_result_index= i;
}
if (canonical_result_index == (-1)) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[WU#%d %s] Canonical result not in DB %d",
wu.id, wu.name, retval
"[WU#%d %s] Can't read canonical result %d; exiting\n",
wu_vi.id, wu_vi.name, wu_vi.canonical_resultid
);
canonical_result_missing = true;
} else if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[WU#%d %s] Can't read canonical result %d; exiting",
wu.id, wu.name, retval
);
exit(retval);
exit(1);
}
// scan this WU's results, and check the unchecked ones
// TODO: do we have an index on these fields?
// maybe better just to enum on workunitid
//
sprintf(buf, "where workunitid=%d and validate_state=%d and server_state=%d and outcome=%d",
wu.id, VALIDATE_STATE_INIT, RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_SUCCESS
);
while (!result.enumerate(buf)) {
for (i=0; i<items.size(); i++) {
VALIDATOR_ITEM& vitem= items[i];
if (!((vitem.res_validate_state == VALIDATE_STATE_INIT) &&
(vitem.res_server_state == RESULT_SERVER_STATE_OVER) &&
(vitem.res_outcome == RESULT_OUTCOME_SUCCESS)))
continue;
need_immediate_transition = true;
result = validator.create_result(items[i]);
canonical_result =
validator.create_result(items[canonical_result_index]);
retval = check_pair(
result, canonical_result, retry
@ -216,7 +231,7 @@ void handle_wu(DB_WORKUNIT& wu) {
switch (result.validate_state) {
case VALIDATE_STATE_VALID:
update_result = true;
result.granted_credit = wu.canonical_credit;
result.granted_credit = wu_vi.canonical_credit;
log_messages.printf(
SCHED_MSG_LOG::NORMAL,
"[RESULT#%d %s] pair_check() matched: setting result to valid; credit %f\n",
@ -240,18 +255,13 @@ void handle_wu(DB_WORKUNIT& wu) {
);
}
if (update_result) {
sprintf(
buf, "validate_state=%d, granted_credit=%f",
result.validate_state,
result.granted_credit
);
log_messages.printf(
SCHED_MSG_LOG::NORMAL,
"[RESULT#%d %s] granted_credit %f",
result.id, result.name, result.granted_credit
);
retval = result.update_field(buf);
retval = validator.update_result(result);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
@ -269,38 +279,41 @@ void handle_wu(DB_WORKUNIT& wu) {
log_messages.printf(
SCHED_MSG_LOG::NORMAL,
"[WU#%d %s] handle_wu(): No canonical result yet\n", wu.id, wu.name
"[WU#%d %s] handle_wu(): No canonical result yet\n", wu_vi.id,
wu_vi.name
);
++log_messages;
// sprintf(buf, "where workunitid=%d", wu.id);
// TODO: do we have an index on these fields?
// maybe better to enum on workunitid
// while (!result.enumerate(buf)) {
// if (result.server_state == RESULT_SERVER_STATE_OVER
// && result.outcome == RESULT_OUTCOME_SUCCESS
// ) {
sprintf(buf, "where workunitid=%d and validate_state=%d and server_state=%d and outcome=%d",
wu.id, VALIDATE_STATE_INIT, RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_SUCCESS
);
while (!result.enumerate(buf)) {
for (i=0; i<items.size(); i++) {
VALIDATOR_ITEM& vitem= items[i];
if (!((vitem.res_validate_state == VALIDATE_STATE_INIT) &&
(vitem.res_server_state == RESULT_SERVER_STATE_OVER) &&
(vitem.res_outcome == RESULT_OUTCOME_SUCCESS)))
continue;
RESULT result= validator.create_result(vitem);
results.push_back(result);
}
log_messages.printf(
SCHED_MSG_LOG::DEBUG, "[WU#%d %s] Found %d successful results\n",
wu.id, wu.name, (int)results.size()
wu_vi.id, wu_vi.name, (int)results.size()
);
if (results.size() >= (unsigned int)wu.min_quorum) {
if (results.size() >= (unsigned int)wu_vi.min_quorum) {
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"[WU#%d %s] Enough for quorum, checking set.\n", wu.id, wu.name
"[WU#%d %s] Enough for quorum, checking set.\n", wu_vi.id,
wu_vi.name
);
wu= validator.create_workunit(wu_vi);
retval = check_set(results, wu, canonicalid, credit, retry);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[WU#%d %s] check_set returned %d, exiting",
wu.id, wu.name, retval
"[WU#%d %s] check_set returned %d, exiting\n",
wu_vi.id, wu_vi.name, retval
);
exit(retval);
}
@ -312,7 +325,7 @@ void handle_wu(DB_WORKUNIT& wu) {
result = results[i];
if (result.outcome != RESULT_OUTCOME_SUCCESS) {
need_immediate_transition = true;
retval = result.update();
retval = validator.update_result(result);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
@ -327,11 +340,11 @@ void handle_wu(DB_WORKUNIT& wu) {
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"[WU#%d %s] Found a canonical result: id=%d\n",
wu.id, wu.name, canonicalid
wu_vi.id, wu_vi.name, canonicalid
);
wu.canonical_resultid = canonicalid;
wu.canonical_credit = credit;
wu.assimilate_state = ASSIMILATE_READY;
wu_vi.canonical_resultid = canonicalid;
wu_vi.canonical_credit = credit;
wu_vi.assimilate_state = ASSIMILATE_READY;
for (i=0; i<results.size(); i++) {
result = results[i];
@ -359,18 +372,13 @@ void handle_wu(DB_WORKUNIT& wu) {
);
}
sprintf(buf,
"validate_state=%d, granted_credit=%f",
result.validate_state,
result.granted_credit
);
log_messages.printf(
SCHED_MSG_LOG::NORMAL,
"[RESULT#%d %s] granted_credit %f",
"[RESULT#%d %s] granted_credit %f\n",
result.id, result.name, result.granted_credit
);
retval = result.update_field(buf);
retval = validator.update_result(result);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
@ -381,20 +389,16 @@ void handle_wu(DB_WORKUNIT& wu) {
}
// If found a canonical result, don't send any unsent results
// TODO: could do this in a single SQL statement
//
sprintf(buf, "where workunitid=%d and server_state=%d",
wu.id, RESULT_SERVER_STATE_UNSENT
);
while (!result.enumerate(buf)) {
for (i=0; i<items.size(); i++) {
VALIDATOR_ITEM& vitem= items[i];
if (!(vitem.res_server_state ==
RESULT_SERVER_STATE_UNSENT))
continue;
result.server_state = RESULT_SERVER_STATE_OVER;
result.outcome = RESULT_OUTCOME_DIDNT_NEED;
sprintf(buf,
"server_state=%d, outcome=%d",
result.server_state,
result.outcome
);
retval = result.update_field(buf);
retval = validator.update_result(result);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
@ -406,8 +410,8 @@ void handle_wu(DB_WORKUNIT& wu) {
} else {
// here if no consensus; check if #success results is too large
//
if ((int)results.size() > wu.max_success_results) {
wu.error_mask |= WU_ERROR_TOO_MANY_SUCCESS_RESULTS;
if ((int)results.size() > wu_vi.max_success_results) {
wu_vi.error_mask |= WU_ERROR_TOO_MANY_SUCCESS_RESULTS;
need_immediate_transition = true;
}
}
@ -417,26 +421,22 @@ void handle_wu(DB_WORKUNIT& wu) {
--log_messages;
if (need_immediate_transition) {
wu.transition_time = time(0);
wu_vi.transition_time = time(0);
} else if (need_delayed_transition) {
int x = time(0) + 6*3600;
if (x < wu.transition_time) wu.transition_time = x;
if (x < wu_vi.transition_time) wu_vi.transition_time = x;
}
// clear WU.need_validate
//
wu.need_validate = 0;
sprintf(buf,
"need_validate=%d, transition_time=%d, "
"canonical_resultid=%d,canonical_credit=%f,assimilate_state=%d",
wu.need_validate, wu.transition_time, wu.canonical_resultid,
wu.canonical_credit, wu.assimilate_state
);
retval = wu.update_field(buf);
wu_vi.need_validate = 0;
wu= validator.create_workunit(wu_vi);
retval = validator.update_workunit(wu);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[WU#%d %s] wu.update() failed: %d; exiting\n", wu.id, wu.name, retval
"[WU#%d %s] wu_vi.update() failed: %d; exiting\n", wu_vi.id,
wu_vi.name, retval
);
exit(1);
}
@ -446,13 +446,14 @@ void handle_wu(DB_WORKUNIT& wu) {
// return true if there were any
//
bool do_validate_scan(APP& app) {
DB_WORKUNIT wu;
char buf[256];
DB_VALIDATOR_ITEM_SET validator;
std::vector<VALIDATOR_ITEM> items;
bool found=false;
sprintf(buf, "where appid=%d and need_validate > 0 limit 1000", app.id);
while (!wu.enumerate(buf)) {
handle_wu(wu);
// loop over entries that need to be checked
//
while (!validator.enumerate(app.id, SELECT_LIMIT, items)) {
handle_wu(validator, items);
found = true;
}
return found;