wrapper: add --trickle X cmdline option to periodically report runtime

Add logic in wrapper to maintain and checkpoint total runtime.
Also vboxwrapper: slight refactor.

Note: we often use "elapsed time" where we mean "runtime".
Should use the latter.
This commit is contained in:
David Anderson 2014-07-04 12:19:31 -07:00
parent 65c82b067f
commit 658547ccf6
2 changed files with 95 additions and 54 deletions

View File

@ -76,6 +76,9 @@
using std::vector;
using std::string;
double elapsed_time = 0;
// job's total elapsed time (over all sessions)
double trickle_period = 0;
bool is_boinc_client_version_newer(APP_INIT_DATA& aid, int maj, int min, int rel) {
if (maj < aid.major_version) return true;
@ -437,6 +440,37 @@ void VBOX_VM::check_trickle_triggers() {
}
}
// see if it's time to send trickle-up reporting elapsed time
//
void check_trickle_period() {
char buf[256];
static double last_trickle_report_time = 0;
if ((elapsed_time - last_trickle_report_time) < trickle_period) {
return;
}
last_trickle_report_time = elapsed_time;
fprintf(
stderr,
"%s Status Report: Trickle-Up Event.\n",
vboxwrapper_msg_prefix(buf, sizeof(buf))
);
sprintf(buf,
"<cpu_time>%f</cpu_time>", last_trickle_report_time
);
int retval = boinc_send_trickle_up(
const_cast<char*>("cpu_time"), buf
);
if (retval) {
fprintf(
stderr,
"%s Sending Trickle-Up Event failed (%d).\n",
vboxwrapper_msg_prefix(buf, sizeof(buf)),
retval
);
}
}
int main(int argc, char** argv) {
int retval;
int loop_iteration = 0;
@ -444,14 +478,11 @@ int main(int argc, char** argv) {
VBOX_VM vm;
APP_INIT_DATA aid;
double random_checkpoint_factor = 0;
double elapsed_time = 0;
double trickle_period = 0;
double fraction_done = 0;
double current_cpu_time = 0;
double starting_cpu_time = 0;
double last_checkpoint_time = 0;
double last_status_report_time = 0;
double last_trickle_report_time = 0;
double stopwatch_starttime = 0;
double stopwatch_endtime = 0;
double stopwatch_elapsedtime = 0;
@ -487,9 +518,6 @@ int main(int argc, char** argv) {
boinc_options.main_program = true;
boinc_options.check_heartbeat = true;
boinc_options.handle_process_control = true;
if (trickle_period > 0.0) {
boinc_options.handle_trickle_ups = true;
}
boinc_init_options(&boinc_options);
// Prepare environment for detecting system conditions
@ -1183,29 +1211,10 @@ int main(int argc, char** argv) {
}
}
// send elapsed-time trickle message if needed
//
if (trickle_period) {
if ((elapsed_time - last_trickle_report_time) >= trickle_period) {
last_trickle_report_time = elapsed_time;
fprintf(
stderr,
"%s Status Report: Trickle-Up Event.\n",
vboxwrapper_msg_prefix(buf, sizeof(buf))
);
sprintf(buf,
"<cpu_time>%f</cpu_time>", last_trickle_report_time
);
retval = boinc_send_trickle_up(
const_cast<char*>("cpu_time"), buf
);
if (retval) {
fprintf(
stderr,
"%s Sending Trickle-Up Event failed (%d).\n",
vboxwrapper_msg_prefix(buf, sizeof(buf)),
retval
);
}
}
check_trickle_period();
}
if (boinc_status.reread_init_data_file) {

View File

@ -15,8 +15,17 @@
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
// wrapper.cpp
// wrapper program - lets you use non-BOINC apps with BOINC
// BOINC wrapper - lets you use non-BOINC apps with BOINC
// See http://boinc.berkeley.edu/trac/wiki/WrapperApp
//
// cmdline options:
// --device N macro-substitute N for $GPU_DEVICE_NUM
// in worker cmdlines and env values
// --nthreads X macro-substitute X for $NTHREADS
// in worker cmdlines and env values
// --trickle X send a trickle-up message reporting elapsed time every X sec
// (use this for credit granting if your app does its
// own job management)
//
// Handles:
// - suspend/resume/quit/abort
@ -25,14 +34,6 @@
// - checkpointing
// (at the level of task; or potentially within task)
//
// See http://boinc.berkeley.edu/trac/wiki/WrapperApp for details
//
// cmdline options:
// --nthreads X: macro-substitute X for $NTHREADS
// in worker cmdlines and env values
// --device N: macro-substitute N for $GPU_DEVICE_NUM
// in worker cmdlines and env values
//
// Contributor: Andrew J. Younge (ajy4490@umiacs.umd.edu)
#ifndef _WIN32
@ -74,6 +75,9 @@
#include "regexp.h"
using std::vector;
using std::string;
//#define DEBUG
#if 1
#define debug_msg(x)
@ -88,10 +92,15 @@ inline void debug_msg(const char* x) {
#define POLL_PERIOD 1.0
using std::vector;
using std::string;
int nthreads = 1;
int gpu_device_num = -1;
double runtime = 0;
// run time this session
double trickle_period = 0;
vector<string> unzip_filenames;
string zip_filename;
vector<regexp*> zip_patterns;
APP_INIT_DATA aid;
struct TASK {
string application;
@ -220,10 +229,6 @@ struct TASK {
vector<TASK> tasks;
vector<TASK> daemons;
vector<string> unzip_filenames;
string zip_filename;
vector<regexp*> zip_patterns;
APP_INIT_DATA aid;
// replace s1 with s2
//
@ -931,32 +936,51 @@ void poll_boinc_messages(TASK& task) {
}
}
// see if it's time to send trickle-up reporting elapsed time
//
void check_trickle_period() {
char buf[256];
static double last_trickle_report_time = 0;
if ((runtime - last_trickle_report_time) < trickle_period) {
return;
}
last_trickle_report_time = runtime;
sprintf(buf,
"<cpu_time>%f</cpu_time>", last_trickle_report_time
);
boinc_send_trickle_up(
const_cast<char*>("cpu_time"), buf
);
}
// Support for multiple tasks.
// We keep a checkpoint file that says how many tasks we've completed
// and how much CPU time has been used so far
// and how much CPU time and runtime has been used so far
//
void write_checkpoint(int ntasks_completed, double cpu) {
void write_checkpoint(int ntasks_completed, double cpu, double rt) {
boinc_begin_critical_section();
FILE* f = fopen(CHECKPOINT_FILENAME, "w");
if (!f) return;
fprintf(f, "%d %f\n", ntasks_completed, cpu);
fprintf(f, "%d %f %f\n", ntasks_completed, cpu, rt);
fclose(f);
boinc_checkpoint_completed();
}
int read_checkpoint(int& ntasks_completed, double& cpu) {
int read_checkpoint(int& ntasks_completed, double& cpu, double& rt) {
int nt;
double c;
double c, r;
ntasks_completed = 0;
cpu = 0;
FILE* f = fopen(CHECKPOINT_FILENAME, "r");
if (!f) return ERR_FOPEN;
int n = fscanf(f, "%d %lf", &nt, &c);
int n = fscanf(f, "%d %lf %lf", &nt, &c, &r);
fclose(f);
if (n != 2) return 0;
ntasks_completed = nt;
cpu = c;
rt = r;
return 0;
}
@ -978,6 +1002,8 @@ int main(int argc, char** argv) {
nthreads = atoi(argv[++j]);
} else if (!strcmp(argv[j], "--device")) {
gpu_device_num = atoi(argv[++j]);
} else if (!strcmp(argv[j], "--trickle")) {
trickle_period = atof(argv[++j]);
}
}
@ -992,14 +1018,14 @@ int main(int argc, char** argv) {
do_unzip_inputs();
retval = read_checkpoint(ntasks_completed, checkpoint_cpu_time);
retval = read_checkpoint(ntasks_completed, checkpoint_cpu_time, runtime);
if (retval && !zip_filename.empty()) {
// this is the first time we've run.
// If we're going to zip output files,
// make a list of files present at this point
// so we can exclude them.
//
write_checkpoint(0, 0);
write_checkpoint(0, 0, 0);
get_initial_file_list();
}
@ -1109,11 +1135,17 @@ int main(int argc, char** argv) {
if (task.has_checkpointed()) {
cpu_time = task.cpu_time();
checkpoint_cpu_time = task.starting_cpu + cpu_time;
write_checkpoint(i, checkpoint_cpu_time);
write_checkpoint(i, checkpoint_cpu_time, runtime);
}
if (trickle_period) {
check_trickle_period();
}
boinc_sleep(POLL_PERIOD);
if (!task.suspended) {
task.elapsed_time += POLL_PERIOD;
runtime += POLL_PERIOD;
}
counter++;
}
@ -1131,7 +1163,7 @@ int main(int argc, char** argv) {
checkpoint_cpu_time,
frac_done + task.weight/total_weight
);
write_checkpoint(i+1, checkpoint_cpu_time);
write_checkpoint(i+1, checkpoint_cpu_time, runtime);
weight_completed += task.weight;
}
kill_daemons();