boinc/vda/vdad.cpp

306 lines
8.8 KiB
C++

// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2012 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
// vdad - volunteer data archival daemon
//
// Enumerates files needing updating from the DB.
// Creates the corresponding tree of META_CHUNKs, CHUNKs,
// and VDA_CHUNK_HOSTs.
// Calls the recovery routines to initiate transfers,
// update the DB, etc.
#include <set>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <vector>
#include "boinc_db.h"
#include "sched_config.h"
#include "sched_util.h"
#include "error_numbers.h"
#include "util.h"
#include "filesys.h"
#include "vda_lib.h"
using std::vector;
using std::set;
// Write msg to stdout exactly as given; no newline is appended.
//
void show_msg(char* msg) {
    fprintf(stdout, "%s", msg);
}
// Process one VDA file that the DB has flagged as needing an update.
//
// vf:  working state for the file (policy, meta-chunk tree)
// dvf: the file's DB record; used to write changed fields back
//
// Steps: parse the policy file <vf.dir>/boinc_meta.txt; initialize the file
// or load its existing state; log the recovery plan; then either drive a
// pending retrieval or run the normal reconstruct/recovery cycle.
//
// Returns 0 on success, else the nonzero code of the step that failed
// (including failed DB writes).
// A failed reconstruct or truncate during retrieval is only logged;
// the function still returns 0 in that case.
//
int handle_file(VDA_FILE_AUX& vf, DB_VDA_FILE& dvf) {
    int retval;
    char buf[1024];

    log_messages.printf(MSG_NORMAL, "processing file %s\n", vf.file_name);

    // read the policy file
    // (bounded write, so an over-long vf.dir cannot overrun buf)
    //
    snprintf(buf, sizeof(buf), "%s/boinc_meta.txt", vf.dir);
    retval = vf.policy.parse(buf);
    if (retval) {
        log_messages.printf(MSG_CRITICAL, "Can't parse policy file %s\n", buf);
        return retval;
    }

    if (vf.initialized) {
        log_messages.printf(MSG_NORMAL, "Getting state\n");
        retval = vf.get_state();
        if (retval) {
            log_messages.printf(MSG_CRITICAL, "vf.get_state failed %d\n", retval);
            return retval;
        }
    } else {
        log_messages.printf(MSG_NORMAL, "Initializing\n");
        retval = vf.init();
        if (retval) {
            log_messages.printf(MSG_CRITICAL, "vf.init failed %d\n", retval);
            return retval;
        }

        // Record the initialization in the DB.
        // If this write is lost the record still says "uninitialized",
        // so report the failure instead of silently continuing.
        //
        snprintf(buf, sizeof(buf),
            "initialized=1, chunk_size=%.0f", vf.policy.chunk_size()
        );
        retval = dvf.update_field(buf);
        if (retval) {
            log_messages.printf(MSG_CRITICAL,
                "dvf.update_field failed %d\n", retval
            );
            return retval;
        }
    }

    log_messages.printf(MSG_NORMAL, "Recovery plan:\n");
    vf.meta_chunk->recovery_plan();

    // see if we're retrieving this file
    //
    if (vf.retrieving) {
        if (vf.retrieved) return 0;
        switch (vf.meta_chunk->status) {
        case PRESENT:
            // we have enough chunks to reconstruct it - do so
            //
            retval = vf.meta_chunk->reconstruct();
            if (retval) {
                log_messages.printf(MSG_CRITICAL,
                    "reconstruct of %s failed: %d\n", vf.file_name, retval
                );
            } else {
                log_messages.printf(MSG_NORMAL,
                    "retrieval of %s completed successfully\n", vf.file_name
                );

                // Decoding produces a file with unused space at the end.
                // Remove this space.
                //
                // NOTE(review): the target and the --reference file are
                // the same path, so this looks like it cannot change
                // the file's size - confirm the intended reference.
                // NOTE(review): vf.dir and vf.file_name are passed to the
                // shell unquoted; confirm they can never contain shell
                // metacharacters.
                //
                const int len = snprintf(buf, sizeof(buf),
                    "truncate %s/%s --reference %s/%s",
                    vf.dir, vf.file_name, vf.dir, vf.file_name
                );
                if (len < 0 || len >= static_cast<int>(sizeof(buf))) {
                    // never hand a cut-off command line to the shell
                    //
                    log_messages.printf(MSG_CRITICAL,
                        "Can't truncate %s: command too long\n", vf.file_name
                    );
                } else {
                    retval = system(buf);
                    if (retval) {
                        log_messages.printf(MSG_CRITICAL,
                            "Can't truncate %s: %d\n", vf.file_name, retval
                        );
                    }
                }

                // Record the completed retrieval in the DB;
                // a failed write must not go unnoticed.
                //
                dvf.retrieved = true;
                retval = dvf.update();
                if (retval) {
                    log_messages.printf(MSG_CRITICAL,
                        "dvf.update failed %d\n", retval
                    );
                    return retval;
                }
            }
            break;
        case RECOVERABLE:
            // otherwise start all possible uploads
            //
            vf.meta_chunk->upload_all();
            break;
        case UNRECOVERABLE:
            // if it looks like we can't recover the file, print a msg
            //
            log_messages.printf(MSG_CRITICAL,
                "Can't retrieve %s: unrecoverable\n",
                vf.file_name
            );
            break;
        }
        return 0;
    }

    // not retrieving: run the normal maintenance cycle
    //
    retval = vf.meta_chunk->decide_reconstruct();
    if (retval) {
        log_messages.printf(MSG_CRITICAL, "vf.decide_reconstruct failed %d\n", retval);
        return retval;
    }
    retval = vf.meta_chunk->reconstruct_and_cleanup();
    if (retval) {
        log_messages.printf(MSG_CRITICAL,
            "vf.reconstruct_and_cleanup failed %d\n", retval
        );
        return retval;
    }
    log_messages.printf(MSG_NORMAL, "Recovery action:\n");
    retval = vf.meta_chunk->recovery_action(dtime());
    if (retval) {
        log_messages.printf(MSG_CRITICAL, "vf.recovery_action failed %d\n", retval);
        return retval;
    }
    vf.meta_chunk->compute_min_failures();
    return 0;
}
// handle files
//
bool scan_files() {
DB_VDA_FILE vf;
bool found = false;
int retval;
while (1) {
retval = vf.enumerate("where need_update<>0");
if (retval == ERR_DB_NOT_FOUND) break;
if (retval) {
log_messages.printf(MSG_CRITICAL, "VDA_FILE enumerate failed\n");
exit(1);
}
VDA_FILE_AUX vfa(vf);
found = true;
retval = handle_file(vfa, vf);
if (retval) {
log_messages.printf(
MSG_CRITICAL, "handle_file() failed: %d\n", retval
);
exit(1);
} else {
retval = vf.update_field("need_update=0");
if (retval) {
log_messages.printf(
MSG_CRITICAL, "update_field() failed: %d\n", retval
);
exit(1);
}
}
}
return found;
}
// This host is declared dead; deal with the loss of data.
// For each DB_VDA_CHUNK_HOST record whose host_id matches h.id,
// look up the referenced VDA file and set its need_update flag.
//
// Returns 0 on success, else the error code of the DB operation that failed.
//
int handle_dead_host(DB_HOST& h) {
    DB_VDA_CHUNK_HOST chunk_host;
    char where_clause[256];

    log_messages.printf(MSG_NORMAL, "processing dead host %lu\n", h.id);
    sprintf(where_clause, "where host_id=%lu", h.id);

    for (;;) {
        int rc = chunk_host.enumerate(where_clause);
        if (rc == ERR_DB_NOT_FOUND) {
            return 0;
        }
        if (rc != 0) {
            return rc;
        }

        log_messages.printf(MSG_NORMAL,
            " updating file %lu\n", chunk_host.vda_file_id
        );

        DB_VDA_FILE file_rec;
        rc = file_rec.lookup_id(chunk_host.vda_file_id);
        if (rc != 0) {
            log_messages.printf(MSG_CRITICAL,
                " file lookup failed %lu\n", chunk_host.vda_file_id
            );
            return rc;
        }

        // flag the file so that it gets re-examined
        //
        rc = file_rec.update_field("need_update=1");
        if (rc != 0) {
            log_messages.printf(MSG_CRITICAL,
                " file update failed %lu\n", chunk_host.vda_file_id
            );
            return rc;
        }
    }
}
// Identify and process dead (i.e. timed-out) hosts.
// Selects hosts with cpu_efficiency=0 whose rpc_time is earlier than
// dtime() - config.vda_host_timeout, passes each to handle_dead_host(),
// then sets cpu_efficiency=1 so the host no longer matches the query.
// (Presumably cpu_efficiency is being reused as an "already processed"
// marker - confirm.)
// Any DB or processing error terminates the process with exit(1).
//
// Returns true if at least one such host was found.
//
bool scan_hosts() {
    DB_HOST h;
    char buf[256];
    int retval;
    bool found = false;

    snprintf(buf, sizeof(buf),
        "where cpu_efficiency=0 and rpc_time < %f",
        dtime() - config.vda_host_timeout
    );
    while (1) {
        retval = h.enumerate(buf);
        if (retval == ERR_DB_NOT_FOUND) break;
        if (retval) {
            log_messages.printf(MSG_CRITICAL,
                "host.enumerate() failed: %d\n", retval
            );
            exit(1);
        }
        found = true;
        retval = handle_dead_host(h);
        if (retval) {
            // name the function that was actually called
            //
            log_messages.printf(MSG_CRITICAL,
                "handle_dead_host() failed: %d\n", retval
            );
            exit(1);
        }
        retval = h.update_field("cpu_efficiency=1");
        if (retval) {
            log_messages.printf(MSG_CRITICAL, "h.update_field() failed: %d\n", retval);
            exit(1);
        }
    }
    return found;
}
// Daemon entry point.
//
// Command line:
//   -d N | --debug_level N    set the log debug level;
//                             level 4 also turns on g_print_queries
// Other arguments are ignored.
//
// Parses the project config, checks that VDA is enabled and that a host
// timeout is configured, opens the DB, then loops forever alternating
// scan_hosts() and scan_files(), sleeping when neither found any work.
// Exits with status 1 on any startup error.
//
int main(int argc, char** argv) {
    setbuf(stdout, 0);

    for (int i=1; i<argc; i++) {
        if (!strcmp(argv[i], "-d") || !strcmp(argv[i], "--debug_level")) {
            // The option needs a value.  Without this check a trailing
            // "-d" would pass argv[argc] (a null pointer) to atoi().
            //
            if (i + 1 >= argc) {
                log_messages.printf(MSG_CRITICAL,
                    "%s requires an argument\n", argv[i]
                );
                exit(1);
            }
            int dl = atoi(argv[++i]);
            log_messages.set_debug_level(dl);
            if (dl == 4) g_print_queries = true;
        }
    }

    int retval = config.parse_file();
    if (retval) {
        log_messages.printf(MSG_CRITICAL, "can't parse config file\n");
        exit(1);
    }
    if (!config.enable_vda) {
        log_messages.printf(MSG_CRITICAL, "VDA not enabled\n");
        exit(1);
    }
    if (config.vda_host_timeout == 0) {
        log_messages.printf(MSG_CRITICAL, "Must specify VDA host timeout\n");
        exit(1);
    }
    retval = boinc_db.open(
        config.db_name, config.db_host, config.db_user, config.db_passwd
    );
    if (retval) {
        log_messages.printf(MSG_CRITICAL, "can't open DB\n");
        exit(1);
    }

    // Disabled test scaffolding.  It calls handle_file() with one argument,
    // which does not match the two-argument handle_file() in this file,
    // so it would need updating before being re-enabled.
    //
#if 0
    VDA_FILE_AUX vf;
    memset(&vf, 0, sizeof(vf));
    strcpy(vf.dir, "/mydisks/b/users/boinc_adm/vda_test");
    strcpy(vf.name, "file.ext");
    handle_file(vf);
    exit(0);
#endif

    // main loop: only sleep when a full pass found nothing to do
    //
    while(1) {
        bool action = scan_hosts();
        action |= scan_files();
        if (!action) boinc_sleep(5.);
    }
}