boinc/sched/timeout_check.C

425 lines
13 KiB
C

// The contents of this file are subject to the BOINC Public License
// Version 1.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License at
// http://boinc.berkeley.edu/license_1.0.txt
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
// License for the specific language governing rights and limitations
// under the License.
//
// The Original Code is the Berkeley Open Infrastructure for Network Computing.
//
// The Initial Developer of the Original Code is the SETI@home project.
// Portions created by the SETI@home project are Copyright (C) 2002
// University of California at Berkeley. All Rights Reserved.
//
// Contributor(s):
//
// timeout_check - do various time-based tasks
// - time out results
// - create new results to make up for lost ones
// - check for WU error conditions
//
// timeout_check
// -app appname
// [ -nerror n ] if get this many errors, bail on WU
// [ -ndet n ] if get this results w/o consensus, bail
// [ -nredundancy n ] try to get at least this many done results
// [ -asynch ] be asynchronous
using namespace std;
#include <vector>
#include <unistd.h>
#include <sys/time.h>
#include "boinc_db.h"
#include "util.h"
#include "backend_lib.h"
#include "config.h"
#include "sched_util.h"
#define LOCKFILE "timeout_check.out"
#define PIDFILE "timeout_check.pid"
int max_errors = 999;
int max_done = 999;
int nredundancy = 0;
int startup_time;
CONFIG config;
R_RSA_PRIVATE_KEY key;
char app_name[256];
// The scheme for generating unique output filenames is as follows.
// If the original filename is of the form x__y,
// then y is replaced with a string of the form time_seqno,
// where "time" is when this program started up.
// NOTE: if you ever need to start up multiple copies of this,
// you'll need to add a PID in there somewhere.
//
// If the original filename doesn't have __, add a string
// of the form __time_seqno
void make_unique_name(char* name) {
char buf[256], *p;
static int seqno;
sprintf(buf, "%d_%d", startup_time, seqno++);
p = strstr(name, "__");
if (p) {
strcpy(p+2, buf);
} else {
strcat(name, "__");
strcat(name, buf);
}
}
// convert a result's XML document to generate new output filenames.
// The input has the form
// <file_info>
// <name>xxx</name>
// ...
// </file_info>
// ...
// <result>
// <file_ref>
// <file_name>xxx</file_name>
// ...
// </file_ref>
// ...
// </result>
//
// Look for <name>...</name> elements within <file_info>
// and make a unique name based on it;
// apply the same conversion to the <file_name> element later on.
//
// TODO: this is ad-hoc. Would be nice to use some generic
// XML parsing routines, or XSLT or something.
//
int assign_new_names(char* in) {
char *p = in, *n1, *n2, *r;
char name[256], newname[256], element[256], buf[MAX_BLOB_SIZE];
int len;
// notice where the <result> is so we don't try to convert
// the result name
//
r = strstr(in, "<result>");
while (1) {
n1 = strstr(p, "<name>");
if (!n1) break;
if (n1 > r) break; // don't go past <result>
n1 += strlen("<name>");
n2 = strstr(p, "</name>");
if (!n2) {
log_messages.printf(SchedMessages::CRITICAL, "assign_new_names(): malformed XML:\n%s", in);
return 1;
}
len = n2 - n1;
memcpy(name, n1, len);
name[len] = 0;
strcpy(newname, name);
make_unique_name(newname);
strcpy(buf, n2);
strcpy(n1, newname);
strcat(n1, buf);
// replace the name in the <file_name> element
//
sprintf(element, "<file_name>%s</file_name>", name);
n2 = strstr(n1, element);
if (!n2) {
log_messages.printf(SchedMessages::CRITICAL, "assign_new_names(): no <file_name>:\n%s", in);
return 1;
}
strcpy(buf, n2+strlen(element));
sprintf(element, "<file_name>%s</file_name>", newname);
strcpy(n2, element);
strcat(n2, buf);
p = n1;
}
return 0;
}
void handle_wu(DB_WORKUNIT& wu) {
vector<RESULT> results;
DB_RESULT result;
int nerrors, ndone, retval;
unsigned int i, n;
char buf[256];
unsigned int now = time(0);
bool /*wu_error = false,*/ all_over;
log_messages.printf(SchedMessages::DEBUG, "[WU#%d %s] handling WU\n", wu.id, wu.name);
ScopeMessages scope_messages(log_messages, SchedMessages::NORMAL);
// scan the results for the WU
//
sprintf(buf, "where workunitid=%d", wu.id);
while (!result.enumerate(buf)) {
results.push_back(result);
}
if (results.size() == 0) {
log_messages.printf(SchedMessages::NORMAL, "[WU#%d %s] No results\n",
wu.id, wu.name);
return;
}
log_messages.printf(SchedMessages::DEBUG, "[WU#%d %s] enumerated %d results\n",
wu.id, wu.name, (int)results.size());
nerrors = 0;
ndone = 0;
for (i=0; i<results.size(); i++) {
result = results[i];
switch (result.server_state) {
case RESULT_SERVER_STATE_IN_PROGRESS:
if (result.report_deadline < now) {
log_messages.printf(
SchedMessages::NORMAL,
"[WU#%d %s] [RESULT#%d %s] result timed out (%d < %d)\n",
wu.id, wu.name, result.id, result.name, result.report_deadline, now
);
// clean up any incomplete uploads
result.file_delete_state = FILE_DELETE_READY;
result.server_state = RESULT_SERVER_STATE_OVER;
result.received_time = time(0);
result.outcome = RESULT_OUTCOME_NO_REPLY;
result.update();
}
break;
case RESULT_SERVER_STATE_OVER:
switch (result.outcome) {
case RESULT_OUTCOME_COULDNT_SEND:
log_messages.printf(
SchedMessages::NORMAL,
"[WU#%d %s] [RESULT#%d %s] result couldn't be sent\n",
wu.id, wu.name, result.id, result.name
);
wu.error_mask |= WU_ERROR_COULDNT_SEND_RESULT;
// wu_error = true;
break;
case RESULT_OUTCOME_SUCCESS:
ndone++;
break;
default:
nerrors++;
break;
}
break;
}
}
// check for too many errors or too many results
//
if (nerrors > max_errors) {
log_messages.printf(
SchedMessages::NORMAL,
"[WU#%d %s] WU has too many errors (%d errors for %d results)\n",
wu.id, wu.name, nerrors, (int)results.size()
);
wu.error_mask |= WU_ERROR_TOO_MANY_ERROR_RESULTS;
// wu_error = true;
}
if (ndone > max_done) {
log_messages.printf(
SchedMessages::NORMAL,
"[WU#%d %s] WU has too many answers (%d of %d results)\n",
wu.id, wu.name, ndone, (int)results.size()
);
wu.error_mask |= WU_ERROR_TOO_MANY_RESULTS;
// wu_error = true;
}
all_over = true;
// if this WU had an error, don't send any unsent results
//
if (wu.error_mask) {
for (i=0; i<results.size(); i++) {
result = results[i];
if (result.server_state == RESULT_SERVER_STATE_UNSENT) {
result.server_state = RESULT_SERVER_STATE_OVER;
result.received_time = time(0);
result.outcome = RESULT_OUTCOME_DIDNT_NEED;
result.update();
}
}
if (wu.assimilate_state == ASSIMILATE_INIT) {
wu.assimilate_state = ASSIMILATE_READY;
}
} else {
// If no error, generate new results if needed.
// Munge the XML of an existing result
// to create unique new output filenames.
//
if (nredundancy > ndone) {
n = nredundancy - ndone; // TODO: subtract # in-progress?
all_over = false;
log_messages.printf(
SchedMessages::NORMAL,
"[WU#%d %s] Generating %d more results\n",
wu.id, wu.name, n
);
for (i=0; i<n; i++) {
result = results[0];
make_unique_name(result.name);
initialize_result(result, wu);
remove_signatures(result.xml_doc_in);
assign_new_names(result.xml_doc_in);
add_signatures(result.xml_doc_in, key);
retval = result.insert();
if (retval) {
log_messages.printf(
SchedMessages::CRITICAL,
"[WU#%d %s] [RESULT#%d %s] result.insert() %d\n",
wu.id, wu.name, result.id, result.name, retval
);
break;
}
}
}
}
if (all_over) {
// see if all results are OVER and result is assimilated;
// if so we don't need to check this WU ever again.
//
for (i=0; i<results.size(); i++) {
result = results[i];
if (result.server_state != RESULT_SERVER_STATE_OVER) {
all_over = false;
break;
}
}
}
if (all_over && wu.assimilate_state == ASSIMILATE_DONE) {
wu.file_delete_state = FILE_DELETE_READY;
wu.timeout_check_time = 0;
log_messages.printf(
SchedMessages::DEBUG,
"[WU#%d %s] ASSIMILATE_DONE => setting FILE_DELETE_READY\n",
wu.id, wu.name
);
} else {
wu.timeout_check_time = now + wu.delay_bound;
}
retval = wu.update();
if (retval) {
log_messages.printf(
SchedMessages::CRITICAL,
"[WU#%d %s] workunit.update() %d\n",
wu.id, wu.name, retval
);
}
}
bool do_pass(APP& app) {
DB_WORKUNIT wu;
char buf[256];
bool did_something = false;
check_stop_trigger();
// loop over WUs that are due to be checked
//
sprintf(buf, "where appid=%d and timeout_check_time>0 and timeout_check_time<%d", app.id, (int)time(0));
while (!wu.enumerate(buf)) {
did_something = true;
handle_wu(wu);
}
return did_something;
}
void main_loop(bool one_pass) {
DB_APP app;
int retval;
char buf[256];
retval = boinc_db_open(config.db_name, config.db_passwd);
if (retval) {
log_messages.printf(SchedMessages::CRITICAL, "boinc_db_open: %d\n", retval);
exit(1);
}
sprintf(buf, "where name='%s'", app_name);
retval = app.lookup(buf);
if (retval) {
log_messages.printf(SchedMessages::CRITICAL, "can't find app %s\n", app.name);
exit(1);
}
if (one_pass) {
do_pass(app);
} else {
while (1) {
if (!do_pass(app)) sleep(1);
}
}
}
int main(int argc, char** argv) {
int i, retval;
bool asynch = false, one_pass=false;
char path[256];
check_stop_trigger();
startup_time = time(0);
for (i=1; i<argc; i++) {
if (!strcmp(argv[i], "-app")) {
strcpy(app_name, argv[++i]);
} else if (!strcmp(argv[i], "-nerror")) {
max_errors = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-ndet")) {
max_done = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-asynch")) {
asynch = true;
} else if (!strcmp(argv[i], "-one_pass")) {
one_pass = true;
} else if (!strcmp(argv[i], "-d")) {
log_messages.set_debug_level(atoi(argv[++i]));
} else if (!strcmp(argv[i], "-nredundancy")) {
nredundancy = atoi(argv[++i]);;
}
}
retval = config.parse_file();
if (retval) {
log_messages.printf(SchedMessages::CRITICAL, "can't read config file\n");
exit(1);
}
sprintf(path, "%s/upload_private", config.key_dir);
retval = read_key_file(path, key);
if (retval) {
log_messages.printf(SchedMessages::CRITICAL, "can't read key\n");
exit(1);
}
if (asynch) {
if (fork()) {
exit(0);
}
}
// // Call lock_file after fork(), because file locks are not always inherited
// if (lock_file(LOCKFILE)) {
// log_messages.printf(SchedMessages::NORMAL, "Another copy of timeout_check is already running\n");
// exit(1);
// }
// write_pid_file(PIDFILE);
log_messages.printf(SchedMessages::NORMAL, "Starting\n");
install_sigint_handler();
main_loop(one_pass);
}