- scheduler: in send_result_abort(), distinguish between
        the enumeration ending versus it failing.
        This fixes a bug where lots of results would be
        incorrectly aborted if there's a database problem.
        This fix only matters if you use the <send_result_abort> config flag.
        (from Kevin Reed)
    - validator: add -credit_from_wu option.
        This takes the credit value from the workunit XML
        (which must contain a <credit> element).
        This lets you use credit-from-WU with the standard validators
        (sample_bitwise and sample_trivial).

    sched/
        handle_request.C
        validator.C
        validate_util.h

svn path=/trunk/boinc/; revision=12558
This commit is contained in:
David Anderson 2007-05-04 17:59:50 +00:00
parent bcc72e46be
commit 4c76f95a34
4 changed files with 93 additions and 55 deletions

View File

@ -4383,3 +4383,21 @@ Rom 3 May 2007
Rom 3 May 2007 (HEAD)
- Tag for 5.9.7 release, all platforms
boinc_core_release_5_9_7
David 4 May 2007
- scheduler: in send_result_abort(), distinguish between
the enumeration ending versus it failing.
This fixes a bug where lots of results would be
incorrectly aborted if there's a database problem.
This fix only matters if you use the <send_result_abort> config flag.
(from Kevin Reed)
- validator: add -credit_from_wu option.
This gets credit from the workunit XML
(which must have a <credit> flag).
This lets you use credit-from-WU with the standard validators
(sample_bitwise and sample_trivial)
sched/
handle_request.C
validator.C
validate_util.h

View File

@ -486,16 +486,20 @@ static int update_host_record(HOST& initial_host, HOST& xhost, USER& user) {
return 0;
}
// Figure out which of the results the user currently has
// should be aborted outright, or aborted if not started yet
//
int send_result_abort(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
int aborts_sent = 0;
int aborts_sent = 0;
int retval = 0;
DB_IN_PROGRESS_RESULT result;
std::string result_names;
unsigned int i;
if (sreq.other_results.size() == 0) {
return 0;
return 0;
}
// initially mark all results for abort and build list of results to query
@ -504,7 +508,7 @@ int send_result_abort(
OTHER_RESULT& orp=sreq.other_results[i];
orp.abort = true;
orp.abort_if_not_started = false;
if ( i > 0 ) result_names.append(", ");
if (i > 0) result_names.append(", ");
result_names.append("'");
result_names.append(orp.name);
result_names.append("'");
@ -516,54 +520,60 @@ int send_result_abort(
// query the db for the results and set the appropriate flag
//
while (!result.enumerate(reply.host.id, result_names.c_str())) {
while (!(retval = result.enumerate(reply.host.id, result_names.c_str()))) {
for (i=0; i<sreq.other_results.size(); i++) {
OTHER_RESULT& orp = sreq.other_results[i];
if (!strcmp(orp.name.c_str(), result.result_name)) {
if ( result.error_mask&WU_ERROR_CANCELLED ) {
// do nothing, it should be aborted
} else if ( result.assimilate_state == ASSIMILATE_DONE ) {
// only send abort if not started
orp.abort = false;
orp.abort_if_not_started = true;
} else if ( result.server_state == RESULT_SERVER_STATE_OVER && result.outcome == RESULT_OUTCOME_NO_REPLY ) {
// the result is late so abort it if it hasn't been started
orp.abort=false;
orp.abort_if_not_started = true;
} else {
// all is good with the result - let it process
orp.abort=false;
}
break;
if ( result.error_mask&WU_ERROR_CANCELLED ) {
// do nothing, it should be aborted
} else if ( result.assimilate_state == ASSIMILATE_DONE ) {
// only send abort if not started
orp.abort = false;
orp.abort_if_not_started = true;
} else if ( result.server_state == RESULT_SERVER_STATE_OVER && result.outcome == RESULT_OUTCOME_NO_REPLY ) {
// the result is late so abort it if it hasn't been started
orp.abort=false;
orp.abort_if_not_started = true;
} else {
// all is good with the result - let it process
orp.abort=false;
}
break;
}
}
}
// If enumeration returned an error, don't send any aborts
//
if (retval && (retval != ERR_DB_NOT_FOUND)) {
return retval;
}
// loop through the results and send the appropriate message (if any)
//
for (i=0; i<sreq.other_results.size(); i++) {
OTHER_RESULT& orp = sreq.other_results[i];
if (orp.abort) {
reply.result_aborts.push_back(orp.name);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d]: Send result_abort for result %s\n",
reply.host.id, orp.name.c_str()
OTHER_RESULT& orp = sreq.other_results[i];
if (orp.abort) {
reply.result_aborts.push_back(orp.name);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d]: Send result_abort for result %s\n",
reply.host.id, orp.name.c_str()
);
// send user message
// send user message
char buf[256];
sprintf(buf, "Result %s is no longer usable\n", orp.name.c_str());
USER_MESSAGE um(buf, "high");
reply.insert_message(um);
} else if (orp.abort_if_not_started) {
reply.result_abort_if_not_starteds.push_back(orp.name);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d]: Send result_abort_if_unstarted for result %s\n",
reply.host.id, orp.name.c_str()
reply.result_abort_if_not_starteds.push_back(orp.name);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d]: Send result_abort_if_unstarted for result %s\n",
reply.host.id, orp.name.c_str()
);
}
}
}
return aborts_sent;
return aborts_sent;
}
// 1) Decide which global prefs to use for sched decisions: either

