Added additional options -gzip, -zip, -max_wu_per_file to db_purge. The

compression options are 'efficient' in that they do not write an uncompressed
file then compress -- they go straight to compressed.  SIGHUP and lockfile
shutdown signals are properly caught and close compressed files and DB cleanly.

svn path=/trunk/boinc/; revision=4782
This commit is contained in:
Bruce Allen 2004-12-07 22:54:54 +00:00
parent 5acc0330bb
commit ce142bc42d
2 changed files with 180 additions and 55 deletions

View File

@ -20841,3 +20841,12 @@ Rom 7 Dec 2004
client/
client_state.C
Bruce 7 Dec 2004
- Added additional options -gzip, -zip, -max_wu_per_file to db_purge. The
compression options are 'efficient' in that they do not write an uncompressed
file then compress -- they go straight to compressed. SIGHUP and lockfile
shutdown signals are properly caught and close compressed files and DB cleanly.
sched/
db_purge.C

View File

@ -32,6 +32,7 @@ static volatile const char *BOINCrcsid="$Id$";
#include <ctime>
#include <string>
#include <time.h>
#include <errno.h>
using namespace std;
@ -55,11 +56,11 @@ using namespace std;
#define DB_QUERY_LIMIT 1000
SCHED_CONFIG config;
FILE *wu_stream;
FILE *re_stream;
FILE *wu_index_stream;
FILE *re_index_stream;
int time_int;
FILE *wu_stream=NULL;
FILE *re_stream=NULL;
FILE *wu_index_stream=NULL;
FILE *re_index_stream=NULL;
int time_int=0;
// These is used if limiting the total number of workunits to eliminate
int purged_workunits= 0;
@ -69,32 +70,133 @@ int purged_workunits= 0;
// also limits the number of purged results.
int max_number_workunits_to_purge=-1;
// set on command line if compression of archives is desired
#define COMPRESSION_NONE 0
#define COMPRESSION_GZIP 1
#define COMPRESSION_ZIP 2
int open_archive(char* filename_prefix, FILE*& f){
int retval=0;
char path[256];
// subscripts MUST be in agreement with defines above
char *suffix[3]={"", ".gz", ".zip"};
sprintf(path,"../archives/%s_%d.xml", filename_prefix, time_int);
// default is no compression
int compress=COMPRESSION_NONE;
// set on command line if archive files should be closed and re-opened
// after getting some max no of WU in the file
int max_wu_per_file=0;
// keep track of how many WU archived in file so far
int wu_stored_in_file=0;
// this opens the archive. Only subtle thing is that if the user has
// asked for compression, then we popen(2) a pipe to gzip or zip.
// This does 'in place' compression.
//
void open_archive(char* filename_prefix, FILE*& f){
char path[256];
char command[512];
// append appropriate suffix for file type
sprintf(path, "../archives/%s_%d.xml", filename_prefix, time_int);
strcat(path, suffix[compress]);
// and construct appropriate command if needed
if (compress==COMPRESSION_GZIP)
sprintf(command, "gzip - > %s", path);
if (compress==COMPRESSION_ZIP)
sprintf(command, "zip %s -", path);
log_messages.printf(SCHED_MSG_LOG::NORMAL, "Opening archive %s\n", path);
if ((f = fopen( path,"a+")) == NULL) {
log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't open archive file %s\n", path);
return ERR_FOPEN;
// in the case with no compression, just open the file, else open
// a pipe to the compression executable.
if (compress==COMPRESSION_NONE) {
if (!(f = fopen( path,"w"))) {
log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't open archive file %s %s\n",
path, errno?strerror(errno):"");
exit(3);
}
}
else if (!(f = popen(command,"w"))) {
log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't open pipe %s %s\n",
command, errno?strerror(errno):"");
exit(4);
}
//fprintf(f,
// "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>\n"
//);
// set buffering to line buffered, since we are outputing XML on a
// line-by-line basis.
//
setlinebuf(f);
//fprintf(f,
// "<%s>\n",filename_prefix
//);
setbuf( f, NULL );
return retval;
return;
}
void close_archive(char *filename, FILE*& fp){
char path[256];
// Set file pointer to NULL after closing file to indicate that
// it's closed.
if (!fp)
return;
// In case of errors, carry on anyway. This is deliberate, not lazy
if (compress==COMPRESSION_NONE)
fclose(fp);
else
pclose(fp);
fp=NULL;
// append appropriate file type
sprintf(path, "../archives/%s_%d.xml", filename, time_int);
strcat(path, suffix[compress]);
log_messages.printf(SCHED_MSG_LOG::NORMAL,
"Closed archive file %s containing records of %d workunits\n",
path, wu_stored_in_file);
return;
}
// opens the various archive files. Guarantees that the timestamp
// does not equal the previous timestamp
//
void open_all_archives() {
int old_time=time_int;
// make sure we get a NEW value of the file timestamp!
while (old_time == (time_int = (int)time(0)))
sleep(1);
// open all the archives.
open_archive(WU_FILENAME_PREFIX, wu_stream);
open_archive(RESULT_FILENAME_PREFIX, re_stream);
open_archive(RESULT_INDEX_FILENAME_PREFIX, re_index_stream);
open_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream);
return;
}
// closes (and optionally compresses) the archive files. Clears file
// pointers to indicate that files are not open.
//
void close_all_archives() {
close_archive(WU_FILENAME_PREFIX, wu_stream);
close_archive(RESULT_FILENAME_PREFIX, re_stream);
close_archive(RESULT_INDEX_FILENAME_PREFIX, re_index_stream);
close_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream);
return;
}
// The exit handler always calls this at the end to be sure that the
// database is closed cleanly.
void close_db_exit_handler() {
boinc_db.close();
return;
}
int archive_result(DB_RESULT& result) {
fprintf(re_stream,
@ -280,7 +382,7 @@ int purge_and_archive_results(DB_WORKUNIT& wu, int& number_results) {
//
bool do_pass() {
int retval= 0;
// The number of workunits/results purged in a single pass of
// do_pass(). Since do_pass() may be invoked multiple times,
// corresponding global variables store global totals.
@ -288,6 +390,10 @@ bool do_pass() {
int do_pass_purged_workunits = 0;
int do_pass_purged_results = 0;
// check to see if we got a stop signal. Note that if we do catch
// a stop signal here, we call an exit handler that closes [and
// optionally compresses] files before returning to the OS.
//
check_stop_daemons();
bool did_something = false;
@ -303,6 +409,10 @@ bool do_pass() {
// compiler complaints about uninitialized
while (!wu.enumerate(buf)) {
did_something = true;
// if archives have not already been opened, then open them.
if (!wu_stream)
open_all_archives();
retval = purge_and_archive_results(wu, n);
do_pass_purged_results += n;
@ -312,7 +422,7 @@ bool do_pass() {
log_messages.printf(SCHED_MSG_LOG::CRITICAL,
"Failed to write to XML file workunit:%d\n", wu.id
);
exit(1);
exit(5);
}
log_messages.printf(SCHED_MSG_LOG::DEBUG,"Archived workunit [%d] to a file\n", wu.id);
@ -320,13 +430,23 @@ bool do_pass() {
retval= wu.delete_from_db();
if (retval) {
log_messages.printf(SCHED_MSG_LOG::CRITICAL,"Can't delete workunit [%d] from database:%d\n", wu.id, retval);
exit(1);
exit(6);
}
log_messages.printf(SCHED_MSG_LOG::DEBUG,"Purged workunit [%d] from database\n", wu.id);
purged_workunits++;
do_pass_purged_workunits++;
wu_stored_in_file++;
// flush the various output files.
fflush(NULL);
// if file has got max # of workunits, close and compress it.
// This sets file pointers to NULL
if (max_wu_per_file && wu_stored_in_file>=max_wu_per_file) {
close_all_archives();
wu_stored_in_file=0;
}
if (max_number_workunits_to_purge>=0 && purged_workunits >= max_number_workunits_to_purge)
break;
@ -360,6 +480,24 @@ int main(int argc, char** argv) {
// also purged, it indirectly controls that number as
// well.
max_number_workunits_to_purge= atoi(argv[++i]);
} else if (!strcmp(argv[i], "-zip")) {
// compress output files using zip. If used with
// -max_wu_per_file then the files get compressed after
// being closed. In any case the files are compressed
// when db_purge exits on a signal.
compress=COMPRESSION_ZIP;
// compress output files using gzip. If used with
// -max_wu_per_file then the files get compressed after
// being closed. In any case the files are compressed
// when db_purge exits on a signal.
} else if (!strcmp(argv[i], "-gzip")) {
compress=COMPRESSION_GZIP;
} else if (!strcmp(argv[i], "-max_wu_per_file")) {
// This is the limit on the maximum number of workunits to
// purge into a given file. The file is then closed and
// another file is opened. This can be used to get a
// series of bite-sized files instead of one huge file.
max_wu_per_file = atoi(argv[++i]);
} else {
log_messages.printf(SCHED_MSG_LOG::CRITICAL,
"Unrecognized arg: %s\n",
@ -382,21 +520,13 @@ int main(int argc, char** argv) {
}
}
// // Call lock_file after fork(), because file locks are not always
// inherited
// if (lock_file(LOCKFILE)) {
// log_messages.printf(SCHED_MSG_LOG::NORMAL, "Another copy of file
// deleter is running\n");
// exit(1);
// }
// write_pid_file(PIDFILE);
log_messages.printf(SCHED_MSG_LOG::NORMAL, "Starting\n");
retval = boinc_db.open(config.db_name, config.db_host, config.db_user,
config.db_passwd);
if (retval) {
log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Can't open DB\n");
exit(1);
exit(2);
}
install_stop_signal_handler();
@ -408,24 +538,13 @@ int main(int argc, char** argv) {
//now= time(0);
//tim= *(localtime(&now));
//i= strftime(time_int,30,"%b%d_%Y_%H:%M:%S", &tim);
time_int= (int)time(0);
mkdir("../archives", 0777);
retval = open_archive(WU_FILENAME_PREFIX, wu_stream);
if (!retval)
retval = open_archive(RESULT_FILENAME_PREFIX, re_stream);
if (!retval)
retval = open_archive(RESULT_INDEX_FILENAME_PREFIX,
re_index_stream);
if (!retval)
retval = open_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream);
if (retval) {
log_messages.printf(SCHED_MSG_LOG::CRITICAL, "Can't open archives\n");
exit(1);
}
// on exit, either via the check_stop_daemons signal handler, or
// through a regular call to exit, these functions will be called
// in the opposite order of registration.
atexit(close_db_exit_handler);
atexit(close_all_archives);
if (one_pass) {
do_pass();
@ -436,11 +555,8 @@ int main(int argc, char** argv) {
if (!do_pass())
sleep(10);
}
}
}
boinc_db.close();
fclose(wu_stream);
fclose(re_stream);
fclose(wu_index_stream);
fclose(re_index_stream);
// files and database are closed by exit handler
exit(0);
}