Performance Improvements

svn path=/trunk/boinc/; revision=3791
This commit is contained in:
Rom Walton 2004-07-06 18:30:22 +00:00
parent 2a38e5b7e3
commit 41e9bad3ed
5 changed files with 316 additions and 107 deletions

View File

@ -14708,4 +14708,18 @@ Daniel 2004-07-06
client/
main.C
Rom 6 July 2004
- Removed ';' from queries that had them since the API Docs state not to put
them in the query.
- Implement a scheduler optimization so we do one batched read and one
transactional write. In the worst case scenario where the client had submitted
20 results we would do 41 reads and 40 writes just to update the results. Now we
batch read all the results at one time and surround the write requests within a
transaction. So now our worst case scenario is 2 reads and 40 writes within a
transaction.
db/
boinc_db.C, .h
db_base.C
sched/
handle_request.C

View File

@ -52,6 +52,8 @@ void RESULT::clear() {memset(this, 0, sizeof(*this));}
void WORKUNIT::clear() {memset(this, 0, sizeof(*this));}
void MSG_FROM_HOST::clear() {memset(this, 0, sizeof(*this));}
void MSG_TO_HOST::clear() {memset(this, 0, sizeof(*this));}
void TRANSITIONER_ITEM::clear() {memset(this, 0, sizeof(*this));}
void SCHED_RESULT_ITEM::clear() {memset(this, 0, sizeof(*this));}
DB_PLATFORM::DB_PLATFORM() : DB_BASE(boinc_db, "platform"){}
DB_CORE_VERSION::DB_CORE_VERSION() : DB_BASE(boinc_db, "core_version"){}
@ -66,6 +68,7 @@ DB_MSG_FROM_HOST::DB_MSG_FROM_HOST() : DB_BASE(boinc_db, "msg_from_host"){}
DB_MSG_TO_HOST::DB_MSG_TO_HOST() : DB_BASE(boinc_db, "msg_to_host"){}
DB_TRANSITIONER_ITEM_SET::DB_TRANSITIONER_ITEM_SET() : DB_BASE_SPECIAL(boinc_db){}
DB_WORK_ITEM::DB_WORK_ITEM() : DB_BASE_SPECIAL(boinc_db){}
DB_SCHED_RESULT_ITEM_SET::DB_SCHED_RESULT_ITEM_SET() : DB_BASE_SPECIAL(boinc_db){}
int DB_PLATFORM::get_id() {return id;}
int DB_CORE_VERSION::get_id() {return id;}
@ -576,6 +579,33 @@ void DB_MSG_TO_HOST::db_parse(MYSQL_ROW& r) {
strcpy2(xml, r[i++]);
}
void TRANSITIONER_ITEM::parse(MYSQL_ROW& r) {
int i=0;
clear();
id = atoi(r[i++]);
strcpy2(name, r[i++]);
appid = atoi(r[i++]);
min_quorum = atoi(r[i++]);
canonical_resultid = atoi(r[i++]);
transition_time = atoi(r[i++]);
delay_bound = atoi(r[i++]);
error_mask = atoi(r[i++]);
max_error_results = atoi(r[i++]);
max_total_results = atoi(r[i++]);
file_delete_state = atoi(r[i++]);
assimilate_state = atoi(r[i++]);
target_nresults = atoi(r[i++]);
strcpy2(result_template_file, r[i++]);
res_id = safe_atoi(r[i++]);
strcpy2(res_name, r[i++]);
res_report_deadline = safe_atoi(r[i++]);
res_server_state = safe_atoi(r[i++]);
res_outcome = safe_atoi(r[i++]);
res_validate_state = safe_atoi(r[i++]);
res_file_delete_state = safe_atoi(r[i++]);
res_sent_time = safe_atoi(r[i++]);
}
int DB_TRANSITIONER_ITEM_SET::enumerate(
int transition_time, int ntotal_transitioners, int ntransitioner,
int nresult_limit,
@ -680,38 +710,11 @@ int DB_TRANSITIONER_ITEM_SET::enumerate(
return 0;
}
void TRANSITIONER_ITEM::parse(MYSQL_ROW& r) {
int i=0;
memset(this, 0, sizeof(TRANSITIONER_ITEM));
id = atoi(r[i++]);
strcpy2(name, r[i++]);
appid = atoi(r[i++]);
min_quorum = atoi(r[i++]);
canonical_resultid = atoi(r[i++]);
transition_time = atoi(r[i++]);
delay_bound = atoi(r[i++]);
error_mask = atoi(r[i++]);
max_error_results = atoi(r[i++]);
max_total_results = atoi(r[i++]);
file_delete_state = atoi(r[i++]);
assimilate_state = atoi(r[i++]);
target_nresults = atoi(r[i++]);
strcpy2(result_template_file, r[i++]);
res_id = safe_atoi(r[i++]);
strcpy2(res_name, r[i++]);
res_report_deadline = safe_atoi(r[i++]);
res_server_state = safe_atoi(r[i++]);
res_outcome = safe_atoi(r[i++]);
res_validate_state = safe_atoi(r[i++]);
res_file_delete_state = safe_atoi(r[i++]);
res_sent_time = safe_atoi(r[i++]);
}
int DB_TRANSITIONER_ITEM_SET::update_result(TRANSITIONER_ITEM& ti) {
char query[MAX_QUERY_LEN];
sprintf(query,
"update result set server_state=%d, outcome=%d, validate_state=%d, file_delete_state=%d where id=%d;",
"update result set server_state=%d, outcome=%d, validate_state=%d, file_delete_state=%d where id=%d",
ti.res_server_state,
ti.res_outcome,
ti.res_validate_state,
@ -725,7 +728,7 @@ int DB_TRANSITIONER_ITEM_SET::update_workunit(TRANSITIONER_ITEM& ti) {
char query[MAX_QUERY_LEN];
sprintf(query,
"update workunit set need_validate=%d, error_mask=%d, assimilate_state=%d, file_delete_state=%d, transition_time=%d where id=%d;",
"update workunit set need_validate=%d, error_mask=%d, assimilate_state=%d, file_delete_state=%d, transition_time=%d where id=%d",
ti.need_validate,
ti.error_mask,
ti.assimilate_state,
@ -828,6 +831,153 @@ int DB_WORK_ITEM::enumerate(int limit) {
return 0;
}
void SCHED_RESULT_ITEM::parse(MYSQL_ROW& r) {
int i=0;
clear();
id = atoi(r[i++]);
strcpy2(name, r[i++]);
workunitid = atoi(r[i++]);
server_state = atoi(r[i++]);
hostid = atoi(r[i++]);
userid = atoi(r[i++]);
received_time = atoi(r[i++]);
}
int DB_SCHED_RESULT_ITEM_SET::add_result(char* result_name) {
SCHED_RESULT_ITEM result;
strcpy2(result.queried_name, result_name);
results.push_back(result);
return 0;
}
int DB_SCHED_RESULT_ITEM_SET::enumerate() {
char query[MAX_QUERY_LEN];
unsigned int i;
unsigned int result_count;
MYSQL_RES* rp;
MYSQL_ROW row;
SCHED_RESULT_ITEM ri;
if (0 < results.size()) {
// construct the query
strcpy2(query,
"SELECT "
" id, "
" name, "
" workunitid, "
" server_state, "
" hostid, "
" userid, "
" received_time "
"FROM "
" result "
"WHERE "
" name IN ( "
);
// we should only get here if there is one or more results to
// lookup, note the vector is zero index based.
result_count = results.size() - 1;
for (i=0; i<=result_count; i++) {
strcat(query, "'");
if (i != result_count) {
strcat(query, results[i].queried_name);
strcat(query, "', ");
} else {
strcat(query, results[i].queried_name);
strcat(query, "' )");
}
}
x = db->do_query(query);
if (x) return mysql_errno(db->mysql);
// the following stores the entire result set in memory
rp = mysql_store_result(db->mysql);
if (!rp) return mysql_errno(db->mysql);
// populate the results missing values that we are going to make
// decisions on.
do {
row = mysql_fetch_row(rp);
if (!row) {
mysql_free_result(rp);
} else {
ri.parse(row);
for (i=0; i<results.size(); i++) {
if (0 == strcmp(results[i].queried_name, ri.name)) {
results[i].parse(row);
}
}
}
} while (row);
}
return 0;
}
int DB_SCHED_RESULT_ITEM_SET::lookup_result(char* result_name, SCHED_RESULT_ITEM& ri) {
unsigned int i;
int retval = -1;
for (i=0; i<results.size(); i++) {
if (0 == strcmp(results[i].name, result_name)) {
ri = results[i];
retval = 0;
}
}
return retval;
}
int DB_SCHED_RESULT_ITEM_SET::update_result(SCHED_RESULT_ITEM& ri) {
char query[MAX_QUERY_LEN];
sprintf(query,
"UPDATE result SET "
" hostid=%d, "
" received_time=%d, "
" client_state=%d, "
" cpu_time=%.15e, "
" exit_status=%d, "
" app_version_num=%d, "
" claimed_credit=%.15e, "
" server_state=%d, "
" outcome=%d, "
" stderr_out='%s', "
" xml_doc_out='%s', "
" validate_state=%d, "
" teamid=%d "
"WHERE "
" id=%d",
ri.hostid,
ri.received_time,
ri.client_state,
ri.cpu_time,
ri.exit_status,
ri.app_version_num,
ri.claimed_credit,
ri.server_state,
ri.outcome,
ri.stderr_out,
ri.xml_doc_out,
ri.validate_state,
ri.teamid,
ri.id
);
return db->do_query(query);
}
int DB_SCHED_RESULT_ITEM_SET::update_workunit(SCHED_RESULT_ITEM& ri) {
char query[MAX_QUERY_LEN];
sprintf(query,
"UPDATE workunit SET transition_time=%d WHERE id=%d",
time(0),
ri.id
);
return db->do_query(query);
}
#if 0
int DB_WORK_ITEM::read_result() {
char query[MAX_QUERY_LEN];

View File

@ -621,44 +621,43 @@ public:
// and various result fields
};
// used by the scheduler for handling completed results
//
class DB_RESULT_DONE : public DB_BASE_SPECIAL {
public:
DB_RESULT_DONE();
int hostid;
int received_time;
int client_state;
int cpu_time;
int exit_status;
int claimed_credit;
int teamid;
struct SCHED_RESULT_ITEM {
char queried_name[256];
int id;
char name[256];
int workunitid;
char stderr_out[LARGE_BLOB_SIZE];
int server_state;
int client_state;
int validate_state;
int outcome;
int hostid;
int userid;
int teamid;
int received_time;
double cpu_time;
double claimed_credit;
char xml_doc_out[LARGE_BLOB_SIZE];
char stderr_out[LARGE_BLOB_SIZE];
int app_version_num;
int exit_status;
int lookup();
// lookup by name; reads hostid, server_state, workunitid
int update();
// updates all fields except hostid, workunitid
// sets transition time of corresponding WU
void clear();
void parse(MYSQL_ROW& row);
};
// used by the scheduler for looking up and updating host/user/team
//
class DB_ACCOUNT_INFO : public DB_BASE_SPECIAL {
DB_ACCOUNT_INFO();
HOST host;
USER user;
TEAM team;
class DB_SCHED_RESULT_ITEM_SET : public DB_BASE_SPECIAL {
public:
DB_SCHED_RESULT_ITEM_SET();
std::vector<SCHED_RESULT_ITEM>& results;
int lookup_hostid();
// used when hostid is supplied; reads all 3 records
int lookup_auth();
// used when no hostid is supplied; reads user/team
// must manually create host
// no update functions here because we always update the entire host,
// and we update the entire user infrequently
int add_result(char* result_name);
int enumerate();
int lookup_result(char* result_name, SCHED_RESULT_ITEM& result);
int update_result(SCHED_RESULT_ITEM& result);
int update_workunit(SCHED_RESULT_ITEM& result);
};
#if 0

View File

@ -41,9 +41,9 @@ int DB_CONN::insert_id() {
MYSQL_RES* rp;
if (mysql) {
retval = do_query("select HIGH_PRIORITY LAST_INSERT_ID();");
retval = do_query("select HIGH_PRIORITY LAST_INSERT_ID()");
} else {
retval = do_query("select LAST_INSERT_ID();");
retval = do_query("select LAST_INSERT_ID()");
}
if (retval) return retval;
@ -77,7 +77,7 @@ void DB_BASE::db_parse(MYSQL_ROW&) {}
int DB_BASE::insert() {
char vals[MAX_QUERY_LEN], query[MAX_QUERY_LEN];
db_print(vals);
sprintf(query, "insert into %s set %s;", table_name, vals);
sprintf(query, "insert into %s set %s", table_name, vals);
return db->do_query(query);
}
@ -86,7 +86,7 @@ int DB_BASE::insert() {
int DB_BASE::update() {
char vals[MAX_QUERY_LEN], query[MAX_QUERY_LEN];
db_print(vals);
sprintf(query, "update %s set %s where id=%d;", table_name, vals, get_id());
sprintf(query, "update %s set %s where id=%d", table_name, vals, get_id());
return db->do_query(query);
}
@ -95,7 +95,7 @@ int DB_BASE::update() {
//
int DB_BASE::update_field(char* clause) {
char query[MAX_QUERY_LEN];
sprintf(query, "update %s set %s where id=%d;", table_name, clause, get_id());
sprintf(query, "update %s set %s where id=%d", table_name, clause, get_id());
return db->do_query(query);
}
@ -106,9 +106,9 @@ int DB_BASE::lookup(char* clause) {
MYSQL_RES* rp;
if (db->mysql && is_high_priority) {
sprintf(query, "select HIGH_PRIORITY * from %s %s;", table_name, clause);
sprintf(query, "select HIGH_PRIORITY * from %s %s", table_name, clause);
} else {
sprintf(query, "select * from %s %s;", table_name, clause);
sprintf(query, "select * from %s %s", table_name, clause);
}
retval = db->do_query(query);
@ -134,9 +134,9 @@ int DB_BASE::lookup_id(int id) {
MYSQL_RES* rp;
if (db->mysql && is_high_priority) {
sprintf(query, "select HIGH_PRIORITY * from %s where id=%d;", table_name, id);
sprintf(query, "select HIGH_PRIORITY * from %s where id=%d", table_name, id);
} else {
sprintf(query, "select * from %s where id=%d;", table_name, id);
sprintf(query, "select * from %s where id=%d", table_name, id);
}
retval = db->do_query(query);
@ -161,9 +161,9 @@ int DB_BASE::enumerate(char* clause) {
cursor.active = true;
if (db->mysql && is_high_priority) {
sprintf(query, "select HIGH_PRIORITY * from %s %s;", table_name, clause);
sprintf(query, "select HIGH_PRIORITY * from %s %s", table_name, clause);
} else {
sprintf(query, "select * from %s %s;", table_name, clause);
sprintf(query, "select * from %s %s", table_name, clause);
}
x = db->do_query(query);
@ -230,9 +230,9 @@ int DB_BASE::count(int& n, char* clause) {
char query[MAX_QUERY_LEN];
if (db->mysql && is_high_priority) {
sprintf(query, "select HIGH_PRIORITY count(*) from %s %s;", table_name, clause);
sprintf(query, "select HIGH_PRIORITY count(*) from %s %s", table_name, clause);
} else {
sprintf(query, "select count(*) from %s %s;", table_name, clause);
sprintf(query, "select count(*) from %s %s", table_name, clause);
}
return get_integer(query, n);
@ -242,9 +242,9 @@ int DB_BASE::sum(double& x, char* field, char* clause) {
char query[MAX_QUERY_LEN];
if (db->mysql && is_high_priority) {
sprintf(query, "select HIGH_PRIORITY sum(%s) from %s %s;", field, table_name, clause);
sprintf(query, "select HIGH_PRIORITY sum(%s) from %s %s", field, table_name, clause);
} else {
sprintf(query, "select sum(%s) from %s %s;", field, table_name, clause);
sprintf(query, "select sum(%s) from %s %s", field, table_name, clause);
}
return get_double(query, x);
@ -256,11 +256,11 @@ DB_BASE_SPECIAL::DB_BASE_SPECIAL(DB_CONN& p) : db(&p) {
}
int DB_BASE_SPECIAL::start_transaction() {
return db->do_query("START TRANSACTION;");;
return db->do_query("START TRANSACTION");
}
int DB_BASE_SPECIAL::commit_transaction() {
return db->do_query("COMMIT;");
return db->do_query("COMMIT");
}
// convert a string into a form that allows it to be used

View File

@ -311,12 +311,27 @@ int handle_global_prefs(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
int handle_results(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
) {
DB_SCHED_RESULT_ITEM_SET result_handler;
SCHED_RESULT_ITEM result;
unsigned int i;
int retval;
DB_RESULT result;
RESULT* rp;
DB_WORKUNIT wu;
char buf[256];
// lets request the status of all results the user is reporting in a batch
//
if (0 < sreq.results.size()) {
for (i=0; i<sreq.results.size(); i++) {
result_handler.add_result(sreq.results[i].name);
}
retval = result_handler.enumerate();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[HOST#%d] Batch query failed\n"
);
}
}
for (i=0; i<sreq.results.size(); i++) {
rp = &sreq.results[i];
@ -326,9 +341,7 @@ int handle_results(
//
reply.result_acks.push_back(*rp);
strncpy(result.name, rp->name, sizeof(result.name));
sprintf(buf, "where name='%s'", result.name);
retval = result.lookup(buf);
retval = result_handler.lookup_result(rp->name, result);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
@ -396,6 +409,7 @@ int handle_results(
// update the result record in DB
//
result.hostid = reply.host.id;
result.teamid = reply.user.teamid;
result.received_time = time(0);
result.client_state = rp->client_state;
result.cpu_time = rp->cpu_time;
@ -432,38 +446,70 @@ int handle_results(
result.outcome = RESULT_OUTCOME_CLIENT_ERROR;
result.validate_state = VALIDATE_STATE_INVALID;
}
}
result.teamid = reply.user.teamid;
retval = result.update();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::NORMAL,
"[HOST#%d] [RESULT#%d %s] can't update result: %s\n",
reply.host.id, result.id, result.name, boinc_db.error_string()
);
}
// trigger the transition handle for the result's WU
//
retval = wu.lookup_id(result.workunitid);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[HOST#%d] [RESULT#%d %s] Can't find [WU#%d] for result\n",
reply.host.id, result.id, result.name, result.workunitid
);
} else {
wu.transition_time = time(0);
retval = wu.update();
// Lets update all the results we have
//
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"[HOST#%d] Starting Result Update Transaction...\n",
reply.host.id
);
retval = result_handler.start_transaction();
if (retval) {
log_messages.printf(SCHED_MSG_LOG::CRITICAL,
"[HOST#%d] result_handler.start_transaction() == %d\n",
reply.host.id, retval
);
}
for (i=0; i<result_handler.results.size(); i++) {
if (0 < result_handler.results[i].id) {
retval = result_handler.update_result(result_handler.results[i]);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[HOST#%d] [RESULT#%d %s] Can't update [WU#%d %s]\n",
reply.host.id, result.id, result.name, wu.id, wu.name
"[HOST#%d] [RESULT#%d %s] can't update result: %s\n",
reply.host.id, result_handler.results[i].id, result_handler.results[i].name,
boinc_db.error_string()
);
}
// trigger the transition handle for the result's WU
//
retval = result_handler.update_workunit(result_handler.results[i]);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[HOST#%d] [RESULT#%d %s] can't update [WU#%d %s]\n",
reply.host.id, result_handler.results[i].id, result_handler.results[i].name,
result_handler.results[i].workunitid, wu.name
);
}
}
}
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"[HOST#%d] Committing Transaction...\n",
reply.host.id
);
retval = result_handler.commit_transaction();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"[HOST#%d] result_handler.commit_transaction() == %d\n",
reply.host.id, retval
);
} else {
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"[HOST#%d] Committed Transaction Successfully...\n",
reply.host.id
);
}
return 0;
}