diff --git a/checkin_notes b/checkin_notes index 816ce2a1ca..297a33497e 100755 --- a/checkin_notes +++ b/checkin_notes @@ -20841,3 +20841,12 @@ Rom 7 Dec 2004 client/ client_state.C + +Bruce 7 Dec 2004 + - Added additional options -gzip, -zip, -max_wu_per_file to db_purge. The + compression options are 'efficient' in that they do not write an uncompressed + file then compress -- they go straight to compressed. SIGHUP and lockfile + shutdown signals are properly caught and close compressed files and DB cleanly. + + sched/ + db_purge.C diff --git a/sched/db_purge.C b/sched/db_purge.C index c0c6e2da10..6fee73a3c8 100644 --- a/sched/db_purge.C +++ b/sched/db_purge.C @@ -32,6 +32,7 @@ static volatile const char *BOINCrcsid="$Id$"; #include #include #include +#include using namespace std; @@ -55,11 +56,11 @@ using namespace std; #define DB_QUERY_LIMIT 1000 SCHED_CONFIG config; -FILE *wu_stream; -FILE *re_stream; -FILE *wu_index_stream; -FILE *re_index_stream; -int time_int; +FILE *wu_stream=NULL; +FILE *re_stream=NULL; +FILE *wu_index_stream=NULL; +FILE *re_index_stream=NULL; +int time_int=0; // These is used if limiting the total number of workunits to eliminate int purged_workunits= 0; @@ -69,32 +70,133 @@ int purged_workunits= 0; // also limits the number of purged results. int max_number_workunits_to_purge=-1; +// set on command line if compression of archives is desired +#define COMPRESSION_NONE 0 +#define COMPRESSION_GZIP 1 +#define COMPRESSION_ZIP 2 -int open_archive(char* filename_prefix, FILE*& f){ - int retval=0; - char path[256]; +// subscripts MUST be in agreement with defines above +char *suffix[3]={"", ".gz", ".zip"}; - sprintf(path,"../archives/%s_%d.xml", filename_prefix, time_int); +// default is no compression +int compress=COMPRESSION_NONE; + +// set on command line if archive files should be closed and re-opened +// after getting some max no of WU in the file +int max_wu_per_file=0; + +// keep track of how many WU archived in file so far +int wu_stored_in_file=0; + +// this opens the archive. Only subtle thing is that if the user has +// asked for compression, then we popen(2) a pipe to gzip or zip. +// This does 'in place' compression. +// +void open_archive(char* filename_prefix, FILE*& f){ + char path[256]; + char command[512]; + + // append appropriate suffix for file type + sprintf(path, "../archives/%s_%d.xml", filename_prefix, time_int); + strcat(path, suffix[compress]); + + // and construct appropriate command if needed + if (compress==COMPRESSION_GZIP) + sprintf(command, "gzip - > %s", path); + + if (compress==COMPRESSION_ZIP) + sprintf(command, "zip %s -", path); log_messages.printf(SCHED_MSG_LOG::NORMAL, "Opening archive %s\n", path); - if ((f = fopen( path,"a+")) == NULL) { - log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't open archive file %s\n", path); - return ERR_FOPEN; + // in the case with no compression, just open the file, else open + // a pipe to the compression executable. + if (compress==COMPRESSION_NONE) { + if (!(f = fopen( path,"w"))) { + log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't open archive file %s %s\n", + path, errno?strerror(errno):""); + exit(3); + } + } + else if (!(f = popen(command,"w"))) { + log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't open pipe %s %s\n", + command, errno?strerror(errno):""); + exit(4); } - //fprintf(f, - // "\n" - //); + // set buffering to line buffered, since we are outputing XML on a + // line-by-line basis. + // + setlinebuf(f); - //fprintf(f, - // "<%s>\n",filename_prefix - //); - - setbuf( f, NULL ); - return retval; + return; } +void close_archive(char *filename, FILE*& fp){ + char path[256]; + + // Set file pointer to NULL after closing file to indicate that + // it's closed. + if (!fp) + return; + + // In case of errors, carry on anyway. This is deliberate, not lazy + if (compress==COMPRESSION_NONE) + fclose(fp); + else + pclose(fp); + + fp=NULL; + + // append appropriate file type + sprintf(path, "../archives/%s_%d.xml", filename, time_int); + strcat(path, suffix[compress]); + + log_messages.printf(SCHED_MSG_LOG::NORMAL, + "Closed archive file %s containing records of %d workunits\n", + path, wu_stored_in_file); + + return; +} + +// opens the various archive files. Guarantees that the timestamp +// does not equal the previous timestamp +// +void open_all_archives() { + int old_time=time_int; + + // make sure we get a NEW value of the file timestamp! + while (old_time == (time_int = (int)time(0))) + sleep(1); + + // open all the archives. + open_archive(WU_FILENAME_PREFIX, wu_stream); + open_archive(RESULT_FILENAME_PREFIX, re_stream); + open_archive(RESULT_INDEX_FILENAME_PREFIX, re_index_stream); + open_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream); + + return; +} + +// closes (and optionally compresses) the archive files. Clears file +// pointers to indicate that files are not open. +// +void close_all_archives() { + close_archive(WU_FILENAME_PREFIX, wu_stream); + close_archive(RESULT_FILENAME_PREFIX, re_stream); + close_archive(RESULT_INDEX_FILENAME_PREFIX, re_index_stream); + close_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream); + + return; +} + + +// The exit handler always calls this at the end to be sure that the +// database is closed cleanly. +void close_db_exit_handler() { + boinc_db.close(); + return; +} int archive_result(DB_RESULT& result) { fprintf(re_stream, @@ -280,7 +382,7 @@ int purge_and_archive_results(DB_WORKUNIT& wu, int& number_results) { // bool do_pass() { int retval= 0; - + // The number of workunits/results purged in a single pass of // do_pass(). Since do_pass() may be invoked multiple times, // corresponding global variables store global totals. @@ -288,6 +390,10 @@ bool do_pass() { int do_pass_purged_workunits = 0; int do_pass_purged_results = 0; + // check to see if we got a stop signal. Note that if we do catch + // a stop signal here, we call an exit handler that closes [and + // optionally compresses] files before returning to the OS. + // check_stop_daemons(); bool did_something = false; @@ -303,6 +409,10 @@ bool do_pass() { // compiler complaints about uninitialized while (!wu.enumerate(buf)) { did_something = true; + + // if archives have not already been opened, then open them. + if (!wu_stream) + open_all_archives(); retval = purge_and_archive_results(wu, n); do_pass_purged_results += n; @@ -312,7 +422,7 @@ bool do_pass() { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Failed to write to XML file workunit:%d\n", wu.id ); - exit(1); + exit(5); } log_messages.printf(SCHED_MSG_LOG::DEBUG,"Archived workunit [%d] to a file\n", wu.id); @@ -320,13 +430,23 @@ bool do_pass() { retval= wu.delete_from_db(); if (retval) { log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't delete workunit [%d] from database:%d\n", wu.id, retval); - exit(1); + exit(6); } log_messages.printf(SCHED_MSG_LOG::DEBUG,"Purged workunit [%d] from database\n", wu.id); purged_workunits++; do_pass_purged_workunits++; + wu_stored_in_file++; + // flush the various output files. + fflush(NULL); + + // if file has got max # of workunits, close and compress it. + // This sets file pointers to NULL + if (max_wu_per_file && wu_stored_in_file>=max_wu_per_file) { + close_all_archives(); + wu_stored_in_file=0; + } if (max_number_workunits_to_purge>=0 && purged_workunits >= max_number_workunits_to_purge) break; @@ -360,6 +480,24 @@ int main(int argc, char** argv) { // also purged, it indirectly controls that number as // well. max_number_workunits_to_purge= atoi(argv[++i]); + } else if (!strcmp(argv[i], "-zip")) { + // compress output files using zip. If used with + // -max_wu_per_file then the files get compressed after + // being closed. In any case the files are compressed + // when db_purge exits on a signal. + compress=COMPRESSION_ZIP; + // compress output files using gzip. If used with + // -max_wu_per_file then the files get compressed after + // being closed. In any case the files are compressed + // when db_purge exits on a signal. + } else if (!strcmp(argv[i], "-gzip")) { + compress=COMPRESSION_GZIP; + } else if (!strcmp(argv[i], "-max_wu_per_file")) { + // This is the limit on the maximum number of workunits to + // purge into a given file. The file is then closed and + // another file is opened. This can be used to get a + // series of bite-sized files instead of one huge file. + max_wu_per_file = atoi(argv[++i]); } else { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Unrecognized arg: %s\n", @@ -382,21 +520,13 @@ int main(int argc, char** argv) { } } - // // Call lock_file after fork(), because file locks are not always - // inherited - // if (lock_file(LOCKFILE)) { - // log_messages.printf(SCHED_MSG_LOG::NORMAL, "Another copy of file - // deleter is running\n"); - // exit(1); - // } - // write_pid_file(PIDFILE); log_messages.printf(SCHED_MSG_LOG::NORMAL, "Starting\n"); retval = boinc_db.open(config.db_name, config.db_host, config.db_user, config.db_passwd); if (retval) { log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Can't open DB\n"); - exit(1); + exit(2); } install_stop_signal_handler(); @@ -408,24 +538,13 @@ int main(int argc, char** argv) { //now= time(0); //tim= *(localtime(&now)); //i= strftime(time_int,30,"%b%d_%Y_%H:%M:%S", &tim); - time_int= (int)time(0); - mkdir("../archives", 0777); - retval = open_archive(WU_FILENAME_PREFIX, wu_stream); - if (!retval) - retval = open_archive(RESULT_FILENAME_PREFIX, re_stream); - if (!retval) - retval = open_archive(RESULT_INDEX_FILENAME_PREFIX, - re_index_stream); - if (!retval) - retval = open_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream); - - - if (retval) { - log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Can't open archives\n"); - exit(1); - } + // on exit, either via the check_stop_daemons signal handler, or + // through a regular call to exit, these functions will be called + // in the opposite order of registration. + atexit(close_db_exit_handler); + atexit(close_all_archives); if (one_pass) { do_pass(); @@ -436,11 +555,8 @@ int main(int argc, char** argv) { if (!do_pass()) sleep(10); } - } + } - boinc_db.close(); - fclose(wu_stream); - fclose(re_stream); - fclose(wu_index_stream); - fclose(re_index_stream); + // files and database are closed by exit handler + exit(0); }