- Wrapper can handle multiple tasks

wrapper/
    job.xml
    wrapper.C

svn path=/trunk/boinc_samples/; revision=12758
This commit is contained in:
David Anderson 2007-05-28 16:31:39 +00:00
parent 88f47331bd
commit 7c350ed33e
3 changed files with 93 additions and 31 deletions

View File

@ -355,3 +355,10 @@ David 23 May 2007
uc2.h
uppercase/
Makefile
David 28 May 2007
- Wrapper can handle multiple tasks
wrapper/
job.xml
wrapper.C

View File

@ -5,4 +5,10 @@
<stdout_filename>stdout</stdout_filename>
<command_line>10</command_line>
</task>
<task>
<application>worker2</application>
<stdin_filename>stdin2</stdin_filename>
<stdout_filename>stdout2</stdout_filename>
<command_line>10</command_line>
</task>
</job_desc>

View File

@ -53,6 +53,9 @@
#include "util.h"
#include "error_numbers.h"
#define JOB_FILENAME "job.xml"
#define CHECKPOINT_FILENAME "checkpoint.txt"
using std::vector;
using std::string;
@ -62,6 +65,8 @@ struct TASK {
string stdout_filename;
string stderr_filename;
string command_line;
double final_cpu_time;
double starting_cpu;
#ifdef _WIN32
HANDLE pid_handle;
HANDLE thread_handle;
@ -85,6 +90,7 @@ int TASK::parse(XML_PARSER& xp) {
char tag[1024];
bool is_tag;
final_cpu_time = 0;
while (!xp.get(tag, sizeof(tag), is_tag)) {
if (!is_tag) {
fprintf(stderr, "SCHED_CONFIG::parse(): unexpected text %s\n", tag);
@ -107,9 +113,12 @@ int parse_job_file() {
char tag[1024], buf[256];
bool is_tag;
boinc_resolve_filename("job.xml", buf, 1024);
boinc_resolve_filename(JOB_FILENAME, buf, 1024);
FILE* f = boinc_fopen(buf, "r");
if (!f) return ERR_FOPEN;
if (!f) {
fprintf(stderr, "can't open job file %s\n", buf);
return ERR_FOPEN;
}
mf.init_file(f);
XML_PARSER xp(&mf);
@ -284,6 +293,7 @@ int TASK::run(int argct, char** argvt) {
argv[0] = buf;
strlcpy(arglist, command_line.c_str(), sizeof(arglist));
argc = parse_command_line(arglist, argv+1);
fprintf(stderr, "wrapper: running %s (%s)\n", buf, arglist);
retval = execv(buf, argv);
exit(ERR_EXEC);
}
@ -297,14 +307,17 @@ bool TASK::poll(int& status) {
if (GetExitCodeProcess(pid_handle, &exit_code)) {
if (exit_code != STILL_ACTIVE) {
status = exit_code;
final_cpu_time = cpu_time();
return true;
}
}
#else
int wpid, stat;
wpid = waitpid(pid, &stat, WNOHANG);
struct rusage ru;
wpid = wait4(pid, &status, WNOHANG, &ru);
if (wpid) {
status = stat;
final_cpu_time = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6;
return true;
}
#endif
@ -369,9 +382,10 @@ double TASK::cpu_time() {
ULARGE_INTEGER tKernel, tUser;
LONGLONG totTime;
GetProcessTimes(
int retval = GetProcessTimes(
pid_handle, &creation_time, &exit_time, &kernel_time, &user_time
);
if (retval == 0) return 0;
tKernel.LowPart = kernel_time.dwLowDateTime;
tKernel.HighPart = kernel_time.dwHighDateTime;
@ -379,20 +393,46 @@ double TASK::cpu_time() {
tUser.HighPart = user_time.dwHighDateTime;
totTime = tKernel.QuadPart + tUser.QuadPart;
double cpu = totTime / 1.e7;
return cpu;
return totTime / 1.e7;
#else
return linux_cpu_time(pid);
return linux_cpu_time(pid);
#endif
}
void send_status_message(TASK& task) {
boinc_report_app_status(task.cpu_time(), 0, 0);
void send_status_message(TASK& task, double frac_done) {
boinc_report_app_status(task.cpu_time(), task.starting_cpu, frac_done);
}
// Support for multiple tasks.
// We keep a checkpoint file that says how many tasks we've completed
// and how much CPU time has been used so far
//
void write_checkpoint(int ntasks, double cpu) {
FILE* f = fopen(CHECKPOINT_FILENAME, "w");
if (!f) return;
fprintf(f, "%d %f\n", ntasks, cpu);
fclose(f);
}
void read_checkpoint(int& ntasks, double& cpu) {
int nt;
double c;
ntasks = 0;
cpu = 0;
FILE* f = fopen(CHECKPOINT_FILENAME, "r");
if (!f) return;
int n = fscanf(f, "%d %lf", &nt, &c);
if (n != 2) return;
ntasks = nt;
cpu = c;
}
int main(int argc, char** argv) {
BOINC_OPTIONS options;
int retval;
int ntasks;
double cpu;
boinc_init_diagnostics(
BOINC_DIAG_DUMPCALLSTACKENABLED
@ -413,33 +453,42 @@ int main(int argc, char** argv) {
fprintf(stderr, "can't parse job file: %d\n", retval);
boinc_finish(retval);
}
if (tasks.size() != 1) {
fprintf(stderr, "multiple tasks not supported\n");
boinc_finish(1);
}
parse_state_file();
TASK& task = tasks[0];
read_checkpoint(ntasks, cpu);
if (ntasks > tasks.size()) {
fprintf(stderr, "Checkpoint file: ntasks %d too large\n", ntasks);
boinc_finish(1);
}
for (unsigned int i=ntasks; i<tasks.size(); i++) {
TASK& task = tasks[i];
double frac_done = ((double)i)/((double)tasks.size());
fprintf(stderr, "running %s\n", task.application.c_str());
retval = task.run(argc, argv);
if (retval) {
fprintf(stderr, "can't run app: %d\n", retval);
boinc_finish(retval);
}
while(1) {
int status;
if (task.poll(status)) {
if (status) {
fprintf(stderr, "app error: 0x%x\n", status);
}
boinc_finish(status);
fprintf(stderr, "running %s\n", task.application.c_str());
task.starting_cpu = cpu;
retval = task.run(argc, argv);
if (retval) {
fprintf(stderr, "can't run app: %d\n", retval);
boinc_finish(retval);
}
poll_boinc_messages(task);
send_status_message(task);
boinc_sleep(1.);
while(1) {
int status;
if (task.poll(status)) {
if (status) {
fprintf(stderr, "app error: 0x%x\n", status);
boinc_finish(status);
}
break;
}
poll_boinc_messages(task);
send_status_message(task, frac_done);
boinc_sleep(1.);
}
cpu += task.final_cpu_time;
write_checkpoint(i+1, cpu);
}
boinc_finish(0);
}
#ifdef _WIN32