View File

@ -28,6 +28,7 @@
extern int get_output_file_path(RESULT const& result, std::string& path);
extern int get_output_file_paths(RESULT const& result, std::vector<std::string>&);
extern double median_mean_credit(WORKUNIT&, std::vector<RESULT>& results);
extern double get_credit_from_wu(WORKUNIT&, std::vector<RESULT>& results);
extern int update_credit_per_cpu_sec(
double credit, double cpu_time, double& credit_per_cpu_sec
);

View File

@ -20,13 +20,14 @@
// validator - check and validate results, and grant credit
// -app appname
// [-d debug_level]
// [-one_pass_N_WU N] // Validate only N WU in one pass, then exit
// [-one_pass] // make one pass through WU table, then exit
// [-mod n i] // process only WUs with (id mod n) == i
// [-max_granted_credit X] // limit maximum granted credit to X
// [-max_claimed_credit Y] // invalid if claims more than Y
// [-grant_claimed_credit] // just grant whatever is claimed
// [-update_credited_job] // add userid/wuid pair to credited_job table
// [-one_pass_N_WU N] // Validate only N WU in one pass, then exit
// [-one_pass] // make one pass through WU table, then exit
// [-mod n i] // process only WUs with (id mod n) == i
// [-max_granted_credit X] // limit maximum granted credit to X
// [-max_claimed_credit Y] // invalid if claims more than Y
// [-grant_claimed_credit] // just grant whatever is claimed
// [-update_credited_job] // add userid/wuid pair to credited_job table
// [-credit_from_wu] // get credit from WU XML
//
// This program must be linked with two project-specific functions:
// check_set() and check_pair().
@ -80,6 +81,7 @@ double max_granted_credit = 0;
double max_claimed_credit = 0;
bool grant_claimed_credit = false;
bool update_credited_job = false;
bool credit_from_wu = false;
void update_error_rate(DB_HOST& host, bool valid) {
if (host.error_rate > 1) host.error_rate = 1;
@ -210,16 +212,16 @@ int is_valid(RESULT& result, WORKUNIT& wu) {
retval = credited_job.insert();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_NORMAL,
SCHED_MSG_LOG::MSG_CRITICAL,
"[RESULT#%d] Warning: credited_job insert failed (userid: %d workunit: %d err: %d)\n",
result.id, user.id, long(wu.opaque), retval
);
} else {
log_messages.printf(
SCHED_MSG_LOG::MSG_DEBUG,
"[RESULT#%d %s] Granted contribution to valid result [WU#%d OPAQUE#%d USER#%d]\n",
result.id, result.name, wu.id, long(wu.opaque), user.id
);
log_messages.printf(
SCHED_MSG_LOG::MSG_DEBUG,
"[RESULT#%d %s] added credited_job record [WU#%d OPAQUE#%d USER#%d]\n",
result.id, result.name, wu.id, long(wu.opaque), user.id
);
}
}
@ -431,6 +433,10 @@ int handle_wu(
}
if (retry) transition_time = DELAYED;
if (credit_from_wu) {
credit = get_credit_from_wu(wu, results);
}
// scan results.
// update as needed, and count the # of results
// that are still outcome=SUCCESS
@ -653,15 +659,16 @@ int main(int argc, char** argv) {
"\nUsage: %s -app <app-name> [OPTIONS]\n"
"Start validator for application <app-name>\n\n"
"Optional arguments:\n"
" -one_pass_N_WU N Validate at most N WUs, then exit\n"
" -one_pass Make one pass through WU table, then exit\n"
" -mod n i Process only WUs with (id mod n) == i\n"
" -max_claimed_credit X If a result claims more credit than this, mark it as invalid\n"
" -max_granted_credit X Grant no more than this amount of credit to a result\n"
" -grant_claimed_credit Grant the claimed credit, regardless of what other results for this workunit claimed\n"
" -update_credited_job Add userid/wuid pair to credited_job after granting credit\n"
" -sleep_interval n Set sleep-interval to n\n"
" -d level Set debug-level\n\n";
" -one_pass_N_WU N Validate at most N WUs, then exit\n"
" -one_pass Make one pass through WU table, then exit\n"
" -mod n i Process only WUs with (id mod n) == i\n"
" -max_claimed_credit X If a result claims more credit than this, mark it as invalid\n"
" -max_granted_credit X Grant no more than this amount of credit to a result\n"
" -grant_claimed_credit Grant the claimed credit, regardless of what other results for this workunit claimed\n"
" -update_credited_job Add record to credited_job table after granting credit\n"
" -credit_from_wu Credit is specified in WU XML\n"
" -sleep_interval n Set sleep-interval to n\n"
" -d level Set debug-level\n\n";
if ( (argc > 1) && ( !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help") ) ) {
printf (usage, argv[0] );
@ -694,7 +701,9 @@ int main(int argc, char** argv) {
} else if (!strcmp(argv[i], "-grant_claimed_credit")) {
grant_claimed_credit = true;
} else if (!strcmp(argv[i], "-update_credited_job")) {
update_credited_job= true;
update_credited_job = true;
} else if (!strcmp(argv[i], "-credit_from_wu")) {
credit_from_wu = true;
} else {
fprintf(stderr, "Invalid option '%s'\nTry `%s --help` for more information\n", argv[i], argv[0]);
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "unrecognized arg: %s\n", argv[i]);