// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see .
// db_purge options
//
// purge workunit and result records that are no longer needed.
// Specifically, purges WUs for which file_delete_state=DONE;
// this occurs only when it has been assimilated
// and all results have server_state=OVER.
// Purging a WU means writing it and all its results
// to XML-format archive files, then deleting it and its results from the DB.
//
// The XML files have names of the form
// wu_archive_TIME and result_archive_TIME
// where TIME is the time it was created.
// In addition there are index files associating each WU and result ID
// with the timestamp of the file it's in.
#include "config.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "boinc_db.h"
#include "util.h"
#include "filesys.h"
#include "parse.h"
#include "sched_config.h"
#include "sched_util.h"
#include "sched_msgs.h"
#include "error_numbers.h"
#define WU_FILENAME_PREFIX "wu_archive"
#define RESULT_FILENAME_PREFIX "result_archive"
#define WU_INDEX_FILENAME_PREFIX "wu_index"
#define RESULT_INDEX_FILENAME_PREFIX "result_index"
#define DB_QUERY_LIMIT 1000
#define COMPRESSION_NONE 0
#define COMPRESSION_GZIP 1
#define COMPRESSION_ZIP 2
FILE *wu_stream=NULL;
FILE *re_stream=NULL;
FILE *wu_index_stream=NULL;
FILE *re_index_stream=NULL;
int time_int=0;
int min_age_days = 0;
bool no_archive = false;
int purged_workunits = 0;
// used if limiting the total number of workunits to eliminate
int max_number_workunits_to_purge = 0;
// If nonzero, maximum number of workunits to purge.
// Since all results associated with a purged workunit are also purged,
// this also limits the number of purged results.
const char *suffix[3] = {"", ".gz", ".zip"};
// subscripts MUST be in agreement with defines above
int compression_type = COMPRESSION_NONE;
int max_wu_per_file = 0;
// set on command line if archive files should be closed and re-opened
// after getting some max no of WU in the file
int wu_stored_in_file = 0;
// keep track of how many WU archived in file so far
bool time_to_quit() {
if (max_number_workunits_to_purge) {
if (purged_workunits >= max_number_workunits_to_purge) return true;
}
return false;
}
// Open an archive. Only subtle thing is that if the user has
// asked for compression, then we popen(2) a pipe to gzip or zip.
// This does 'in place' compression.
//
void open_archive(const char* filename_prefix, FILE*& f){
char path[256];
char command[512];
// append appropriate suffix for file type
sprintf(path, "../archives/%s_%d.xml", filename_prefix, time_int);
strcat(path, suffix[compression_type]);
// and construct appropriate command if needed
if (compression_type == COMPRESSION_GZIP) {
sprintf(command, "gzip - > %s", path);
}
if (compression_type == COMPRESSION_ZIP) {
sprintf(command, "zip %s -", path);
}
log_messages.printf(MSG_NORMAL,
"Opening archive %s\n", path
);
// in the case with no compression, just open the file, else open
// a pipe to the compression executable.
//
if (compression_type == COMPRESSION_NONE) {
if (!(f = fopen( path,"w"))) {
log_messages.printf(MSG_CRITICAL,
"Can't open archive file %s %s\n",
path, errno?strerror(errno):""
);
exit(3);
}
} else if (!(f = popen(command,"w"))) {
log_messages.printf(MSG_CRITICAL,
"Can't open pipe %s %s\n",
command, errno?strerror(errno):""
);
exit(4);
}
// set buffering to line buffered, since we are outputing XML on a
// line-by-line basis.
//
setlinebuf(f);
return;
}
void close_archive(const char *filename, FILE*& fp){
char path[256];
// Set file pointer to NULL after closing file to indicate that it's closed.
//
if (!fp) return;
// In case of errors, carry on anyway. This is deliberate, not lazy
//
if (compression_type == COMPRESSION_NONE) {
fclose(fp);
} else {
pclose(fp);
}
fp = NULL;
// append appropriate file type
sprintf(path, "../archives/%s_%d.xml", filename, time_int);
strcat(path, suffix[compression_type]);
log_messages.printf(MSG_NORMAL,
"Closed archive file %s containing records of %d workunits\n",
path, wu_stored_in_file
);
return;
}
// opens the various archive files. Guarantees that the timestamp
// does not equal the previous timestamp
//
void open_all_archives() {
int old_time=time_int;
// make sure we get a NEW value of the file timestamp!
//
while (old_time == (time_int = (int)time(0))) {
sleep(1);
}
// open all the archives.
open_archive(WU_FILENAME_PREFIX, wu_stream);
open_archive(RESULT_FILENAME_PREFIX, re_stream);
open_archive(RESULT_INDEX_FILENAME_PREFIX, re_index_stream);
open_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream);
fprintf(wu_stream, "\n");
fprintf(re_stream, "\n");
return;
}
// closes (and optionally compresses) the archive files. Clears file
// pointers to indicate that files are not open.
//
void close_all_archives() {
if (wu_stream) fprintf(wu_stream, "\n");
if (re_stream) fprintf(re_stream, "\n");
close_archive(WU_FILENAME_PREFIX, wu_stream);
close_archive(RESULT_FILENAME_PREFIX, re_stream);
close_archive(RESULT_INDEX_FILENAME_PREFIX, re_index_stream);
close_archive(WU_INDEX_FILENAME_PREFIX, wu_index_stream);
log_messages.printf(MSG_NORMAL,
"Closed archive files with %d workunits\n",
wu_stored_in_file
);
return;
}
// The exit handler always calls this at the end to be sure that the
// database is closed cleanly.
//
void close_db_exit_handler() {
boinc_db.close();
return;
}
int archive_result(DB_RESULT& result) {
fprintf(re_stream,
"\n"
" %d\n",
result.id
);
// xml_escape can increase size by factor of 6, e.g. x -> NNN;
//
char buf[BLOB_SIZE*6];
xml_escape(result.stderr_out, buf, sizeof(buf));
fprintf(
re_stream,
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %s\n"
" %.15e\n"
" %s\n"
" %s\n"
" %s\n"
" %d\n"
" %d\n"
" %d\n"
" %.15e\n"
" %.15e\n"
" %f\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %s\n",
result.create_time,
result.workunitid,
result.server_state,
result.outcome,
result.client_state,
result.hostid,
result.userid,
result.report_deadline,
result.sent_time,
result.received_time,
result.name,
result.cpu_time,
result.xml_doc_in,
result.xml_doc_out,
buf,
result.batch,
result.file_delete_state,
result.validate_state,
result.claimed_credit,
result.granted_credit,
result.opaque,
result.random,
result.app_version_num,
result.appid,
result.exit_status,
result.teamid,
result.priority,
result.mod_time
);
fprintf(re_stream,
"\n"
);
fprintf(re_index_stream,
"%d %d\n",
result.id, time_int
);
return 0;
}
int archive_wu(DB_WORKUNIT& wu) {
fprintf(wu_stream,
"\n"
" %d\n",
wu.id
);
fprintf(wu_stream,
" %d\n"
" %d\n"
" %s\n"
" %s\n"
" %d\n"
" %.15e\n"
" %.15e\n"
" %.15e\n"
" %.15e\n"
" %d\n"
" %d\n"
" %.15e\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %f\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %d\n"
" %s\n"
" %d\n"
" %s\n",
wu.create_time,
wu.appid,
wu.name,
wu.xml_doc,
wu.batch,
wu.rsc_fpops_est,
wu.rsc_fpops_bound,
wu.rsc_memory_bound,
wu.rsc_disk_bound,
wu.need_validate,
wu.canonical_resultid,
wu.canonical_credit,
wu.transition_time,
wu.delay_bound,
wu.error_mask,
wu.file_delete_state,
wu.assimilate_state,
wu.hr_class,
wu.opaque,
wu.min_quorum,
wu.target_nresults,
wu.max_error_results,
wu.max_total_results,
wu.max_success_results,
wu.result_template_file,
wu.priority,
wu.mod_time
);
fprintf(wu_stream,
"\n"
);
fprintf(wu_index_stream,
"%d %d\n",
wu.id, time_int
);
return 0;
}
int purge_and_archive_results(DB_WORKUNIT& wu, int& number_results) {
int retval= 0;
DB_RESULT result;
char buf[256];
number_results=0;
sprintf(buf, "where workunitid=%d", wu.id);
while (!result.enumerate(buf)) {
if (!no_archive) {
retval = archive_result(result);
if (retval) return retval;
log_messages.printf(MSG_DEBUG,
"Archived result [%d] to a file\n", result.id
);
}
retval = result.delete_from_db();
if (retval) return retval;
log_messages.printf(MSG_DEBUG,
"Purged result [%d] from database\n", result.id
);
number_results++;
}
return 0;
}
// return true if did anything
//
bool do_pass() {
int retval = 0;
// The number of workunits/results purged in a single pass of do_pass().
// Since do_pass() may be invoked multiple times,
// corresponding global variables store global totals.
//
int do_pass_purged_workunits = 0;
int do_pass_purged_results = 0;
// check to see if we got a stop signal.
// Note that if we do catch a stop signal here,
// we call an exit handler that closes [and optionally compresses] files
// before returning to the OS.
//
check_stop_daemons();
bool did_something = false;
DB_WORKUNIT wu;
char buf[256];
if (min_age_days) {
char timestamp[15];
mysql_timestamp(dtime()-min_age_days*86400, timestamp);
sprintf(buf,
"where file_delete_state=%d and mod_time<'%s' limit %d",
FILE_DELETE_DONE, timestamp, DB_QUERY_LIMIT
);
} else {
sprintf(buf,
"where file_delete_state=%d limit %d",
FILE_DELETE_DONE, DB_QUERY_LIMIT
);
}
int n=0;
while (1) {
retval = wu.enumerate(buf);
if (retval) {
if (retval != ERR_DB_NOT_FOUND) {
log_messages.printf(MSG_DEBUG,
"DB connection lost, exiting\n"
);
exit(0);
}
break;
}
if (strstr(wu.name, "nodelete")) continue;
did_something = true;
// if archives have not already been opened, then open them.
//
if (!no_archive && !wu_stream) {
open_all_archives();
}
retval = purge_and_archive_results(wu, n);
do_pass_purged_results += n;
if (!no_archive) {
retval= archive_wu(wu);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"Failed to write to XML file workunit:%d\n", wu.id
);
exit(5);
}
log_messages.printf(MSG_DEBUG,
"Archived workunit [%d] to a file\n", wu.id
);
}
// purge workunit from DB
//
retval= wu.delete_from_db();
if (retval) {
log_messages.printf(MSG_CRITICAL,
"Can't delete workunit [%d] from database:%d\n", wu.id, retval
);
exit(6);
}
log_messages.printf(MSG_DEBUG,
"Purged workunit [%d] from database\n", wu.id
);
purged_workunits++;
do_pass_purged_workunits++;
wu_stored_in_file++;
if (!no_archive) {
fflush(NULL);
// if file has got max # of workunits, close and compress it.
// This sets file pointers to NULL
//
if (max_wu_per_file && wu_stored_in_file>=max_wu_per_file) {
close_all_archives();
wu_stored_in_file = 0;
}
}
if (time_to_quit()) {
break;
}
}
if (do_pass_purged_workunits) {
log_messages.printf(MSG_NORMAL,
"Archived %d workunits and %d results\n",
do_pass_purged_workunits, do_pass_purged_results
);
}
if (did_something && wu_stored_in_file>0) {
log_messages.printf(MSG_DEBUG,
"Currently open archive files contain %d workunits\n",
wu_stored_in_file
);
}
if (do_pass_purged_workunits > DB_QUERY_LIMIT/2) {
return true;
} else {
return false;
}
}
void usage(char** argv) {
fprintf(stderr,
"Purge workunit and result records that are no longer needed.\n\n"
"Usage: %s [options]\n"
" [-d N] Set verbosity level (1, 2, 3=most verbose)\n"
" [-min_age_days N] Purge Wus w/ mod time at least N days ago\n"
" [-max N] Purge at more N WUs\n"
" [-zip] Compuress output files using zip\n"
" [-gzip] Compuress output files using gzip\n"
" [-no_archive] Don't write output files, just purge\n"
" [-max_wu_per_file N] Write at most N WUs per output file\n"
" [-sleep N] Sleep N sec after DB scan\n"
" [-one_pass] Make one DB scan, then exit\n",
argv[0]
);
exit(0);
}
int main(int argc, char** argv) {
int retval;
bool one_pass = false;
int i;
int sleep_sec = 600;
check_stop_daemons();
for (i=1; i 86400) {
log_messages.printf(MSG_CRITICAL,
"Unreasonable value of sleep interval: %d seconds\n",
sleep_sec
);
usage(argv);
}
} else if (!strcmp(argv[i], "--help") || !strcmp(argv[i], "-h")) {
usage(argv);
} else {
log_messages.printf(MSG_CRITICAL,
"Unrecognized arg: %s\n", argv[i]
);
usage(argv);
}
}
retval = config.parse_file("..");
if (retval) {
log_messages.printf(MSG_CRITICAL,
"Can't parse config file\n"
);
exit(1);
}
log_messages.printf(MSG_NORMAL, "Starting\n");
retval = boinc_db.open(
config.db_name, config.db_host, config.db_user, config.db_passwd
);
if (retval) {
log_messages.printf(MSG_CRITICAL, "Can't open DB\n");
exit(2);
}
install_stop_signal_handler();
boinc_mkdir("../archives");
// on exit, either via the check_stop_daemons signal handler, or
// through a regular call to exit, these functions will be called
// in the opposite order of registration.
//
atexit(close_db_exit_handler);
atexit(close_all_archives);
while (1) {
if (time_to_quit()) {
break;
}
if (!do_pass()) {
if (one_pass) break;
log_messages.printf(MSG_NORMAL, "Sleeping....\n");
sleep(sleep_sec);
}
}
// files and database are closed by exit handler
exit(0);
}
const char *BOINC_RCSID_0c1c4336f1 = "$Id$";