diff --git a/checkin_notes b/checkin_notes index a5a5696ea9..6fa5e1509f 100755 --- a/checkin_notes +++ b/checkin_notes @@ -2637,3 +2637,26 @@ David Dec 13 2002 main.C lib/ util.C,y + +David Dec. 17 2002 + - Added new state fields to workunit and result to support + file deletion and assimilation backend phases + - Added sample assimilator program + - Added file_deleter program (AKA garbage collector) + + client/ + main.C + db/ + db.h + db_mysql.C + schema.sql + sched/ + Makefile.in + assimilator.C (new) + config.C,h + result_retry.C + validate.C + test/ + test.inc + tools/ + backend_lib.C diff --git a/client/main.C b/client/main.C index 29a2c62f7a..81eae445ad 100644 --- a/client/main.C +++ b/client/main.C @@ -35,8 +35,8 @@ #include "prefs.h" #include "util.h" -// Display a message to the user. Depending on the priority, the -// message may be more or less obtrusive +// Display a message to the user. +// Depending on the priority, the message may be more or less obtrusive // void show_message(char* message, char* priority) { if (!strcmp(priority, "high")) { diff --git a/db/db.h b/db/db.h index be4855a6b5..85b397e8f5 100644 --- a/db/db.h +++ b/db/db.h @@ -188,11 +188,27 @@ struct HOST { int parse_net_stats(FILE*); }; -#define WU_STATE_SEND_FAIL 1 +// values for main_state +#define WU_MAIN_STATE_INIT 0 +#define WU_MAIN_STATE_DONE 1 +#define WU_MAIN_STATE_ERROR 2 + +// values for file_delete state +#define FILE_DELETE_INIT 0 +#define FILE_DELETE_READY 1 +#define FILE_DELETE_DONE 2 + +// values for assimilate_state +#define ASSIMILATE_INIT 0 +#define ASSIMILATE_READY 1 +#define ASSIMILATE_DONE 2 + +// values for error +#define SEND_FAIL 1 // failed to send results for this WU -#define WU_STATE_TOO_MANY_ERRORS 2 +#define TOO_MANY_ERRORS 2 // too many errors; may have bug -#define WU_STATE_TOO_MANY_DONE 3 +#define TOO_MANY_DONE 3 // too many results without consensus; may be nondeterministic struct WORKUNIT { @@ -212,7 +228,10 @@ struct WORKUNIT { double canonical_credit; 
// credit that all correct results get double retry_check_time; // when to check for result retry int delay_bound; // determines result deadline, retry check time - int state; // see values above + int main_state; // see values above + int error; + int file_delete_state; + int assimilate_state; int workseq_next; // if part of a sequence, the next WU // the following not used in the DB @@ -243,13 +262,14 @@ struct RESULT { unsigned int sent_time; // when result was sent to host unsigned int received_time; // when result was received from host char name[256]; - int client_state; + int client_state; // records phase when error happened + // (download, compute, upload) double cpu_time; // CPU time used to complete result char xml_doc_in[MAX_BLOB_SIZE]; // descriptions of output files char xml_doc_out[MAX_BLOB_SIZE]; // MD5s of output files char stderr_out[MAX_BLOB_SIZE]; // stderr output, if any int batch; - int project_state; + int file_delete_state; int validate_state; double claimed_credit; // CPU time times host credit/sec double granted_credit; // == canonical credit of WU @@ -320,12 +340,15 @@ extern int db_workunit_update(WORKUNIT& p); extern int db_workunit_lookup_name(WORKUNIT&); extern int db_workunit_enum_app_need_validate(WORKUNIT&); extern int db_workunit_enum_retry_check_time(WORKUNIT&); +extern int db_workunit_enum_file_delete_state(WORKUNIT&); +extern int db_workunit_enum_assimilate_state(WORKUNIT&); extern int db_result_new(RESULT& p); extern int db_result(int id, RESULT&); extern int db_result_update(RESULT& p); extern int db_result_lookup_name(RESULT& p); extern int db_result_enum_server_state(RESULT&, int); +extern int db_result_enum_file_delete_state(RESULT&); extern int db_result_enum_wuid(RESULT&); extern int db_result_count_server_state(int state, int&); diff --git a/db/db_mysql.C b/db/db_mysql.C index 85916d4069..6a8e9d2f52 100644 --- a/db/db_mysql.C +++ b/db/db_mysql.C @@ -197,14 +197,16 @@ void struct_to_str(void* vp, char* q, int type) { 
"rsc_fpops=%f, rsc_iops=%f, rsc_memory=%f, rsc_disk=%f, " "need_validate=%d, " "canonical_resultid=%d, canonical_credit=%f, " - "retry_check_time=%f, delay_bound=%d, state=%d, " + "retry_check_time=%f, delay_bound=%d, main_state=%d, " + "error=%d, file_delete_state=%d, assimilate_state=%d, " "workseq_next=%d", wup->id, wup->create_time, wup->appid, wup->name, wup->xml_doc, wup->batch, wup->rsc_fpops, wup->rsc_iops, wup->rsc_memory, wup->rsc_disk, wup->need_validate, wup->canonical_resultid, wup->canonical_credit, - wup->retry_check_time, wup->delay_bound, wup->state, + wup->retry_check_time, wup->delay_bound, wup->main_state, + wup->error, wup->file_delete_state, wup->assimilate_state, wup->workseq_next ); break; @@ -215,13 +217,13 @@ void struct_to_str(void* vp, char* q, int type) { "hostid=%d, report_deadline=%d, sent_time=%d, received_time=%d, " "name='%s', client_state=%d, cpu_time=%f, " "xml_doc_in='%s', xml_doc_out='%s', stderr_out='%s', " - "batch=%d, project_state=%d, validate_state=%d, " + "batch=%d, file_delete_state=%d, validate_state=%d, " "claimed_credit=%f, granted_credit=%f", rp->id, rp->create_time, rp->workunitid, rp->server_state, rp->hostid, rp->report_deadline, rp->sent_time, rp->received_time, rp->name, rp->client_state, rp->cpu_time, rp->xml_doc_in, rp->xml_doc_out, rp->stderr_out, - rp->batch, rp->project_state, rp->validate_state, + rp->batch, rp->file_delete_state, rp->validate_state, rp->claimed_credit, rp->granted_credit ); break; @@ -379,7 +381,10 @@ void row_to_struct(MYSQL_ROW& r, void* vp, int type) { wup->canonical_credit = atof(r[i++]); wup->retry_check_time = atof(r[i++]); wup->delay_bound = atoi(r[i++]); - wup->state = atoi(r[i++]); + wup->main_state = atoi(r[i++]); + wup->error = atoi(r[i++]); + wup->file_delete_state = atoi(r[i++]); + wup->assimilate_state = atoi(r[i++]); wup->workseq_next = atoi(r[i++]); break; case TYPE_RESULT: @@ -400,7 +405,7 @@ void row_to_struct(MYSQL_ROW& r, void* vp, int type) { strcpy2(rp->xml_doc_out, 
r[i++]); strcpy2(rp->stderr_out, r[i++]); rp->batch = atoi(r[i++]); - rp->project_state = atoi(r[i++]); + rp->file_delete_state = atoi(r[i++]); rp->validate_state = atoi(r[i++]); rp->claimed_credit = atof(r[i++]); rp->granted_credit = atof(r[i++]); @@ -605,6 +610,26 @@ int db_workunit_enum_app_need_validate(WORKUNIT& p) { return db_enum(e, &p, TYPE_WORKUNIT, buf); }
+int db_workunit_enum_file_delete_state(WORKUNIT& p) {   // workunit enumeration keyed on p.file_delete_state
+    static ENUM e;      // static, so its state is kept from one call to the next
+    char buf[256];      // holds the query clause; written only in the branch below
+
+    if (!e.active) {    // the clause is built only while e is not active
+        sprintf(buf, "where file_delete_state=%d", p.file_delete_state);
+    }
+    return db_enum(e, &p, TYPE_WORKUNIT, buf);   // result of db_enum is returned as-is; presumably p receives the next row -- confirm in db_enum
+}
+
+int db_workunit_enum_assimilate_state(WORKUNIT& p) {    // workunit enumeration keyed on p.assimilate_state
+    static ENUM e;      // static, so its state is kept from one call to the next
+    char buf[256];      // holds the query clause; written only in the branch below
+
+    if (!e.active) {    // the clause is built only while e is not active
+        sprintf(buf, "where assimilate_state=%d", p.assimilate_state);
+    }
+    return db_enum(e, &p, TYPE_WORKUNIT, buf);   // result of db_enum is returned as-is; presumably p receives the next row -- confirm in db_enum
+}
+
 int db_workunit_enum_retry_check_time(WORKUNIT& p) { static ENUM e; char buf[256]; @@ -648,6 +673,14 @@ int db_result_enum_server_state(RESULT& p, int limit) { return db_enum(e, &p, TYPE_RESULT, buf, limit); }
+int db_result_enum_file_delete_state(RESULT& p) {   // result enumeration keyed on p.file_delete_state
+    static ENUM e;      // static, so its state is kept from one call to the next
+    char buf[256];      // holds the query clause; written only when e is not active
+
+    if (!e.active) sprintf(buf, "where file_delete_state=%d", p.file_delete_state);
+    return db_enum(e, &p, TYPE_RESULT, buf);   // same call shape as the workunit variants, with TYPE_RESULT
+}
+
 int db_result_enum_wuid(RESULT& p) { static ENUM e; char buf[256]; diff --git a/db/schema.sql b/db/schema.sql index 5e9dc532f6..615e7e3a6a 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -136,7 +136,10 @@ create table workunit ( canonical_credit double not null, retry_check_time double not null, delay_bound integer not null, - state integer not null, + main_state integer not null, + error integer not null, + file_delete_state integer not null, + assimilate_state integer not null, workseq_next integer not null, primary key (id) ); @@ -157,7 +160,7 @@ create table result ( xml_doc_out blob, stderr_out blob, batch integer not null, - project_state integer not null, + file_delete_state integer not null, validate_state integer not null,
claimed_credit double not null, granted_credit double not null, diff --git a/sched/Makefile.in b/sched/Makefile.in index f70b32067b..307e3e51db 100644 --- a/sched/Makefile.in +++ b/sched/Makefile.in @@ -18,7 +18,7 @@ CC = g++ $(CFLAGS) CLIBS = @LIBS@ -PROGS = cgi feeder show_shmem file_upload_handler validate_test make_work result_retry +PROGS = cgi feeder show_shmem file_upload_handler validate_test make_work result_retry file_deleter assimilator all: $(PROGS) @@ -68,6 +68,20 @@ VALIDATE_OBJS = \ ../db/mysql_util.o \ ../lib/parse.o +FILE_DELETER_OBJS = \ + file_deleter.o \ + config.o \ + ../db/db_mysql.o \ + ../db/mysql_util.o \ + ../lib/parse.o + +ASSIMILATOR_OBJS = \ + assimilator.o \ + config.o \ + ../db/db_mysql.o \ + ../db/mysql_util.o \ + ../lib/parse.o + MAKE_WORK_OBJS = \ make_work.o \ config.o \ @@ -150,6 +164,12 @@ make_work: $(MAKE_WORK_OBJS) result_retry: $(RESULT_RETRY_OBJS) $(CC) $(RESULT_RETRY_OBJS) $(MYSQL_LIBS) $(CLIBS) -o result_retry +file_deleter: $(FILE_DELETER_OBJS) + $(CC) $(FILE_DELETER_OBJS) $(MYSQL_LIBS) $(CLIBS) -o file_deleter + +assimilator: $(ASSIMILATOR_OBJS) + $(CC) $(ASSIMILATOR_OBJS) $(MYSQL_LIBS) $(CLIBS) -o assimilator + fcgi: $(FCGI_OBJS) $(CC) $(FCGI_OBJS) $(MYSQL_LIBS) $(CLIBS) $(FCGI_LIBS) \ -o fcgi diff --git a/sched/assimilator.C b/sched/assimilator.C new file mode 100644 index 0000000000..546e1e3659 --- /dev/null +++ b/sched/assimilator.C @@ -0,0 +1,84 @@ +#include +#include + +#include "db.h" +#include "parse.h" +#include "config.h" + +CONFIG config; +
+// Make one assimilation pass: enumerate the workunits whose
+// assimilate_state is ASSIMILATE_READY, report each one on stdout/stderr
+// according to its main_state, set the canonical result's
+// file_delete_state to FILE_DELETE_READY, and mark the workunit
+// ASSIMILATE_DONE.
+// Returns true if at least one workunit was enumerated.
+//
+// NOTE(review): the loop runs while the enum call returns nonzero, yet
+// db_result() below is treated as failing when it returns nonzero;
+// confirm the return convention of the db_*_enum_* functions.
+//
+bool do_pass() {
+    WORKUNIT wu;
+    RESULT result;
+    bool did_something = false;
+    int retval;
+
+    wu.assimilate_state = ASSIMILATE_READY;
+    while (db_workunit_enum_assimilate_state(wu)) {
+        did_something = true;
+        switch(wu.main_state) {
+        case WU_MAIN_STATE_INIT:
+            fprintf(stderr, "assimilate: ERROR; shouldn't be in init state\n");
+            break;
+        case WU_MAIN_STATE_DONE:
+            if (!wu.canonical_resultid) {
+                fprintf(stderr, "assimilate: ERROR: canonical resultid zero\n");
+                break;
+            }
+            retval = db_result(wu.canonical_resultid, result);
+            if (retval) {
+                fprintf(stderr, "assimilate: can't get canonical result\n");
+                break;
+            }
+            printf("canonical result for WU %s:\n%s", wu.name, result.xml_doc_out);
+            result.file_delete_state = FILE_DELETE_READY;
+            db_result_update(result);
+            break;
+        case WU_MAIN_STATE_ERROR:
+            printf("WU %s had an error\n", wu.name);
+            break;
+        default:
+            // a value outside the three WU_MAIN_STATE_* constants used
+            // to fall through silently; report it instead
+            fprintf(stderr, "assimilate: ERROR; unknown main_state %d\n", wu.main_state);
+            break;
+        }
+        // the workunit is marked done whichever branch ran,
+        // including the error branches
+        wu.assimilate_state = ASSIMILATE_DONE;
+        db_workunit_update(wu);
+    }
+    return did_something;
+}
+
+int main(int argc, char** argv) { + int retval; + bool asynch = false, one_pass = false; + int i; + + for (i=1; i", shmem_key)) continue; else if (parse_str(buf, "", key_dir, sizeof(key_dir))) continue; else if (parse_str(buf, "", download_url, sizeof(download_url))) continue; + else if (parse_str(buf, "", download_dir, sizeof(download_dir))) continue; else if (parse_str(buf, "", upload_url, sizeof(upload_url))) continue; else if (parse_str(buf, "", upload_dir, sizeof(upload_dir))) continue; else if (parse_str(buf, "", user_name, sizeof(user_name))) continue; diff --git a/sched/config.h b/sched/config.h index 9be213d793..d44db3606c 100644 --- a/sched/config.h +++ b/sched/config.h @@ -29,6 +29,7 @@ public: int shmem_key; char key_dir[256]; char download_url[256]; + char download_dir[256]; char upload_url[256]; char upload_dir[256]; char user_name[256]; diff --git a/sched/file_deleter.C b/sched/file_deleter.C new file mode 100644 index 0000000000..94f029e493 --- /dev/null +++ b/sched/file_deleter.C @@ -0,0 +1,121 @@ +#include +#include + +#include "db.h" +#include "parse.h" +#include "config.h" + +CONFIG config; +
+// Walk the lines of xml_doc and unlink, relative to dir, each file named
+// in it that has not been flagged no-delete.
+//
+// Every line is examined, including the first one, and no NULL pointer
+// is ever handed to parse_str()/match_tag().  A line is NUL-terminated
+// in place only while it is examined and its newline is put back
+// afterwards, so the document is unchanged on return; this matters
+// because the callers pass the same struct to db_*_update() right after.
+// Always returns 0; unlink() failures are ignored (best effort).
+//
+// NOTE(review): the tag literals passed to parse_str()/match_tag() are
+// empty strings in this copy of the patch; they look like markup that
+// was stripped in transit -- restore them from the upstream file.
+//
+static int delete_listed_files(char* xml_doc, const char* dir) {
+    char filename[256], pathname[512];
+    bool no_delete = false;
+    char* line = xml_doc;
+    char* eol;
+
+    strcpy(filename, "");
+    while (*line) {
+        eol = strchr(line, '\n');
+        if (eol) *eol = 0;
+        if (*line) {        // empty lines are skipped
+            if (parse_str(line, "", filename, sizeof(filename))) {
+                // parse_str reported a match; nothing else to do for this line
+            } else if (match_tag(line, "")) {
+                no_delete = false;
+                strcpy(filename, "");
+            } else if (match_tag(line, "")) {
+                no_delete = true;
+            } else if (match_tag(line, "")) {
+                // skip when no name was seen, rather than unlinking "dir/"
+                if (!no_delete && filename[0]) {
+                    // 512 bytes hold two 255-char components plus '/' and NUL
+                    snprintf(pathname, sizeof(pathname), "%s/%s", dir, filename);
+                    unlink(pathname);
+                }
+            }
+        }
+        if (!eol) break;
+        *eol = '\n';        // restore the newline that was overwritten above
+        line = eol + 1;
+    }
+    return 0;
+}
+
+// Delete the files listed in wu.xml_doc from config.download_dir.
+// Returns 0.
+//
+int wu_delete_files(WORKUNIT& wu) {
+    return delete_listed_files(wu.xml_doc, config.download_dir);
+}
+
+// Delete the files listed in result.xml_doc_in from config.upload_dir.
+// Returns 0.
+//
+int result_delete_files(RESULT& result) {
+    return delete_listed_files(result.xml_doc_in, config.upload_dir);
+}
+
+// Make one deletion pass: for every workunit, then every result, whose
+// file_delete_state is FILE_DELETE_READY, delete its files and mark it
+// FILE_DELETE_DONE.
+// Returns true if at least one workunit or result was enumerated.
+//
+// NOTE(review): both loops run while the enum call returns nonzero;
+// confirm the return convention of the db_*_enum_* functions.
+//
+bool do_pass() {
+    WORKUNIT wu;
+    RESULT result;
+    bool did_something = false;
+
+    wu.file_delete_state = FILE_DELETE_READY;
+    while (db_workunit_enum_file_delete_state(wu)) {
+        did_something = true;
+        wu_delete_files(wu);
+        wu.file_delete_state = FILE_DELETE_DONE;
+        db_workunit_update(wu);
+    }
+
+    result.file_delete_state = FILE_DELETE_READY;
+    while (db_result_enum_file_delete_state(result)) {
+        did_something = true;
+        result_delete_files(result);
+        result.file_delete_state = FILE_DELETE_DONE;
+        db_result_update(result);
+    }
+    return did_something;
+}
+
+int main(int argc, char** argv) { + int retval; + bool asynch = false, one_pass = false; + int i; + + for (i=1; i max_errors) { fprintf(stderr, "WU %s has too many errors\n", wu.name); - wu.state = WU_STATE_TOO_MANY_ERRORS; + wu.main_state = WU_MAIN_STATE_ERROR; + wu.error = TOO_MANY_ERRORS; + wu.file_delete_state = FILE_DELETE_READY; + wu.assimilate_state = ASSIMILATE_READY; wu.retry_check_time = 0; goto update_wu; } if (ndone > max_done) { fprintf(stderr, "WU %s has too many answers\n", wu.name); - wu.state = WU_STATE_TOO_MANY_DONE; + wu.main_state = WU_MAIN_STATE_ERROR; + wu.error = TOO_MANY_DONE; +
wu.file_delete_state = FILE_DELETE_READY; + wu.assimilate_state = ASSIMILATE_READY; wu.retry_check_time = 0; goto update_wu; } diff --git a/sched/validate.C b/sched/validate.C index f724d96a25..7061131f17 100644 --- a/sched/validate.C +++ b/sched/validate.C @@ -193,7 +193,20 @@ bool do_validate_scan(APP& app, int min_quorum) { printf("found a canonical result\n"); wu.canonical_resultid = canonicalid; wu.canonical_credit = credit; + wu.main_state = WU_MAIN_STATE_DONE; + wu.file_delete_state = FILE_DELETE_READY; + wu.assimilate_state = ASSIMILATE_READY; for (i=0; i$this->shmem_key\n"); fputs($f, " $this->key_dir\n"); fputs($f, " $this->download_url\n"); + fputs($f, " $this->project_dir/download\n"); fputs($f, " $this->upload_url\n"); fputs($f, " $this->project_dir/upload\n"); fputs($f, " $this->user_name\n");
diff --git a/tools/backend_lib.C b/tools/backend_lib.C
index 52c72e8c43..3506e482a3 100644
--- a/tools/backend_lib.C
+++ b/tools/backend_lib.C
@@ -152,7 +152,7 @@ void initialize_result(RESULT& result, WORKUNIT& wu) {
     result.cpu_time = 0;
     strcpy(result.xml_doc_out, "");
     strcpy(result.stderr_out, "");
-    result.project_state = 0;
+    result.file_delete_state = FILE_DELETE_INIT;    // initial value from the file-delete state set (was ASSIMILATE_INIT; both are 0)
     result.validate_state = VALIDATE_STATE_INITIAL;
     result.claimed_credit = 0;
     result.granted_credit = 0;