- scheduler, back end: add "homogeneous app version" feature.

    Lets you specify, on a per-app basis,
    that all instances of a given job should be done using the same app version.
    This is for validation in the presence of GPUs,
    where results produced by different app versions (e.g. CPU vs. GPU)
    may not be numerically comparable.
    (A simplified sketch of the scheduler-side rule follows these notes.)
- scheduler: code cleanup
    - Instead of adding a bunch of non-DB fields to RESULT,
        use a derived class, SCHED_DB_RESULT.
    - Instead of storing a pointer to BEST_APP_VERSION in RESULT,
        store the structure itself by value.
        This simplifies memory management: each result owns its own copy,
        rather than pointing at a separately allocated object whose lifetime
        has to be tracked. (A sketch of the pattern appears below.)
- client: condition "Got server request to delete file" messages
    on <file_xfer_debug>


svn path=/trunk/boinc/; revision=23636
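
The first item works as follows: when the scheduler sends the first instance of a job for an app with this flag set, it records the chosen app version in the workunit (workunit.app_version_id); later instances are sent only to hosts that can actually run that version. Here is a minimal, self-contained sketch of that rule, not the actual BOINC code: App, Job, AppVersionChoice, pick_best_version_for_host(), host_can_run_version(), and choose_version() are hypothetical stand-ins for the real APP, WORKUNIT, BEST_APP_VERSION, and get_app_version()/update_wu_on_send() machinery shown in the diffs below.

// Minimal sketch (not BOINC code) of the "homogeneous app version" rule.
#include <cstdio>

struct App {
    bool homogeneous_app_version;   // per-app flag (app.homogeneous_app_version in the DB)
};

struct Job {
    int app_version_id;             // 0 until the first instance is sent
};

struct AppVersionChoice {
    int id;                         // ID of the app version this host would use
    bool usable;                    // host has the platform/plan class and needs the resource
};

// Stand-in for the scheduler's normal "best app version for this host" logic.
static AppVersionChoice pick_best_version_for_host() { return {17, true}; }

// Stand-in for checking whether this host can run one specific app version.
static bool host_can_run_version(int app_version_id) { return app_version_id == 17; }

// Decide which app version (if any) to use when sending an instance of job;
// return 0 if the job can't be sent to this host.
static int choose_version(const App& app, Job& job) {
    if (app.homogeneous_app_version && job.app_version_id) {
        // The job is already committed to a version;
        // send an instance only if this host can run that version.
        return host_can_run_version(job.app_version_id) ? job.app_version_id : 0;
    }
    AppVersionChoice best = pick_best_version_for_host();
    if (!best.usable) return 0;
    if (app.homogeneous_app_version) {
        // First instance sent: commit the job to this version
        // (the real code records this via update_wu_on_send()).
        job.app_version_id = best.id;
    }
    return best.id;
}

int main() {
    App app = {true};
    Job job = {0};
    printf("first instance sent with app version %d\n", choose_version(app, job));
    printf("job is now committed to app version %d\n", job.app_version_id);
    return 0;
}

Projects opt in per app via the new app.homogeneous_app_version column; the schema.sql and html/ops/db_update.php changes below add it (along with workunit.app_version_id) to the database.
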
commit 436415cfe1 (parent 4526ae487a)
Author: David Anderson
Date:   2011-06-06 03:40:42 +00:00
18 changed files with 174 additions and 59 deletions
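
The second item is a structural cleanup rather than a behavior change: fields the scheduler needs but that are not database columns move out of the shared RESULT struct into a scheduler-side derived class, and the chosen app version is embedded by value instead of being referenced through a pointer. A rough sketch of the pattern, with abbreviated, hypothetical field lists (the real definitions are in the db and sched header diffs further down):

// Sketch only; the field lists here are abbreviated and hypothetical.

struct BEST_APP_VERSION {
    int avp_id;             // which app version was chosen for this host
    // ... host usage, plan class info, etc.
};

// Fields that correspond to columns of the result table.
struct DB_RESULT {
    int id;
    int workunitid;
    // ... remaining DB columns
};

// Scheduler-only view of a result: the DB fields plus request/reply state.
struct SCHED_DB_RESULT : DB_RESULT {
    char wu_name[256];      // used while building the reply; not stored in the DB
    BEST_APP_VERSION bav;   // held by value, so its lifetime is simply
                            // that of the result that owns it
};

Because each result now carries its own copy, the commit also drops WORK_REQ::all_best_app_versions and the destructor that deleted its entries, presumably for the same reason (see the sched_types.h and sched_version.cpp hunks below).
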


@ -3281,3 +3281,35 @@ Charlie 3 Jun 2011
    January 1, 1601) to UNIX time (seconds since January 1, 1970)
    clientgui/
        browser.cpp

David 5 Jun 2011
    - scheduler, back end: add "homogeneous app version" feature.
        Lets you specify, on a per-app basis,
        that all instances should be done using the same app version.
        This is for validation in the presence of GPUs.
    - scheduler: code cleanup
        - Instead of adding a bunch of non-DB fields to RESULT,
            used a derived class SCHED_DB_RESULT.
        - Instead of storing a pointer to BEST_APP_VERSION in RESULT,
            store the structure itself.
            This simplifies the memory allocation situation.
    - client: condition "Got server request to delete file" messages
        on <file_xfer_debug>
    db/
        boinc_db.cpp,h
        schema.sql
    sched/
        sched_types.cpp,h
        sched_version.cpp
        sched_send.cpp,h
        sched_array.cpp
        sched_locality.cpp
        transitioner.cpp
        sched_resend.cpp
        sched_assign.cpp
        sched_score.cpp
    html/ops/
        db_update.php
    client/
        cs_scheduler.cpp


@ -741,9 +741,12 @@ int CLIENT_STATE::handle_scheduler_reply(PROJECT* project, char* scheduler_url)
for (i=0; i<sr.file_deletes.size(); i++) {
fip = lookup_file_info(project, sr.file_deletes[i].c_str());
if (fip) {
msg_printf(project, MSG_INFO,
"Got server request to delete file %s", fip->name
);
if (log_flags.file_xfer_debug) {
msg_printf(project, MSG_INFO,
"[file_xfer_debug] Got server request to delete file %s",
fip->name
);
}
fip->sticky = false;
}
}


@ -195,7 +195,8 @@ void DB_APP::db_print(char* buf){
"beta=%d, "
"target_nresults=%d, "
"min_avg_pfc=%.15e, "
"host_scale_check=%d, ",
"host_scale_check=%d, "
"homogeneous_app_version=%d, ",
create_time,
name,
min_version,
@ -206,7 +207,8 @@ void DB_APP::db_print(char* buf){
beta?1:0,
target_nresults,
min_avg_pfc,
host_scale_check?1:0
host_scale_check?1:0,
homogeneous_app_version?1:0
);
}
@ -225,6 +227,7 @@ void DB_APP::db_parse(MYSQL_ROW &r) {
target_nresults = atoi(r[i++]);
min_avg_pfc = atof(r[i++]);
host_scale_check = (atoi(r[i++]) != 0);
homogeneous_app_version = (atoi(r[i++]) != 0);
}
void DB_APP_VERSION::db_print(char* buf){
@ -779,7 +782,8 @@ void DB_WORKUNIT::db_print(char* buf){
"result_template_file='%s', "
"priority=%d, "
"rsc_bandwidth_bound=%.15e, "
"fileset_id=%d ",
"fileset_id=%d, "
"app_version_id=%d, ",
create_time, appid,
name, xml_doc, batch,
rsc_fpops_est, rsc_fpops_bound, rsc_memory_bound, rsc_disk_bound,
@ -796,7 +800,8 @@ void DB_WORKUNIT::db_print(char* buf){
result_template_file,
priority,
rsc_bandwidth_bound,
fileset_id
fileset_id,
app_version_id
);
}
@ -833,6 +838,7 @@ void DB_WORKUNIT::db_parse(MYSQL_ROW &r) {
strcpy2(mod_time, r[i++]);
rsc_bandwidth_bound = atof(r[i++]);
fileset_id = atoi(r[i++]);
app_version_id = atoi(r[i++]);
}
void DB_CREDITED_JOB::db_print(char* buf){
@ -1241,6 +1247,7 @@ void TRANSITIONER_ITEM::parse(MYSQL_ROW& r) {
priority = atoi(r[i++]);
hr_class = atoi(r[i++]);
batch = atoi(r[i++]);
app_version_id = atoi(r[i++]);
// use safe_atoi() from here on cuz they might not be there
//
@ -1299,6 +1306,7 @@ int DB_TRANSITIONER_ITEM_SET::enumerate(
" wu.priority, "
" wu.hr_class, "
" wu.batch, "
" wu.app_version_id, "
" res.id, "
" res.name, "
" res.report_deadline, "
@ -1417,6 +1425,10 @@ int DB_TRANSITIONER_ITEM_SET::update_workunit(
sprintf(buf, " hr_class=%d,", ti.hr_class);
strcat(updates, buf);
}
if (ti.app_version_id != ti_original.app_version_id) {
sprintf(buf, " app_version_id=%d,", ti.app_version_id);
strcat(updates, buf);
}
int n = strlen(updates);
if (n == 0) {
return 0;
@ -1668,6 +1680,7 @@ void WORK_ITEM::parse(MYSQL_ROW& r) {
strcpy2(wu.mod_time, r[i++]);
wu.rsc_bandwidth_bound = atof(r[i++]);
wu.fileset_id = atoi(r[i++]);
wu.app_version_id = atoi(r[i++]);
}
int DB_WORK_ITEM::enumerate(


@ -83,6 +83,8 @@ struct APP {
// Approximates (actual FLOPS)/wu.rsc_fpops_est
bool host_scale_check;
// use host scaling cautiously, to thwart cherry picking
bool homogeneous_app_version;
// do all instances of each job using the same app version
int write(FILE*);
void clear();
@ -428,6 +430,9 @@ struct WORKUNIT {
double rsc_bandwidth_bound;
// send only to hosts with at least this much download bandwidth
int fileset_id;
int app_version_id;
// if app uses homogeneous_app_version,
// which version this job is committed to (0 if none)
// the following not used in the DB
char app_name[256];
@ -543,20 +548,7 @@ struct RESULT {
// -1 anon platform, unknown resource type (relic)
// -2/-3/-4 anonymous platform (see variants above)
// the following used by the scheduler, but not stored in the DB
//
char wu_name[256];
double fpops_per_cpu_sec;
double fpops_cumulative;
double intops_per_cpu_sec;
double intops_cumulative;
int units; // used for granting credit by # of units processed
int parse_from_client(FILE*);
char platform_name[256];
BEST_APP_VERSION* bavp;
void clear();
int write_to_client(FILE*);
};
struct MSG_FROM_HOST {
@ -609,6 +601,7 @@ struct TRANSITIONER_ITEM {
int priority;
int hr_class;
int batch;
int app_version_id;
int res_id; // This is the RESULT ID
char res_name[256];
int res_report_deadline;


@ -51,6 +51,7 @@ create table app (
target_nresults smallint not null default 0,
min_avg_pfc double not null default 1,
host_scale_check tinyint not null default 0,
homogeneous_app_version tinyint not null default 0,
primary key (id)
) engine=InnoDB;
@ -241,6 +242,7 @@ create table workunit (
mod_time timestamp,
rsc_bandwidth_bound double not null,
fileset_id integer not null,
app_version_id integer not null,
primary key (id)
) engine=InnoDB;


@ -112,7 +112,7 @@ function show_download($pname) {
<p>
<b>"
.sprintf(
tra("Note: if your computer is equipped with an Graphics Processing Unit (GPU), you may be able to %suse it to compute faster%s"),
tra("Note: if your computer is equipped with a Graphics Processing Unit (GPU), you may be able to %suse it to compute faster%s"),
"<a href=http://boinc.berkeley.edu/wiki/GPU_computing>", ".</a>"
)
."</b>


@ -715,6 +715,15 @@ function update_6_10_2010() {
");
}
function update_6_3_2011() {
do_query("alter table app
add homogeneous_app_version tinyint not null default 0
");
do_query("alter table workunit
add app_version_id integer not null default 0
");
}
// Updates are done automatically if you use "upgrade".
//
// If you need to do updates manually,
@ -732,6 +741,7 @@ $db_updates = array (
array(20932, "update_3_17_2010"),
array(21226, "update_4_21_2010"),
array(21728, "update_6_10_2010"),
array(23635, "update_6_3_2011"),
);
?>


@ -280,7 +280,7 @@ static bool scan_work_array() {
APP* app;
BEST_APP_VERSION* bavp;
bool no_more_needed = false;
DB_RESULT result;
SCHED_DB_RESULT result;
lock_sema();


@ -84,7 +84,7 @@ static int send_assigned_job(ASSIGNMENT& asg) {
return retval;
}
int result_id = boinc_db.insert_id();
DB_RESULT result;
SCHED_DB_RESULT result;
retval = result.lookup_id(result_id);
add_result_to_reply(result, wu, bavp, false);


@ -275,9 +275,9 @@ int decrement_disk_space_locality( WORKUNIT& wu) {
// - already sent a result for this WU
// - no app_version available
//
static int possibly_send_result(DB_RESULT& result) {
static int possibly_send_result(SCHED_DB_RESULT& result) {
DB_WORKUNIT wu;
DB_RESULT result2;
SCHED_DB_RESULT result2;
int retval, count;
char buf[256];
BEST_APP_VERSION* bavp;
@ -547,7 +547,7 @@ static int send_results_for_file(
int& nsent,
bool /*in_working_set*/
) {
DB_RESULT result, prev_result;
SCHED_DB_RESULT result, prev_result;
char buf[256], query[1024];
int i, maxid, retval_max, retval_lookup, sleep_made_no_work=0;
@ -772,7 +772,7 @@ static int send_results_for_file(
static int send_new_file_work_deterministic_seeded(
int& nsent, const char *start_f, const char *end_f
) {
DB_RESULT result;
SCHED_DB_RESULT result;
char filename[256], min_resultname[256], query[1024];
int retval;
@ -976,7 +976,7 @@ static int send_new_file_work() {
static int send_old_work(int t_min, int t_max) {
char buf[1024], filename[256];
int retval, extract_retval, nsent;
DB_RESULT result;
SCHED_DB_RESULT result;
int now=time(0);
if (!work_needed(true)) {


@ -100,7 +100,7 @@ static int possibly_give_result_new_deadline(
// Return true if there were any such jobs
//
bool resend_lost_work() {
DB_RESULT result;
SCHED_DB_RESULT result;
std::vector<DB_RESULT>results;
unsigned int i;
char buf[256];
@ -190,8 +190,9 @@ bool resend_lost_work() {
continue;
}
retval = update_wu_transition_time(
wu, result.report_deadline + config.report_grace_period
retval = update_wu_on_send(
wu, result.report_deadline + config.report_grace_period,
*app, *bavp
);
if (retval) {
log_messages.printf(MSG_CRITICAL,


@ -36,7 +36,7 @@
// TODO: from here to add_result_to_reply()
// (which updates the DB record) should be a transaction
//
int read_sendable_result(DB_RESULT& result) {
int read_sendable_result(SCHED_DB_RESULT& result) {
int retval = result.lookup_id(result.id);
if (retval) {
log_messages.printf(MSG_CRITICAL,
@ -61,7 +61,7 @@ bool wu_is_infeasible_slow(
char buf[256];
int retval;
int n;
DB_RESULT result;
SCHED_DB_RESULT result;
// Don't send if we've already sent a result of this WU to this user.
//
@ -213,7 +213,7 @@ double JOB_SET::higher_score_disk_usage(double v) {
void JOB_SET::send() {
WORKUNIT wu;
DB_RESULT result;
SCHED_DB_RESULT result;
int retval;
std::list<JOB>::iterator i = jobs.begin();


@ -920,7 +920,11 @@ static int insert_deadline_tag(RESULT& result) {
return 0;
}
int update_wu_transition_time(WORKUNIT wu, time_t x) {
// update workunit when send an instance of it:
// - transition time
// - app_version_id, if app uses homogeneous app version
//
int update_wu_on_send(WORKUNIT wu, time_t x, APP& app, BEST_APP_VERSION& bav) {
DB_WORKUNIT dbwu;
char buf[256];
@ -932,6 +936,11 @@ int update_wu_transition_time(WORKUNIT wu, time_t x) {
"transition_time=if(transition_time<%d, transition_time, %d)",
(int)x, (int)x
);
if (app.homogeneous_app_version && (bav.avp->id != wu.app_version_id)) {
char buf2[256];
sprintf(buf2, ", app_version_id=%d", bav.avp->id);
strcat(buf, buf2);
}
return dbwu.update_field(buf);
}
@ -1073,7 +1082,7 @@ inline static int get_app_version_id(BEST_APP_VERSION* bavp) {
}
int add_result_to_reply(
DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp,
SCHED_DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp,
bool locality_scheduling
) {
int retval;
@ -1140,8 +1149,8 @@ int add_result_to_reply(
);
}
retval = update_wu_transition_time(
wu, result.report_deadline + config.report_grace_period
retval = update_wu_on_send(
wu, result.report_deadline + config.report_grace_period, *app, *bavp
);
if (retval) {
log_messages.printf(MSG_CRITICAL,
@ -1169,7 +1178,7 @@ int add_result_to_reply(
);
return retval;
}
result.bavp = bavp;
result.bav = *bavp;
g_reply->insert_result(result);
if (g_wreq->rsc_spec_request) {
if (bavp->host_usage.ncudas) {
@ -1651,7 +1660,7 @@ void send_work_setup() {
// If a record is not in DB, create it.
//
int update_host_app_versions(vector<RESULT>& results, int hostid) {
int update_host_app_versions(vector<SCHED_DB_RESULT>& results, int hostid) {
vector<DB_HOST_APP_VERSION> new_havs;
unsigned int i, j;
int retval;


@ -26,7 +26,7 @@
extern void send_work();
extern int add_result_to_reply(
DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp,
SCHED_DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp,
bool locality_scheduling
);
@ -62,7 +62,7 @@ extern bool wu_already_in_reply(WORKUNIT& wu);
extern double estimate_duration(WORKUNIT& wu, BEST_APP_VERSION&);
extern int update_wu_transition_time(WORKUNIT wu, time_t x);
extern int update_wu_on_send(WORKUNIT wu, time_t x, APP&, BEST_APP_VERSION&);
extern void lock_sema();
extern void unlock_sema();


@ -181,7 +181,7 @@ void WORK_REQ::add_no_work_message(const char* message) {
//
const char* SCHEDULER_REQUEST::parse(FILE* fin) {
char buf[256];
RESULT result;
SCHED_DB_RESULT result;
int retval;
strcpy(authenticator, "");
@ -961,7 +961,7 @@ void SCHEDULER_REPLY::insert_workunit_unique(WORKUNIT& wu) {
wus.push_back(wu);
}
void SCHEDULER_REPLY::insert_result(RESULT& result) {
void SCHEDULER_REPLY::insert_result(SCHED_DB_RESULT& result) {
results.push_back(result);
}
@ -1054,7 +1054,7 @@ int APP_VERSION::write(FILE* fout) {
return 0;
}
int RESULT::write_to_client(FILE* fout) {
int SCHED_DB_RESULT::write_to_client(FILE* fout) {
char buf[BLOB_SIZE];
strcpy(buf, xml_doc_in);
@ -1066,8 +1066,8 @@ int RESULT::write_to_client(FILE* fout) {
*p = 0;
fputs(buf, fout);
APP_VERSION* avp = bavp->avp;
CLIENT_APP_VERSION* cavp = bavp->cavp;
APP_VERSION* avp = bav.avp;
CLIENT_APP_VERSION* cavp = bav.cavp;
if (avp) {
PLATFORM* pp = ssp->lookup_platform_id(avp->platformid);
fprintf(fout,
@ -1089,7 +1089,7 @@ int RESULT::write_to_client(FILE* fout) {
return 0;
}
int RESULT::parse_from_client(FILE* fin) {
int SCHED_DB_RESULT::parse_from_client(FILE* fin) {
char buf[256];
char tmp[BLOB_SIZE];


@ -192,6 +192,22 @@ struct BEST_APP_VERSION {
}
};
struct SCHED_DB_RESULT : DB_RESULT {
// the following used by the scheduler, but not stored in the DB
//
char wu_name[256];
double fpops_per_cpu_sec;
double fpops_cumulative;
double intops_per_cpu_sec;
double intops_cumulative;
int units; // used for granting credit by # of units processed
int parse_from_client(FILE*);
char platform_name[256];
BEST_APP_VERSION bav;
int write_to_client(FILE*);
};
// subset of global prefs used by scheduler
//
struct GLOBAL_PREFS {
@ -288,7 +304,7 @@ struct SCHEDULER_REQUEST {
HOST host; // request message is parsed into here.
// does NOT contain the full host record.
COPROCS coprocs;
std::vector<RESULT> results;
std::vector<SCHED_DB_RESULT> results;
// completed results being reported
std::vector<MSG_FROM_HOST_DESC> msgs_from_host;
std::vector<FILE_INFO> file_infos;
@ -435,7 +451,6 @@ struct WORK_REQ {
std::vector<USER_MESSAGE> no_work_messages;
std::vector<BEST_APP_VERSION*> best_app_versions;
std::vector<BEST_APP_VERSION*> all_best_app_versions;
std::vector<DB_HOST_APP_VERSION> host_app_versions;
std::vector<DB_HOST_APP_VERSION> host_app_versions_orig;
@ -457,11 +472,7 @@ struct WORK_REQ {
void add_no_work_message(const char*);
void get_job_limits();
~WORK_REQ() {
for (unsigned int i=0; i<all_best_app_versions.size(); i++) {
delete all_best_app_versions[i];
}
}
~WORK_REQ() {}
};
// NOTE: if any field requires initialization,
@ -485,7 +496,7 @@ struct SCHEDULER_REPLY {
std::vector<APP> apps;
std::vector<APP_VERSION> app_versions;
std::vector<WORKUNIT>wus;
std::vector<RESULT>results;
std::vector<SCHED_DB_RESULT>results;
std::vector<std::string>result_acks;
std::vector<std::string>result_aborts;
std::vector<std::string>result_abort_if_not_starteds;
@ -502,7 +513,7 @@ struct SCHEDULER_REPLY {
void insert_app_unique(APP&);
void insert_app_version_unique(APP_VERSION&);
void insert_workunit_unique(WORKUNIT&);
void insert_result(RESULT&);
void insert_result(SCHED_DB_RESULT&);
void insert_message(const char* msg, const char* prio);
void insert_message(USER_MESSAGE&);
void set_delay(double);


@ -374,6 +374,39 @@ static double max_32b_address_space() {
return 2*GIGA;
}
// The WU is already committed to an app version.
// - check if this host supports that platform
// - if plan class, check if this host can handle it
// - check if we need work for the resource
//
static BEST_APP_VERSION* check_homogeneous_app_version(
WORKUNIT& wu, bool reliable_only
) {
static BEST_APP_VERSION bav;
bool found=false;
APP_VERSION *avp = ssp->lookup_app_version(wu.app_version_id);
for (unsigned int i=0; i<g_request->platforms.list.size(); i++) {
PLATFORM* p = g_request->platforms.list[i];
if (p->id == avp->platformid) {
found = true;
break;
}
}
if (!found) return NULL;
if (strlen(avp->plan_class)) {
if (!app_plan(*g_request, avp->plan_class, bav.host_usage)) {
return NULL;
}
} else {
bav.host_usage.sequential_app(g_reply->host.p_fpops);
}
if (!need_this_resource(bav.host_usage, avp, NULL)) {
return NULL;
}
return &bav;
}
// return BEST_APP_VERSION for the given job and host, or NULL if none
//
// check_req: check whether we still need work for the resource
@ -411,6 +444,13 @@ BEST_APP_VERSION* get_app_version(
return NULL;
}
// handle the case where we're using homogeneous app version
// and the WU is already committed to an app version
//
if (app->homogeneous_app_version && wu.app_version_id) {
return check_homogeneous_app_version(wu, reliable_only);
}
// see if app is already in memoized array
//
std::vector<BEST_APP_VERSION*>::iterator bavi;
@ -538,7 +578,6 @@ BEST_APP_VERSION* get_app_version(
}
}
g_wreq->best_app_versions.push_back(bavp);
g_wreq->all_best_app_versions.push_back(bavp);
if (!bavp->present) return NULL;
return bavp;
}


@ -352,10 +352,12 @@ int handle_wu(
}
// if WU has results with errors and no success yet,
// reset homogeneous redundancy class to give other platforms a try
// reset homogeneous redundancy class to give other platforms a try;
// also reset app version ID if using HAV
//
if (nerrors && !(nsuccess || ninprogress)) {
wu_item.hr_class = 0;
wu_item.app_version_id = 0;
}
if (nerrors > wu_item.max_error_results) {