flush couldnt_send results

svn path=/trunk/boinc/; revision=2343
David Anderson 2003-09-21 21:00:25 +00:00
parent 546947d011
commit 1938fc43ee
6 changed files with 376 additions and 109 deletions

checkin_notes

@@ -6257,6 +6257,17 @@ Karl 2003/09/10
Sched/
transitioner.C
David Sept 11 2003
- transitioner: set file delete state to READY only if it's currently INIT
This should make file_deleter more efficient,
because each WU/result is file-deleted only once.
Also means that unlink()s should always succeed
- file deleter: print retval of unlink() (should always be zero)
sched/
file_deleter.C
transitioner.C
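A minimal sketch of that rule, with invented names (the real check lives in transitioner.C; 'Workunit', 'maybe_mark_for_deletion', and the 'all_outputs_uploaded' flag are assumptions for illustration only):

    // Sketch only: promote the delete state exactly once, INIT -> READY,
    // so each WU/result is handed to file_deleter a single time and is
    // never re-queued once it reaches FILE_DELETE_DONE.
    #define FILE_DELETE_INIT  0
    #define FILE_DELETE_READY 1

    struct Workunit { int file_delete_state; };

    void maybe_mark_for_deletion(Workunit& wu, bool all_outputs_uploaded) {
        if (wu.file_delete_state == FILE_DELETE_INIT && all_outputs_uploaded) {
            wu.file_delete_state = FILE_DELETE_READY;
        }
    }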
Karl 2003/09/15
- fixed transitioner bug where usage of `unsigned int' instead of signed
int caused 400k results to be generated.
@@ -6274,13 +6285,13 @@ Eric K. 16-Sep-2003
tools/process_result_template.C
Jeff 2003/09/17
No code changes. This is the branch point for the seti beta. The tag is:
setiathome-4.xx_all_platforms_beta
for both seti_boinc and boinc. They both make on solaris without serious
error and the resulting seti client links without error with the resulting
boinc libs.
Karl,Eric Heien 2003/09/18
- fixed some pre-ANSI C
@@ -6288,3 +6299,18 @@ Karl,Eric Heien 2003/09/18
transitioner.C
tools/
create_work.C
David Sept 21 2003
- Added mechanism for marking results as COULDNT_SEND:
When the scheduler finds that a result is infeasible
for a host, it increments the "infeasible_count" in the work array.
When the feeder finds that too many results in the array
have nonzero infeasible_count, it marks some of them
as COULDNT_SEND and removes them from the array
db/
boinc_db.h
sched/
feeder.C
handle_request.C
sched_shmem.h
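In outline, the two halves of that mechanism look roughly like the following self-contained sketch. The struct and constant names mirror the diffed code further down, but this is a simplified model (in-memory only, with assumed constant values), not the actual scheduler/feeder source.

    #include <ctime>

    #define MAX_INFEASIBLE 500
    #define RESULT_SERVER_STATE_OVER    5   // values assumed for illustration
    #define RESULT_OUTCOME_COULDNT_SEND 2

    struct Result { int server_state; int outcome; };
    struct Workunit { time_t transition_time; };
    struct WuResult {                 // one slot of the shared-memory work array
        bool present;
        int infeasible_count;
        Workunit workunit;
        Result result;
    };

    // Scheduler side: a result didn't fit the requesting host.
    void note_infeasible(WuResult& wr) {
        wr.infeasible_count++;
    }

    // Feeder side: if too many slots are infeasible, retire the worst ones.
    void flush_infeasible(WuResult* slots, int n, int ninfeasible) {
        while (ninfeasible > MAX_INFEASIBLE) {
            int imax = -1, max = 0;
            for (int i = 0; i < n; i++) {
                if (slots[i].present && slots[i].infeasible_count > max) {
                    max = slots[i].infeasible_count;
                    imax = i;
                }
            }
            if (imax < 0) break;
            WuResult& wr = slots[imax];
            wr.present = false;                                 // free the slot
            wr.result.server_state = RESULT_SERVER_STATE_OVER;  // give up on it
            wr.result.outcome = RESULT_OUTCOME_COULDNT_SEND;
            wr.workunit.transition_time = time(0);              // wake the transitioner
            // (the real feeder writes these changes back to the database)
            ninfeasible--;
        }
    }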

db/boinc_db.h

@@ -243,7 +243,10 @@ struct HOST {
// values for file_delete state
#define FILE_DELETE_INIT 0
#define FILE_DELETE_READY 1
// set to this value only when we believe all files are uploaded
#define FILE_DELETE_DONE 2
// means the file deleter ATTEMPTED to delete files.
// May have failed. TODO: retry delete later
// values for assimilate_state
#define ASSIMILATE_INIT 0
@@ -468,6 +471,7 @@ public:
int get_id();
void db_print(char*);
void db_parse(MYSQL_ROW &row);
void operator=(WORKUNIT& w) {WORKUNIT::operator=(w);}
};
class DB_WORKSEQ : public DB_BASE, public WORKSEQ {

doc/redundancy.php (new file)

@@ -0,0 +1,131 @@
<?
require_once("docutil.php");
page_head("Redundancy and errors");
echo "
A BOINC 'result' abstracts an instance of a computation,
possibly not performed yet.
Typically, a BOINC server sends 'results' to clients,
and the clients perform the computation and reply to the server.
But many things can happen to a result:
<ul>
<li> The client computes the result correctly and returns it.
<li> The client computes the result incorrectly and returns it.
<li> The client fails to download or upload files.
<li> The application crashes on the client.
<li> The client never returns anything because it breaks
or stops running BOINC.
<li> The scheduler isn't able to send the result because it
requires more resources than any client has.
</ul>
<p>
BOINC provides a form of redundant computing
in which each computation is performed on multiple clients,
the results are compared,
and are accepted only when a 'consensus' is reached.
In some cases new results must be created and sent.
<p>
BOINC manages most of the details;
however, there are two places where the application developer gets involved:
<ul>
<li> <b>Validation:</b>
This performs two functions.
First, when a sufficient number (a 'quorum') of successful results
have been returned, it compares them and sees if there is a 'consensus'.
The method of comparing results (which may need to take into
account platform-varying floating point arithmetic)
and the policy for determining consensus (e.g., best two out of three)
are supplied by the application.
If a consensus is reached, a particular result is designated
as the 'canonical' result.
Second, if a result arrives after a consensus has already been reached,
the new result is compared with the canonical result;
this determines whether the user gets credit.
<li> <b>Assimilation:</b>
This is the mechanism by which the project is notified of the completion
(successful or unsuccessful) of a work unit.
It is performed exactly once per work unit.
If the work unit was completed successfully
(i.e. if there is a canonical result)
the project-supplied function reads the output file(s)
and handles the information, e.g. by recording it in a database.
If the workunit failed,
the function might write an entry in a log,
send an email, etc.
</ul>
<hr>
In the following example,
the project creates a workunit with
<br>
min_quorum = 2
<br>
target_nresults = 3
<br>
max_delay = 10
<p>
BOINC automatically creates three results,
which are sent at various times.
At time 8, two successful results have returned,
so the validator is invoked.
It finds a consensus, so the work unit is assimilated.
At time 10 result 3 arrives;
validation is performed again,
this time to check whether result 3 gets credit.
<pre>
time 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
created validate; assimilate
WU x x x
created sent success
result 1 x x---------------x
created sent success
result 2 x x-------------------x
created sent success
result 3 x x-----------------------x
</pre>
<hr>
In the next example,
result 2 is lost (i.e., there's no reply to the BOINC scheduler).
When result 3 arrives a consensus is found
and the work unit is assimilated.
At time 13 the scheduler 'gives up' on result 2
(this allows it to delete the canonical result's output files,
which are needed to validate late-arriving results).
<pre>
time 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
created validate; assimilate
WU x x x
created sent success
result 1 x x---------------x
created sent lost giveup
result 2 x x-------- x
created sent success
result 3 x x-----------------------x
</pre>
<hr>
In the next example,
result 2 returns an error at time 5.
This reduces the number of outstanding results to 2;
because target_nresults is 3, BOINC creates another result (result 4).
A consensus is reached at time 9, before result 4 is returned.
<pre>
time 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
created validate; assimilate
WU x x x
created sent success
result 1 x x---------------x
created sent error
result 2 x x-------x
created sent success
result 3 x x-------------------x
created sent success
result 4 x x----------------------x
</pre>
";
page_tail();
?>
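As a concrete illustration of the 'Validation' step described in redundancy.php above, here is a rough sketch of an application-supplied comparison using a relative floating-point tolerance and a majority-vote consensus policy. The type and function names ('ParsedResult', 'results_match', 'check_quorum') and the tolerance are invented for this example; this is not BOINC's actual validator interface.

    #include <algorithm>
    #include <cmath>
    #include <vector>

    // Hypothetical parsed output of one returned result (not a BOINC type).
    struct ParsedResult {
        double value;
        bool valid = false;       // set during the comparison pass
        bool canonical = false;   // set on the result chosen as canonical
    };

    // Two results 'match' if they agree within a relative tolerance,
    // which allows for platform-varying floating point arithmetic.
    bool results_match(const ParsedResult& a, const ParsedResult& b) {
        const double tol = 1e-6;
        double scale = std::max(std::fabs(a.value), std::fabs(b.value));
        if (scale == 0) return true;
        return std::fabs(a.value - b.value) / scale <= tol;
    }

    // Consensus policy: a result is canonical if a strict majority of the
    // quorum agrees with it (e.g. 2 out of 3).
    // Returns the index of the canonical result, or -1 if there is no consensus.
    int check_quorum(std::vector<ParsedResult>& results) {
        int n = (int)results.size();
        for (int i = 0; i < n; i++) {
            int agree = 0;
            for (int j = 0; j < n; j++) {
                if (results_match(results[i], results[j])) agree++;
            }
            if (2 * agree > n) {
                results[i].canonical = true;
                for (int j = 0; j < n; j++) {
                    results[j].valid = results_match(results[i], results[j]);
                }
                return i;
            }
        }
        return -1;
    }

A result that arrives after the quorum is reached would then simply be compared against the stored canonical result with results_match() to decide whether the user gets credit, matching the second function of the validation step.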

sched/feeder.C

@@ -26,10 +26,6 @@
//
// -asynch fork and run in a separate process
// TODO:
// - check for wu/results that don't get sent for a long time;
// generate a warning message
// Trigger files:
// The feeder program periodically checks for two trigger files:
//
@@ -86,8 +82,8 @@ int check_reread_trigger() {
return 0;
}
// Try keep the wu_results array filled.
// This is actually a little tricky.
// Try to keep the work array filled.
// This is a little tricky.
// We use an enumerator.
// The inner loop scans the wu_result table,
// looking for empty slots and trying to fill them in.
@@ -102,11 +98,154 @@ int check_reread_trigger() {
// Crude approach: if a "collision" (as above) occurred on
// a pass through the array, wait a long time (5 sec)
//
void feeder_loop() {
int i, j, nadditions, ncollisions, retval;
// Checking for infeasible results (i.e. can't be sent to any host):
// - the "infeasible_count" field of WU_RESULT keeps track of
// how many times the WU_RESULT was infeasible for a host
// - the scheduler gives priority to results that have infeasible_count > 0
// - the feeder tries to ensure that the number of WU_RESULTs
// with infeasible_count > 0 doesn't exceed MAX_INFEASIBLE
// (compiled into feeder).
// If it does, then the feeder picks the WU_RESULT with
// the largest infeasible_count,
// flags the result as OVER with outcome COULDNT_SEND,
// flags the WU for the transitioner,
// and repeats this until the infeasible count is low enough again
static void scan_work_array(
DB_RESULT& result, char* clause,
int& nadditions, int& ncollisions, int& ninfeasible,
bool& no_wus
) {
int i, j, retval;
DB_WORKUNIT wu;
bool collision, restarted_enum = false;
for (i=0; i<ssp->nwu_results; i++) {
WU_RESULT& wu_result = ssp->wu_results[i];
if (wu_result.present) {
if (wu_result.infeasible_count > 0) {
ninfeasible++;
}
} else {
try_again:
retval = result.enumerate(clause);
if (retval) {
// if we already restarted the enum on this array scan,
// there's no point in doing it again.
//
if (restarted_enum) {
log_messages.printf(SchedMessages::DEBUG, "already restarted enum on this array scan\n");
break;
}
// restart the enumeration
//
restarted_enum = true;
retval = result.enumerate(clause);
log_messages.printf(SchedMessages::DEBUG, "restarting enumeration\n");
if (retval) {
log_messages.printf(SchedMessages::DEBUG, "enumeration restart returned nothing\n");
no_wus = true;
break;
}
}
// there's a chance this result was sent out
// after the enumeration started.
// So read it from the DB again
//
retval = result.lookup_id(result.id);
if (retval) {
log_messages.printf(SchedMessages::NORMAL,
"[%s] can't reread result: %d\n", result.name, retval
);
goto try_again;
}
if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
log_messages.printf(
SchedMessages::NORMAL,
"[%s] RESULT STATE CHANGED\n",
result.name
);
goto try_again;
}
collision = false;
for (j=0; j<ssp->nwu_results; j++) {
if (ssp->wu_results[j].present
&& ssp->wu_results[j].result.id == result.id
) {
ncollisions++;
collision = true;
break;
}
}
if (!collision) {
log_messages.printf(
SchedMessages::NORMAL,
"[%s] adding result in slot %d\n",
result.name, i
);
retval = wu.lookup_id(result.workunitid);
if (retval) {
log_messages.printf(
SchedMessages::CRITICAL,
"[%s] can't read workunit #%d: %d\n",
result.name, result.workunitid, retval
);
continue;
}
wu_result.result = result;
wu_result.workunit = wu;
wu_result.present = true;
wu_result.infeasible_count = 0;
nadditions++;
}
}
}
}
int remove_most_infeasible() {
int i, max, imax=-1, retval;
DB_RESULT result;
DB_WORKUNIT wu;
bool no_wus, collision, restarted_enum;
max = 0;
for (i=0; i<ssp->nwu_results; i++) {
WU_RESULT& wu_result = ssp->wu_results[i];
if (wu_result.present && wu_result.infeasible_count > max) {
imax = i;
max = wu_result.infeasible_count;
}
}
if (max == 0) return -1; // nothing is infeasible
WU_RESULT& wu_result = ssp->wu_results[imax];
wu_result.present = false; // mark as absent
result = wu_result.result;
wu = wu_result.workunit;
log_messages.printf(
SchedMessages::NORMAL,
"[%s] declaring result as unsendable\n",
result.name
);
result.server_state = RESULT_SERVER_STATE_OVER;
result.outcome = RESULT_OUTCOME_COULDNT_SEND;
retval = result.update();
if (retval) return retval;
wu.transition_time = time(0);
retval = wu.update();
if (retval) return retval;
return 0;
}
void feeder_loop() {
int i, n, retval, nadditions, ncollisions, ninfeasible;
DB_RESULT result;
bool no_wus;
char clause[256];
sprintf(clause, "where server_state=%d order by random limit %d",
@@ -116,84 +255,22 @@ void feeder_loop() {
while (1) {
nadditions = 0;
ncollisions = 0;
ninfeasible = 0;
no_wus = false;
restarted_enum = false;
for (i=0; i<ssp->nwu_results; i++) {
if (!ssp->wu_results[i].present) {
try_again:
retval = result.enumerate(clause);
if (retval) {
// if we already restarted the enum on this pass,
// there's no point in doing it again.
//
if (restarted_enum) {
log_messages.printf(SchedMessages::DEBUG, "already restarted enum on this pass\n");
break;
}
scan_work_array(
result, clause, nadditions, ncollisions, ninfeasible, no_wus
);
// restart the enumeration
//
restarted_enum = true;
retval = result.enumerate(clause);
log_messages.printf(SchedMessages::DEBUG, "restarting enumeration\n");
if (retval) {
log_messages.printf(SchedMessages::DEBUG, "enumeration restart returned nothing\n");
no_wus = true;
break;
}
}
ssp->ready = true;
// there's a chance this result was sent out
// after the enumeration started.
// So read it from the DB again
//
retval = result.lookup_id(result.id);
if (retval) {
log_messages.printf(SchedMessages::NORMAL, "can't reread result %s\n", result.name);
goto try_again;
}
if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
log_messages.printf(
SchedMessages::NORMAL,
"[%s] RESULT STATE CHANGED\n",
result.name
);
goto try_again;
}
collision = false;
for (j=0; j<ssp->nwu_results; j++) {
if (ssp->wu_results[j].present
&& ssp->wu_results[j].result.id == result.id
) {
ncollisions++;
collision = true;
break;
}
}
if (!collision) {
log_messages.printf(
SchedMessages::NORMAL,
"[%s] adding result in slot %d\n",
result.name, i
);
retval = wu.lookup_id(result.workunitid);
if (retval) {
log_messages.printf(
SchedMessages::CRITICAL,
"[%s] can't read workunit #%d: %d\n",
result.name, result.workunitid, retval
);
continue;
}
ssp->wu_results[i].result = result;
ssp->wu_results[i].workunit = wu;
ssp->wu_results[i].present = true;
nadditions++;
}
if (ninfeasible > MAX_INFEASIBLE) {
n = ninfeasible - MAX_INFEASIBLE;
for (i=0; i<n; i++ ) {
retval = remove_most_infeasible();
if (retval) break;
}
}
ssp->ready = true;
if (nadditions == 0) {
log_messages.printf(SchedMessages::DEBUG, "No results added; sleeping 1 sec\n");
sleep(1);

sched/handle_request.C

@@ -653,39 +653,31 @@ static int update_wu_transition_time(WORKUNIT wu, time_t x) {
return 0;
}
int send_work(
// Make a pass through the wu/results array, sending work.
// If "infeasible_only" is true, send only results that were
// previously infeasible for some host
//
static void scan_work_array(
bool infeasible_only, double& seconds_to_fill, int& nresults,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
SCHED_SHMEM& ss
) {
int i, retval, nresults = 0;
double seconds_to_fill;
int i, retval;
WORKUNIT wu;
DB_RESULT result, result_copy;
log_messages.printf(
SchedMessages::NORMAL,
"[HOST#%d] got request for %d seconds of work\n",
reply.host.id, sreq.work_req_seconds
);
if (sreq.work_req_seconds <= 0) return 0;
seconds_to_fill = sreq.work_req_seconds;
if (seconds_to_fill > MAX_SECONDS_TO_SEND) {
seconds_to_fill = MAX_SECONDS_TO_SEND;
}
if (seconds_to_fill < MIN_SECONDS_TO_SEND) {
seconds_to_fill = MIN_SECONDS_TO_SEND;
}
for (i=0; i<ss.nwu_results && seconds_to_fill>0; i++) {
WU_RESULT& wu_result = ss.wu_results[i];
// the following should be a critical section
//
if (!ss.wu_results[i].present) {
if (!wu_result.present) {
continue;
}
wu = ss.wu_results[i].workunit;
if (infeasible_only && wu_result.infeasible_count==0) {
continue;
}
wu = wu_result.workunit;
double wu_seconds_filled = estimate_duration(wu, reply.host);
@@ -694,11 +686,12 @@ int send_work(
SchedMessages::DEBUG, "[HOST#%d] [WU#%d %s] WU is infeasible\n",
reply.host.id, wu.id, wu.name
);
wu_result.infeasible_count++;
continue;
}
result = ss.wu_results[i].result;
ss.wu_results[i].present = false;
result = wu_result.result;
wu_result.present = false;
retval = add_wu_to_reply(wu, reply, platform, ss);
if (retval) continue;
@@ -717,7 +710,10 @@ int send_work(
retval = update_wu_transition_time(wu, result.report_deadline);
if (retval) {
log_messages.printf(SchedMessages::CRITICAL, "send_work: can't update WU transition time\n");
log_messages.printf(
SchedMessages::CRITICAL,
"send_work: can't update WU transition time\n"
);
}
// copy the result so we don't overwrite its XML fields
@@ -739,6 +735,35 @@ int send_work(
nresults++;
if (nresults == MAX_WUS_TO_SEND) break;
}
}
int send_work(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
SCHED_SHMEM& ss
) {
int nresults = 0;
double seconds_to_fill;
log_messages.printf(
SchedMessages::NORMAL,
"[HOST#%d] got request for %d seconds of work\n",
reply.host.id, sreq.work_req_seconds
);
if (sreq.work_req_seconds <= 0) return 0;
seconds_to_fill = sreq.work_req_seconds;
if (seconds_to_fill > MAX_SECONDS_TO_SEND) {
seconds_to_fill = MAX_SECONDS_TO_SEND;
}
if (seconds_to_fill < MIN_SECONDS_TO_SEND) {
seconds_to_fill = MIN_SECONDS_TO_SEND;
}
// give priority to results that were infeasible for some other host
//
scan_work_array(true, seconds_to_fill, nresults, sreq, reply, platform, ss);
scan_work_array(false, seconds_to_fill, nresults, sreq, reply, platform, ss);
log_messages.printf(
SchedMessages::NORMAL, "[HOST#%d] Sent %d results\n",

sched/sched_shmem.h

@@ -26,10 +26,14 @@
#define MAX_APPS 10
#define MAX_APP_VERSIONS 1000
#define MAX_WU_RESULTS 1000
#define MAX_INFEASIBLE 500
// if # of elements in work array that were infeasible for some host
// exceeds this, classify some of them as COULDNT_SEND
// a workunit/result pair
struct WU_RESULT {
bool present;
int infeasible_count;
WORKUNIT workunit;
RESULT result;
};