file deletion, assimilation

svn path=/trunk/boinc/; revision=752
This commit is contained in:
David Anderson 2002-12-17 19:00:43 +00:00
parent c9ca6ff61f
commit cba3f6fd36
14 changed files with 355 additions and 21 deletions

View File

@ -2637,3 +2637,26 @@ David Dec 13 2002
main.C
lib/
util.C,y
David Dec. 17 2002
- Added new state fields to workunit and result to support
file deletion and assimilation backend phases
- Added sample assimilator program
- Added file_deleter program (AKA garbage collector)
client/
main.C
db/
db.h
db_mysql.C
schema.sql
sched/
Makefile.in
assimilator.C (new)
config.C,h
result_retry.C
validate.C
test/
test.inc
tools/
backend_lib.C

View File

@ -35,8 +35,8 @@
#include "prefs.h"
#include "util.h"
// Display a message to the user. Depending on the priority, the
// message may be more or less obtrusive
// Display a message to the user.
// Depending on the priority, the message may be more or less obtrusive
//
void show_message(char* message, char* priority) {
if (!strcmp(priority, "high")) {

35
db/db.h
View File

@ -188,11 +188,27 @@ struct HOST {
int parse_net_stats(FILE*);
};
#define WU_STATE_SEND_FAIL 1
// values for main_state
#define WU_MAIN_STATE_INIT 0
#define WU_MAIN_STATE_DONE 1
#define WU_MAIN_STATE_ERROR 2
// values for file_delete state
#define FILE_DELETE_INIT 0
#define FILE_DELETE_READY 1
#define FILE_DELETE_DONE 2
// values for assimilate_state
#define ASSIMILATE_INIT 0
#define ASSIMILATE_READY 1
#define ASSIMILATE_DONE 2
// values for error
#define SEND_FAIL 1
// failed to send results for this WU
#define WU_STATE_TOO_MANY_ERRORS 2
#define TOO_MANY_ERRORS 2
// too many errors; may have bug
#define WU_STATE_TOO_MANY_DONE 3
#define TOO_MANY_DONE 3
// too many results without consensus; may be nondeterministic
struct WORKUNIT {
@ -212,7 +228,10 @@ struct WORKUNIT {
double canonical_credit; // credit that all correct results get
double retry_check_time; // when to check for result retry
int delay_bound; // determines result deadline, retry check time
int state; // see values above
int main_state; // see values above
int error;
int file_delete_state;
int assimilate_state;
int workseq_next; // if part of a sequence, the next WU
// the following not used in the DB
@ -243,13 +262,14 @@ struct RESULT {
unsigned int sent_time; // when result was sent to host
unsigned int received_time; // when result was received from host
char name[256];
int client_state;
int client_state; // records phase when error happened
// (download, compute, upload)
double cpu_time; // CPU time used to complete result
char xml_doc_in[MAX_BLOB_SIZE]; // descriptions of output files
char xml_doc_out[MAX_BLOB_SIZE]; // MD5s of output files
char stderr_out[MAX_BLOB_SIZE]; // stderr output, if any
int batch;
int project_state;
int file_delete_state;
int validate_state;
double claimed_credit; // CPU time times host credit/sec
double granted_credit; // == canonical credit of WU
@ -320,12 +340,15 @@ extern int db_workunit_update(WORKUNIT& p);
extern int db_workunit_lookup_name(WORKUNIT&);
extern int db_workunit_enum_app_need_validate(WORKUNIT&);
extern int db_workunit_enum_retry_check_time(WORKUNIT&);
extern int db_workunit_enum_file_delete_state(WORKUNIT&);
extern int db_workunit_enum_assimilate_state(WORKUNIT&);
extern int db_result_new(RESULT& p);
extern int db_result(int id, RESULT&);
extern int db_result_update(RESULT& p);
extern int db_result_lookup_name(RESULT& p);
extern int db_result_enum_server_state(RESULT&, int);
extern int db_result_enum_file_delete_state(RESULT&);
extern int db_result_enum_wuid(RESULT&);
extern int db_result_count_server_state(int state, int&);

View File

@ -197,14 +197,16 @@ void struct_to_str(void* vp, char* q, int type) {
"rsc_fpops=%f, rsc_iops=%f, rsc_memory=%f, rsc_disk=%f, "
"need_validate=%d, "
"canonical_resultid=%d, canonical_credit=%f, "
"retry_check_time=%f, delay_bound=%d, state=%d, "
"retry_check_time=%f, delay_bound=%d, main_state=%d, "
"error=%d, file_delete_state=%d, assimilate_state=%d, "
"workseq_next=%d",
wup->id, wup->create_time, wup->appid,
wup->name, wup->xml_doc, wup->batch,
wup->rsc_fpops, wup->rsc_iops, wup->rsc_memory, wup->rsc_disk,
wup->need_validate,
wup->canonical_resultid, wup->canonical_credit,
wup->retry_check_time, wup->delay_bound, wup->state,
wup->retry_check_time, wup->delay_bound, wup->main_state,
wup->error, wup->file_delete_state, wup->assimilate_state,
wup->workseq_next
);
break;
@ -215,13 +217,13 @@ void struct_to_str(void* vp, char* q, int type) {
"hostid=%d, report_deadline=%d, sent_time=%d, received_time=%d, "
"name='%s', client_state=%d, cpu_time=%f, "
"xml_doc_in='%s', xml_doc_out='%s', stderr_out='%s', "
"batch=%d, project_state=%d, validate_state=%d, "
"batch=%d, file_delete_state=%d, validate_state=%d, "
"claimed_credit=%f, granted_credit=%f",
rp->id, rp->create_time, rp->workunitid, rp->server_state,
rp->hostid, rp->report_deadline, rp->sent_time, rp->received_time,
rp->name, rp->client_state, rp->cpu_time,
rp->xml_doc_in, rp->xml_doc_out, rp->stderr_out,
rp->batch, rp->project_state, rp->validate_state,
rp->batch, rp->file_delete_state, rp->validate_state,
rp->claimed_credit, rp->granted_credit
);
break;
@ -379,7 +381,10 @@ void row_to_struct(MYSQL_ROW& r, void* vp, int type) {
wup->canonical_credit = atof(r[i++]);
wup->retry_check_time = atof(r[i++]);
wup->delay_bound = atoi(r[i++]);
wup->state = atoi(r[i++]);
wup->main_state = atoi(r[i++]);
wup->error = atoi(r[i++]);
wup->file_delete_state = atoi(r[i++]);
wup->assimilate_state = atoi(r[i++]);
wup->workseq_next = atoi(r[i++]);
break;
case TYPE_RESULT:
@ -400,7 +405,7 @@ void row_to_struct(MYSQL_ROW& r, void* vp, int type) {
strcpy2(rp->xml_doc_out, r[i++]);
strcpy2(rp->stderr_out, r[i++]);
rp->batch = atoi(r[i++]);
rp->project_state = atoi(r[i++]);
rp->file_delete_state = atoi(r[i++]);
rp->validate_state = atoi(r[i++]);
rp->claimed_credit = atof(r[i++]);
rp->granted_credit = atof(r[i++]);
@ -605,6 +610,26 @@ int db_workunit_enum_app_need_validate(WORKUNIT& p) {
return db_enum(e, &p, TYPE_WORKUNIT, buf);
}
int db_workunit_enum_file_delete_state(WORKUNIT& p) {
static ENUM e;
char buf[256];
if (!e.active) {
sprintf(buf, "where file_delete_state=%d", p.file_delete_state);
}
return db_enum(e, &p, TYPE_WORKUNIT, buf);
}
int db_workunit_enum_assimilate_state(WORKUNIT& p) {
static ENUM e;
char buf[256];
if (!e.active) {
sprintf(buf, "where assimilate_state=%d", p.assimilate_state);
}
return db_enum(e, &p, TYPE_WORKUNIT, buf);
}
int db_workunit_enum_retry_check_time(WORKUNIT& p) {
static ENUM e;
char buf[256];
@ -648,6 +673,14 @@ int db_result_enum_server_state(RESULT& p, int limit) {
return db_enum(e, &p, TYPE_RESULT, buf, limit);
}
int db_result_enum_file_delete_state(RESULT& p) {
static ENUM e;
char buf[256];
if (!e.active) sprintf(buf, "where file_delete_state=%d", p.file_delete_state);
return db_enum(e, &p, TYPE_RESULT, buf);
}
int db_result_enum_wuid(RESULT& p) {
static ENUM e;
char buf[256];

View File

@ -136,7 +136,10 @@ create table workunit (
canonical_credit double not null,
retry_check_time double not null,
delay_bound integer not null,
state integer not null,
main_state integer not null,
error integer not null,
file_delete_state integer not null,
assimilate_state integer not null,
workseq_next integer not null,
primary key (id)
);
@ -157,7 +160,7 @@ create table result (
xml_doc_out blob,
stderr_out blob,
batch integer not null,
project_state integer not null,
file_delete_state integer not null,
validate_state integer not null,
claimed_credit double not null,
granted_credit double not null,

View File

@ -18,7 +18,7 @@ CC = g++ $(CFLAGS)
CLIBS = @LIBS@
PROGS = cgi feeder show_shmem file_upload_handler validate_test make_work result_retry
PROGS = cgi feeder show_shmem file_upload_handler validate_test make_work result_retry file_deleter assimilator
all: $(PROGS)
@ -68,6 +68,20 @@ VALIDATE_OBJS = \
../db/mysql_util.o \
../lib/parse.o
FILE_DELETER_OBJS = \
file_deleter.o \
config.o \
../db/db_mysql.o \
../db/mysql_util.o \
../lib/parse.o
ASSIMILATOR_OBJS = \
assimilator.o \
config.o \
../db/db_mysql.o \
../db/mysql_util.o \
../lib/parse.o
MAKE_WORK_OBJS = \
make_work.o \
config.o \
@ -150,6 +164,12 @@ make_work: $(MAKE_WORK_OBJS)
result_retry: $(RESULT_RETRY_OBJS)
$(CC) $(RESULT_RETRY_OBJS) $(MYSQL_LIBS) $(CLIBS) -o result_retry
file_deleter: $(FILE_DELETER_OBJS)
$(CC) $(FILE_DELETER_OBJS) $(MYSQL_LIBS) $(CLIBS) -o file_deleter
assimilator: $(ASSIMILATOR_OBJS)
$(CC) $(ASSIMILATOR_OBJS) $(MYSQL_LIBS) $(CLIBS) -o assimilator
fcgi: $(FCGI_OBJS)
$(CC) $(FCGI_OBJS) $(MYSQL_LIBS) $(CLIBS) $(FCGI_LIBS) \
-o fcgi

84
sched/assimilator.C Normal file
View File

@ -0,0 +1,84 @@
#include <string.h>
#include <unistd.h>
#include "db.h"
#include "parse.h"
#include "config.h"
CONFIG config;
// return nonzero if did anything
//
bool do_pass() {
WORKUNIT wu;
RESULT result;
bool did_something = false;
int retval;
wu.assimilate_state = ASSIMILATE_READY;
while (db_workunit_enum_assimilate_state(wu)) {
did_something = true;
switch(wu.main_state) {
case WU_MAIN_STATE_INIT:
fprintf(stderr, "assimilate: ERROR; shouldn't be in init state\n");
break;
case WU_MAIN_STATE_DONE:
if (!wu.canonical_resultid) {
fprintf(stderr, "assimilate: ERROR: canonical resultid zero\n");
break;
}
retval = db_result(wu.canonical_resultid, result);
if (retval) {
fprintf(stderr, "assimilate: can't get canonical result\n");
break;
}
printf("canonical result for WU %s:\n%s", wu.name, result.xml_doc_out);
result.file_delete_state = FILE_DELETE_READY;
db_result_update(result);
break;
case WU_MAIN_STATE_ERROR:
printf("WU %s had an error\n", wu.name);
break;
}
wu.assimilate_state = ASSIMILATE_DONE;
db_workunit_update(wu);
}
return did_something;
}
int main(int argc, char** argv) {
int retval;
bool asynch = false, one_pass = false;
int i;
for (i=1; i<argc; i++) {
if (!strcmp(argv[i], "-asynch")) {
asynch = true;
} else if (!strcmp(argv[i], "-one_pass")) {
one_pass = true;
} else {
fprintf(stderr, "Unrecognized arg: %s\n", argv[i]);
}
}
retval = config.parse_file();
if (retval) {
fprintf(stderr, "Can't parse config file\n");
exit(1);
}
if (asynch) {
if (fork()) {
exit(0);
}
}
retval = db_open(config.db_name, config.db_passwd);
if (one_pass) {
do_pass();
} else {
while (1) {
if (!do_pass()) sleep(10);
}
}
}

View File

@ -38,6 +38,7 @@ int CONFIG::parse(FILE* in) {
else if (parse_int(buf, "<shmem_key>", shmem_key)) continue;
else if (parse_str(buf, "<key_dir>", key_dir, sizeof(key_dir))) continue;
else if (parse_str(buf, "<download_url>", download_url, sizeof(download_url))) continue;
else if (parse_str(buf, "<download_dir>", download_dir, sizeof(download_dir))) continue;
else if (parse_str(buf, "<upload_url>", upload_url, sizeof(upload_url))) continue;
else if (parse_str(buf, "<upload_dir>", upload_dir, sizeof(upload_dir))) continue;
else if (parse_str(buf, "<user_name>", user_name, sizeof(user_name))) continue;

View File

@ -29,6 +29,7 @@ public:
int shmem_key;
char key_dir[256];
char download_url[256];
char download_dir[256];
char upload_url[256];
char upload_dir[256];
char user_name[256];

121
sched/file_deleter.C Normal file
View File

@ -0,0 +1,121 @@
#include <string.h>
#include <unistd.h>
#include "db.h"
#include "parse.h"
#include "config.h"
CONFIG config;
int wu_delete_files(WORKUNIT& wu) {
char* p;
char filename[256], pathname[256];
bool no_delete;
p = strtok(wu.xml_doc, "\n");
strcpy(filename, "");
while (p) {
p = strtok(0, "\n");
if (parse_str(p, "<name>", filename, sizeof(filename))) {
continue;
} else if (match_tag(p, "<file_info>")) {
no_delete = false;
strcpy(filename, "");
} else if (match_tag(p, "<no_delete/>")) {
no_delete = true;
} else if (match_tag(p, "</file_info>")) {
if (!no_delete) {
sprintf(pathname, "%s/%s", config.download_dir, filename);
unlink(pathname);
}
}
}
return 0;
}
int result_delete_files(RESULT& result) {
char* p;
char filename[256], pathname[256];
bool no_delete;
p = strtok(result.xml_doc_in, "\n");
while (p) {
p = strtok(0, "\n");
if (parse_str(p, "<name>", filename, sizeof(filename))) {
continue;
} else if (match_tag(p, "<file_info>")) {
no_delete = false;
strcpy(filename, "");
} else if (match_tag(p, "<no_delete/>")) {
no_delete = true;
} else if (match_tag(p, "</file_info>")) {
if (!no_delete) {
sprintf(pathname, "%s/%s", config.upload_dir, filename);
unlink(pathname);
}
}
}
return 0;
}
// return nonzero if did anything
//
bool do_pass() {
WORKUNIT wu;
RESULT result;
bool did_something = false;
wu.file_delete_state = FILE_DELETE_READY;
while (db_workunit_enum_file_delete_state(wu)) {
did_something = true;
wu_delete_files(wu);
wu.file_delete_state = FILE_DELETE_DONE;
db_workunit_update(wu);
}
result.file_delete_state = FILE_DELETE_READY;
while (db_result_enum_file_delete_state(result)) {
did_something = true;
result_delete_files(result);
result.file_delete_state = FILE_DELETE_DONE;
db_result_update(result);
}
return did_something;
}
int main(int argc, char** argv) {
int retval;
bool asynch = false, one_pass = false;
int i;
for (i=1; i<argc; i++) {
if (!strcmp(argv[i], "-asynch")) {
asynch = true;
} else if (!strcmp(argv[i], "-one_pass")) {
one_pass = true;
} else {
fprintf(stderr, "Unrecognized arg: %s\n", argv[i]);
}
}
retval = config.parse_file();
if (retval) {
fprintf(stderr, "Can't parse config file\n");
exit(1);
}
if (asynch) {
if (fork()) {
exit(0);
}
}
retval = db_open(config.db_name, config.db_passwd);
if (one_pass) {
do_pass();
} else {
while (1) {
if (!do_pass()) sleep(10);
}
}
}

View File

@ -188,7 +188,10 @@ bool do_pass(APP& app) {
//
if (result.server_state == RESULT_SERVER_STATE_UNSENT) {
fprintf(stderr, "WU %s has unsent result\n", wu.name);
wu.state = WU_STATE_SEND_FAIL;
wu.main_state = WU_MAIN_STATE_ERROR;
wu.error = SEND_FAIL;
wu.file_delete_state = FILE_DELETE_READY;
wu.assimilate_state = ASSIMILATE_READY;
wu.retry_check_time = 0;
goto update_wu;
}
@ -204,13 +207,19 @@ bool do_pass(APP& app) {
//
if (nerrors > max_errors) {
fprintf(stderr, "WU %s has too many errors\n", wu.name);
wu.state = WU_STATE_TOO_MANY_ERRORS;
wu.main_state = WU_MAIN_STATE_ERROR;
wu.error = TOO_MANY_ERRORS;
wu.file_delete_state = FILE_DELETE_READY;
wu.assimilate_state = ASSIMILATE_READY;
wu.retry_check_time = 0;
goto update_wu;
}
if (ndone > max_done) {
fprintf(stderr, "WU %s has too many answers\n", wu.name);
wu.state = WU_STATE_TOO_MANY_DONE;
wu.main_state = WU_MAIN_STATE_ERROR;
wu.error = TOO_MANY_DONE;
wu.file_delete_state = FILE_DELETE_READY;
wu.assimilate_state = ASSIMILATE_READY;
wu.retry_check_time = 0;
goto update_wu;
}

View File

@ -193,7 +193,20 @@ bool do_validate_scan(APP& app, int min_quorum) {
printf("found a canonical result\n");
wu.canonical_resultid = canonicalid;
wu.canonical_credit = credit;
wu.main_state = WU_MAIN_STATE_DONE;
wu.file_delete_state = FILE_DELETE_READY;
wu.assimilate_state = ASSIMILATE_READY;
for (i=0; i<results.size(); i++) {
// if result is not canonical, arrange to delete
// its output files
//
if (results[i].id != canonicalid) {
results[i].file_delete_state = FILE_DELETE_READY;
}
// grant credit for valid results
//
if (results[i].validate_state == VALIDATE_STATE_VALID) {
retval = grant_credit(results[i], credit);
if (retval) {
@ -270,6 +283,8 @@ int main(int argc, char** argv) {
strcpy(app_name, argv[++i]);
} else if (!strcmp(argv[i], "-quorum")) {
min_quorum = atoi(argv[++i]);
} else {
fprintf(stderr, "unrecognized arg: %s\n", argv[i]);
}
}

View File

@ -256,6 +256,7 @@ class Project {
fputs($f, " <shmem_key>$this->shmem_key</shmem_key>\n");
fputs($f, " <key_dir>$this->key_dir</key_dir>\n");
fputs($f, " <download_url>$this->download_url</download_url>\n");
fputs($f, " <download_dir>$this->project_dir/download</download_dir>\n");
fputs($f, " <upload_url>$this->upload_url</upload_url>\n");
fputs($f, " <upload_dir>$this->project_dir/upload</upload_dir>\n");
fputs($f, " <user_name>$this->user_name</user_name>\n");

View File

@ -152,7 +152,7 @@ void initialize_result(RESULT& result, WORKUNIT& wu) {
result.cpu_time = 0;
strcpy(result.xml_doc_out, "");
strcpy(result.stderr_out, "");
result.project_state = 0;
result.file_delete_state = ASSIMILATE_INIT;
result.validate_state = VALIDATE_STATE_INITIAL;
result.claimed_credit = 0;
result.granted_credit = 0;